import 'dart:async'; import 'package:drift/drift.dart'; import 'package:easy_localization/easy_localization.dart'; import 'package:immich_mobile/constants/constants.dart'; import 'package:immich_mobile/domain/models/album/album.model.dart'; import 'package:immich_mobile/domain/models/asset/base_asset.model.dart'; import 'package:immich_mobile/domain/models/timeline.model.dart'; import 'package:immich_mobile/domain/services/timeline.service.dart'; import 'package:immich_mobile/infrastructure/entities/local_asset.entity.dart'; import 'package:immich_mobile/infrastructure/entities/remote_asset.entity.dart'; import 'package:immich_mobile/infrastructure/entities/remote_asset.entity.drift.dart'; import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; import 'package:stream_transform/stream_transform.dart'; class DriftTimelineRepository extends DriftDatabaseRepository { final Drift _db; const DriftTimelineRepository(super._db) : _db = _db; Stream> watchTimelineUserIds(String userId) { final query = _db.partnerEntity.selectOnly() ..addColumns([_db.partnerEntity.sharedById]) ..where( _db.partnerEntity.inTimeline.equals(true) & _db.partnerEntity.sharedWithId.equals(userId), ); return query .map((row) => row.read(_db.partnerEntity.sharedById)!) .watch() // Add current user ID to the list .map((users) => users..add(userId)); } TimelineQuery main(List userIds, GroupAssetsBy groupBy) => ( bucketSource: () => _watchMainBucket( userIds, groupBy: groupBy, ), assetSource: (offset, count) => _getMainBucketAssets( userIds, offset: offset, count: count, ), ); Stream> _watchMainBucket( List userIds, { GroupAssetsBy groupBy = GroupAssetsBy.day, }) { if (groupBy == GroupAssetsBy.none) { throw UnsupportedError( "GroupAssetsBy.none is not supported for watchMainBucket", ); } return _db.mergedAssetDrift .mergedBucket(userIds, groupBy: groupBy.index) .map((row) { final date = row.bucketDate.dateFmt(groupBy); return TimeBucket(date: date, assetCount: row.assetCount); }) .watch() .throttle(const Duration(seconds: 3), trailing: true); } Future> _getMainBucketAssets( List userIds, { required int offset, required int count, }) { return _db.mergedAssetDrift .mergedAsset(userIds, limit: (_) => Limit(count, offset)) .map( (row) => row.remoteId != null && row.ownerId != null ? RemoteAsset( id: row.remoteId!, localId: row.localId, name: row.name, ownerId: row.ownerId!, checksum: row.checksum, type: row.type, createdAt: row.createdAt, updatedAt: row.updatedAt, thumbHash: row.thumbHash, width: row.width, height: row.height, isFavorite: row.isFavorite, durationInSeconds: row.durationInSeconds, livePhotoVideoId: row.livePhotoVideoId, stackId: row.stackId, ) : LocalAsset( id: row.localId!, remoteId: row.remoteId, name: row.name, checksum: row.checksum, type: row.type, createdAt: row.createdAt, updatedAt: row.updatedAt, width: row.width, height: row.height, isFavorite: row.isFavorite, durationInSeconds: row.durationInSeconds, orientation: row.orientation, ), ) .get(); } TimelineQuery localAlbum(String albumId, GroupAssetsBy groupBy) => ( bucketSource: () => _watchLocalAlbumBucket( albumId, groupBy: groupBy, ), assetSource: (offset, count) => _getLocalAlbumBucketAssets( albumId, offset: offset, count: count, ), ); Stream> _watchLocalAlbumBucket( String albumId, { GroupAssetsBy groupBy = GroupAssetsBy.day, }) { if (groupBy == GroupAssetsBy.none) { return _db.localAlbumAssetEntity .count(where: (row) => row.albumId.equals(albumId)) .map(_generateBuckets) .watchSingle(); } final assetCountExp = _db.localAssetEntity.id.count(); final dateExp = _db.localAssetEntity.createdAt.dateFmt(groupBy); final query = _db.localAssetEntity.selectOnly().join([ innerJoin( _db.localAlbumAssetEntity, _db.localAlbumAssetEntity.assetId.equalsExp(_db.localAssetEntity.id), useColumns: false, ), leftOuterJoin( _db.remoteAssetEntity, _db.localAssetEntity.checksum.equalsExp(_db.remoteAssetEntity.checksum), useColumns: false, ), ]) ..addColumns([assetCountExp, dateExp]) ..where(_db.localAlbumAssetEntity.albumId.equals(albumId)) ..groupBy([dateExp]) ..orderBy([OrderingTerm.desc(dateExp)]); return query.map((row) { final timeline = row.read(dateExp)!.dateFmt(groupBy); final assetCount = row.read(assetCountExp)!; return TimeBucket(date: timeline, assetCount: assetCount); }).watch(); } Future> _getLocalAlbumBucketAssets( String albumId, { required int offset, required int count, }) { final query = _db.localAssetEntity.select().join( [ innerJoin( _db.localAlbumAssetEntity, _db.localAlbumAssetEntity.assetId.equalsExp(_db.localAssetEntity.id), useColumns: false, ), leftOuterJoin( _db.remoteAssetEntity, _db.localAssetEntity.checksum.equalsExp(_db.remoteAssetEntity.checksum), useColumns: false, ), ], ) ..addColumns([_db.remoteAssetEntity.id]) ..where(_db.localAlbumAssetEntity.albumId.equals(albumId)) ..orderBy([OrderingTerm.desc(_db.localAssetEntity.createdAt)]) ..limit(count, offset: offset); return query.map((row) { final asset = row.readTable(_db.localAssetEntity).toDto(); return asset.copyWith( remoteId: row.read(_db.remoteAssetEntity.id), ); }).get(); } TimelineQuery remoteAlbum(String albumId, GroupAssetsBy groupBy) => ( bucketSource: () => _watchRemoteAlbumBucket( albumId, groupBy: groupBy, ), assetSource: (offset, count) => _getRemoteAlbumBucketAssets( albumId, offset: offset, count: count, ), ); Stream> _watchRemoteAlbumBucket( String albumId, { GroupAssetsBy groupBy = GroupAssetsBy.day, }) { if (groupBy == GroupAssetsBy.none) { return _db.remoteAlbumAssetEntity .count(where: (row) => row.albumId.equals(albumId)) .map(_generateBuckets) .watch() .map((results) => results.isNotEmpty ? results.first : []) .handleError((error) { return []; }); } return (_db.remoteAlbumEntity.select()..where((row) => row.id.equals(albumId))).watch().switchMap((albums) { if (albums.isEmpty) { return Stream.value([]); } final album = albums.first; final isAscending = album.order == AlbumAssetOrder.asc; final assetCountExp = _db.remoteAssetEntity.id.count(); final dateExp = _db.remoteAssetEntity.createdAt.dateFmt(groupBy); final query = _db.remoteAssetEntity.selectOnly() ..addColumns([assetCountExp, dateExp]) ..join([ innerJoin( _db.remoteAlbumAssetEntity, _db.remoteAlbumAssetEntity.assetId.equalsExp(_db.remoteAssetEntity.id), useColumns: false, ), ]) ..where( _db.remoteAssetEntity.deletedAt.isNull() & _db.remoteAlbumAssetEntity.albumId.equals(albumId), ) ..groupBy([dateExp]); if (isAscending) { query.orderBy([OrderingTerm.asc(dateExp)]); } else { query.orderBy([OrderingTerm.desc(dateExp)]); } return query.map((row) { final timeline = row.read(dateExp)!.dateFmt(groupBy); final assetCount = row.read(assetCountExp)!; return TimeBucket(date: timeline, assetCount: assetCount); }).watch(); }).handleError((error) { // If there's an error (e.g., album was deleted), return empty buckets return []; }); } Future> _getRemoteAlbumBucketAssets( String albumId, { required int offset, required int count, }) async { final albumData = await (_db.remoteAlbumEntity.select()..where((row) => row.id.equals(albumId))).getSingleOrNull(); // If album doesn't exist (was deleted), return empty list if (albumData == null) { return []; } final isAscending = albumData.order == AlbumAssetOrder.asc; final query = _db.remoteAssetEntity.select().join( [ innerJoin( _db.remoteAlbumAssetEntity, _db.remoteAlbumAssetEntity.assetId.equalsExp(_db.remoteAssetEntity.id), useColumns: false, ), ], )..where( _db.remoteAssetEntity.deletedAt.isNull() & _db.remoteAlbumAssetEntity.albumId.equals(albumId), ); if (isAscending) { query.orderBy([OrderingTerm.asc(_db.remoteAssetEntity.createdAt)]); } else { query.orderBy([OrderingTerm.desc(_db.remoteAssetEntity.createdAt)]); } query.limit(count, offset: offset); return query.map((row) => row.readTable(_db.remoteAssetEntity).toDto()).get(); } TimelineQuery fromAssets(List assets) => ( bucketSource: () => Stream.value(_generateBuckets(assets.length)), assetSource: (offset, count) => Future.value(assets.skip(offset).take(count).toList()), ); TimelineQuery remote(String ownerId, GroupAssetsBy groupBy) => _remoteQueryBuilder( filter: (row) => row.deletedAt.isNull() & row.visibility.equalsValue(AssetVisibility.timeline) & row.ownerId.equals(ownerId), groupBy: groupBy, ); TimelineQuery favorite(String userId, GroupAssetsBy groupBy) => _remoteQueryBuilder( filter: (row) => row.deletedAt.isNull() & row.isFavorite.equals(true) & row.ownerId.equals(userId), groupBy: groupBy, ); TimelineQuery trash(String userId, GroupAssetsBy groupBy) => _remoteQueryBuilder( filter: (row) => row.deletedAt.isNotNull() & row.ownerId.equals(userId), groupBy: groupBy, joinLocal: true, ); TimelineQuery archived(String userId, GroupAssetsBy groupBy) => _remoteQueryBuilder( filter: (row) => row.deletedAt.isNull() & row.ownerId.equals(userId) & row.visibility.equalsValue(AssetVisibility.archive), groupBy: groupBy, ); TimelineQuery locked(String userId, GroupAssetsBy groupBy) => _remoteQueryBuilder( filter: (row) => row.deletedAt.isNull() & row.visibility.equalsValue(AssetVisibility.locked) & row.ownerId.equals(userId), groupBy: groupBy, ); TimelineQuery video(String userId, GroupAssetsBy groupBy) => _remoteQueryBuilder( filter: (row) => row.deletedAt.isNull() & row.type.equalsValue(AssetType.video) & row.visibility.equalsValue(AssetVisibility.timeline) & row.ownerId.equals(userId), groupBy: groupBy, ); TimelineQuery place(String place, GroupAssetsBy groupBy) => ( bucketSource: () => _watchPlaceBucket( place, groupBy: groupBy, ), assetSource: (offset, count) => _getPlaceBucketAssets( place, offset: offset, count: count, ), ); Stream> _watchPlaceBucket( String place, { GroupAssetsBy groupBy = GroupAssetsBy.day, }) { if (groupBy == GroupAssetsBy.none) { // TODO: implement GroupAssetBy for place throw UnsupportedError( "GroupAssetsBy.none is not supported for watchPlaceBucket", ); } final assetCountExp = _db.remoteAssetEntity.id.count(); final dateExp = _db.remoteAssetEntity.createdAt.dateFmt(groupBy); final query = _db.remoteAssetEntity.selectOnly() ..addColumns([assetCountExp, dateExp]) ..join([ innerJoin( _db.remoteExifEntity, _db.remoteExifEntity.assetId.equalsExp(_db.remoteAssetEntity.id), useColumns: false, ), ]) ..where( _db.remoteExifEntity.city.equals(place) & _db.remoteAssetEntity.deletedAt.isNull() & _db.remoteAssetEntity.visibility.equalsValue(AssetVisibility.timeline), ) ..groupBy([dateExp]) ..orderBy([OrderingTerm.desc(dateExp)]); return query.map((row) { final timeline = row.read(dateExp)!.dateFmt(groupBy); final assetCount = row.read(assetCountExp)!; return TimeBucket(date: timeline, assetCount: assetCount); }).watch(); } Future> _getPlaceBucketAssets( String place, { required int offset, required int count, }) { final query = _db.remoteAssetEntity.select().join( [ innerJoin( _db.remoteExifEntity, _db.remoteExifEntity.assetId.equalsExp(_db.remoteAssetEntity.id), useColumns: false, ), ], ) ..where( _db.remoteAssetEntity.deletedAt.isNull() & _db.remoteAssetEntity.visibility.equalsValue(AssetVisibility.timeline) & _db.remoteExifEntity.city.equals(place), ) ..orderBy([OrderingTerm.desc(_db.remoteAssetEntity.createdAt)]) ..limit(count, offset: offset); return query.map((row) => row.readTable(_db.remoteAssetEntity).toDto()).get(); } TimelineQuery _remoteQueryBuilder({ required Expression Function($RemoteAssetEntityTable row) filter, GroupAssetsBy groupBy = GroupAssetsBy.day, bool joinLocal = false, }) { return ( bucketSource: () => _watchRemoteBucket(filter: filter, groupBy: groupBy), assetSource: (offset, count) => _getRemoteAssets( filter: filter, offset: offset, count: count, joinLocal: joinLocal, ), ); } Stream> _watchRemoteBucket({ required Expression Function($RemoteAssetEntityTable row) filter, GroupAssetsBy groupBy = GroupAssetsBy.day, }) { if (groupBy == GroupAssetsBy.none) { final query = _db.remoteAssetEntity.count(where: filter); return query.map(_generateBuckets).watchSingle(); } final assetCountExp = _db.remoteAssetEntity.id.count(); final dateExp = _db.remoteAssetEntity.createdAt.dateFmt(groupBy); final query = _db.remoteAssetEntity.selectOnly() ..addColumns([assetCountExp, dateExp]) ..where(filter(_db.remoteAssetEntity)) ..groupBy([dateExp]) ..orderBy([OrderingTerm.desc(dateExp)]); return query.map((row) { final timeline = row.read(dateExp)!.dateFmt(groupBy); final assetCount = row.read(assetCountExp)!; return TimeBucket(date: timeline, assetCount: assetCount); }).watch(); } Future> _getRemoteAssets({ required Expression Function($RemoteAssetEntityTable row) filter, required int offset, required int count, bool joinLocal = false, }) { if (joinLocal) { final query = _db.remoteAssetEntity.select().join([ leftOuterJoin( _db.localAssetEntity, _db.remoteAssetEntity.checksum.equalsExp(_db.localAssetEntity.checksum), useColumns: false, ), ]) ..addColumns([_db.localAssetEntity.id]) ..where(filter(_db.remoteAssetEntity)) ..orderBy([OrderingTerm.desc(_db.remoteAssetEntity.createdAt)]) ..limit(count, offset: offset); return query.map((row) { final asset = row.readTable(_db.remoteAssetEntity).toDto(); final localId = row.read(_db.localAssetEntity.id); return asset.copyWith(localId: localId); }).get(); } else { final query = _db.remoteAssetEntity.select() ..where(filter) ..orderBy([(row) => OrderingTerm.desc(row.createdAt)]) ..limit(count, offset: offset); return query.map((row) => row.toDto()).get(); } } } List _generateBuckets(int count) { final buckets = List.generate( (count / kTimelineNoneSegmentSize).floor(), (_) => const Bucket(assetCount: kTimelineNoneSegmentSize), ); if (count % kTimelineNoneSegmentSize != 0) { buckets.add(Bucket(assetCount: count % kTimelineNoneSegmentSize)); } return buckets; } extension on Expression { Expression dateFmt(GroupAssetsBy groupBy) { // DateTimes are stored in UTC, so we need to convert them to local time inside the query before formatting // to create the correct time bucket final localTimeExp = modify(const DateTimeModifier.localTime()); return switch (groupBy) { GroupAssetsBy.day || GroupAssetsBy.auto => localTimeExp.date, GroupAssetsBy.month => localTimeExp.strftime("%Y-%m"), GroupAssetsBy.none => throw ArgumentError( "GroupAssetsBy.none is not supported for date formatting", ), }; } } extension on String { DateTime dateFmt(GroupAssetsBy groupBy) { final format = switch (groupBy) { GroupAssetsBy.day || GroupAssetsBy.auto => "y-M-d", GroupAssetsBy.month => "y-M", GroupAssetsBy.none => throw ArgumentError( "GroupAssetsBy.none is not supported for date formatting", ), }; try { return DateFormat(format).parse(this); } catch (e) { throw FormatException("Invalid date format: $this", e); } } }