From 39b2af1940e464a961f29a3db601ee5bbc7f49e9 Mon Sep 17 00:00:00 2001 From: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com> Date: Mon, 19 Jan 2026 18:44:10 +0530 Subject: [PATCH] fix: isolate management --- .../services/background_worker.service.dart | 12 +- mobile/lib/domain/utils/background_sync.dart | 107 +++--- mobile/lib/main.dart | 4 - .../providers/app_life_cycle.provider.dart | 11 +- mobile/lib/utils/isolate.dart | 323 ++++++++++++++---- mobile/lib/wm_executor.dart | 251 -------------- mobile/pubspec.lock | 8 - mobile/pubspec.yaml | 1 - 8 files changed, 321 insertions(+), 396 deletions(-) delete mode 100644 mobile/lib/wm_executor.dart diff --git a/mobile/lib/domain/services/background_worker.service.dart b/mobile/lib/domain/services/background_worker.service.dart index 9019db664d..18154668c6 100644 --- a/mobile/lib/domain/services/background_worker.service.dart +++ b/mobile/lib/domain/services/background_worker.service.dart @@ -24,12 +24,11 @@ import 'package:immich_mobile/providers/user.provider.dart'; import 'package:immich_mobile/repositories/file_media.repository.dart'; import 'package:immich_mobile/services/app_settings.service.dart'; import 'package:immich_mobile/services/auth.service.dart'; -import 'package:immich_mobile/services/localization.service.dart'; import 'package:immich_mobile/services/foreground_upload.service.dart'; +import 'package:immich_mobile/services/localization.service.dart'; import 'package:immich_mobile/utils/bootstrap.dart'; import 'package:immich_mobile/utils/debug_print.dart'; import 'package:immich_mobile/utils/http_ssl_options.dart'; -import 'package:immich_mobile/wm_executor.dart'; import 'package:isar/isar.dart'; import 'package:logging/logging.dart'; @@ -93,7 +92,6 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi { await Future.wait( [ loadTranslations(), - workerManagerPatch.init(dynamicSpawning: true), _ref?.read(authServiceProvider).setOpenApiServiceEndpoint(), // Initialize the file downloader FileDownloader().configure( @@ -203,14 +201,10 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi { final cleanupFutures = [ nativeSyncApi?.cancelHashing(), - workerManagerPatch.dispose().catchError((_) async { - // Discard any errors on the dispose call - return; - }), LogService.I.dispose(), Store.dispose(), - - backgroundSyncManager?.cancel(), + backgroundSyncManager?.cancel(immediate: true), + backgroundSyncManager?.cancelLocal(immediate: true), ]; if (_isar.isOpen) { diff --git a/mobile/lib/domain/utils/background_sync.dart b/mobile/lib/domain/utils/background_sync.dart index 637ae20cb8..4742aa403a 100644 --- a/mobile/lib/domain/utils/background_sync.dart +++ b/mobile/lib/domain/utils/background_sync.dart @@ -4,7 +4,6 @@ import 'package:immich_mobile/domain/utils/migrate_cloud_ids.dart' as m; import 'package:immich_mobile/domain/utils/sync_linked_album.dart'; import 'package:immich_mobile/providers/infrastructure/sync.provider.dart'; import 'package:immich_mobile/utils/isolate.dart'; -import 'package:worker_manager/worker_manager.dart'; typedef SyncCallback = void Function(); typedef SyncCallbackWithResult = void Function(T result); @@ -27,12 +26,12 @@ class BackgroundSyncManager { final SyncCallback? onCloudIdSyncComplete; final SyncErrorCallback? onCloudIdSyncError; - Cancelable? _syncTask; - Cancelable? _syncWebsocketTask; - Cancelable? _cloudIdSyncTask; - Cancelable? _deviceAlbumSyncTask; - Cancelable? _linkedAlbumSyncTask; - Cancelable? _hashTask; + CancellableTask? _syncTask; + CancellableTask? _syncWebsocketTask; + CancellableTask? _cloudIdSyncTask; + CancellableTask? _deviceAlbumSyncTask; + CancellableTask? _linkedAlbumSyncTask; + CancellableTask? _hashTask; BackgroundSyncManager({ this.onRemoteSyncStart, @@ -49,60 +48,43 @@ class BackgroundSyncManager { this.onCloudIdSyncError, }); - Future cancel() async { - final futures = []; - - if (_syncTask != null) { - futures.add(_syncTask!.future); - } - _syncTask?.cancel(); - _syncTask = null; - - if (_syncWebsocketTask != null) { - futures.add(_syncWebsocketTask!.future); - } - _syncWebsocketTask?.cancel(); - _syncWebsocketTask = null; - - if (_cloudIdSyncTask != null) { - futures.add(_cloudIdSyncTask!.future); - } - _cloudIdSyncTask?.cancel(); - _cloudIdSyncTask = null; - - if (_linkedAlbumSyncTask != null) { - futures.add(_linkedAlbumSyncTask!.future); - } - _linkedAlbumSyncTask?.cancel(); - _linkedAlbumSyncTask = null; + Future cancel({bool immediate = false}) async { + _syncTask!.cancel(immediate: immediate); + _syncWebsocketTask!.cancel(immediate: immediate); + _cloudIdSyncTask!.cancel(immediate: immediate); + _linkedAlbumSyncTask!.cancel(immediate: immediate); try { - await Future.wait(futures); - } on CanceledError { - // Ignore cancellation errors + await Future.wait( + [ + _syncTask?.future, + _syncWebsocketTask?.future, + _cloudIdSyncTask?.future, + _linkedAlbumSyncTask?.future, + ].nonNulls, + ); + } catch (e) { + // Ignore cancellation errors and cleanup timeouts } + + _syncTask = null; + _syncWebsocketTask = null; + _cloudIdSyncTask = null; + _linkedAlbumSyncTask = null; } - Future cancelLocal() async { - final futures = []; - - if (_hashTask != null) { - futures.add(_hashTask!.future); - } - _hashTask?.cancel(); - _hashTask = null; - - if (_deviceAlbumSyncTask != null) { - futures.add(_deviceAlbumSyncTask!.future); - } - _deviceAlbumSyncTask?.cancel(); - _deviceAlbumSyncTask = null; + Future cancelLocal({bool immediate = false}) async { + _hashTask!.cancel(immediate: immediate); + _deviceAlbumSyncTask!.cancel(immediate: immediate); try { - await Future.wait(futures); - } on CanceledError { - // Ignore cancellation errors + await Future.wait([_hashTask?.future, _deviceAlbumSyncTask?.future].nonNulls); + } catch (e) { + // Ignore cancellation errors and cleanup timeouts } + + _hashTask = null; + _deviceAlbumSyncTask = null; } // No need to cancel the task, as it can also be run when the user logs out @@ -133,7 +115,8 @@ class BackgroundSyncManager { .catchError((error) { onLocalSyncError?.call(error.toString()); _deviceAlbumSyncTask = null; - }); + }) + .future; } Future hashAssets() { @@ -156,7 +139,8 @@ class BackgroundSyncManager { .catchError((error) { onHashingError?.call(error.toString()); _hashTask = null; - }); + }) + .future; } Future syncRemote() { @@ -170,7 +154,7 @@ class BackgroundSyncManager { computation: (ref) => ref.read(syncStreamServiceProvider).sync(), debugLabel: 'remote-sync', ); - return _syncTask! + return _syncTask!.future .then((result) { final success = result ?? false; onRemoteSyncComplete?.call(success); @@ -193,7 +177,7 @@ class BackgroundSyncManager { _syncWebsocketTask = _handleWsAssetUploadReadyV1Batch(batchData); return _syncWebsocketTask!.whenComplete(() { _syncWebsocketTask = null; - }); + }).future; } Future syncLinkedAlbum() { @@ -204,7 +188,7 @@ class BackgroundSyncManager { _linkedAlbumSyncTask = runInIsolateGentle(computation: syncLinkedAlbumsIsolated, debugLabel: 'linked-album-sync'); return _linkedAlbumSyncTask!.whenComplete(() { _linkedAlbumSyncTask = null; - }); + }).future; } Future syncCloudIds() { @@ -214,7 +198,7 @@ class BackgroundSyncManager { onCloudIdSyncStart?.call(); - _cloudIdSyncTask = runInIsolateGentle(computation: m.syncCloudIds); + _cloudIdSyncTask = runInIsolateGentle(computation: m.syncCloudIds, debugLabel: 'cloud-id-sync'); return _cloudIdSyncTask! .whenComplete(() { onCloudIdSyncComplete?.call(); @@ -223,11 +207,12 @@ class BackgroundSyncManager { .catchError((error) { onCloudIdSyncError?.call(error.toString()); _cloudIdSyncTask = null; - }); + }) + .future; } } -Cancelable _handleWsAssetUploadReadyV1Batch(List batchData) => runInIsolateGentle( +CancellableTask _handleWsAssetUploadReadyV1Batch(List batchData) => runInIsolateGentle( computation: (ref) => ref.read(syncStreamServiceProvider).handleWsAssetUploadReadyV1Batch(batchData), debugLabel: 'websocket-batch', ); diff --git a/mobile/lib/main.dart b/mobile/lib/main.dart index 83bc840df1..43bb09f8f0 100644 --- a/mobile/lib/main.dart +++ b/mobile/lib/main.dart @@ -1,6 +1,5 @@ import 'dart:async'; import 'dart:io'; -import 'dart:math'; import 'package:auto_route/auto_route.dart'; import 'package:background_downloader/background_downloader.dart'; @@ -41,7 +40,6 @@ import 'package:immich_mobile/utils/debug_print.dart'; import 'package:immich_mobile/utils/http_ssl_options.dart'; import 'package:immich_mobile/utils/licenses.dart'; import 'package:immich_mobile/utils/migration.dart'; -import 'package:immich_mobile/wm_executor.dart'; import 'package:immich_ui/immich_ui.dart'; import 'package:intl/date_symbol_data_local.dart'; import 'package:logging/logging.dart'; @@ -53,8 +51,6 @@ void main() async { final (isar, drift, logDb) = await Bootstrap.initDB(); await Bootstrap.initDomain(isar, drift, logDb); await initApp(); - // Warm-up isolate pool for worker manager - await workerManagerPatch.init(dynamicSpawning: true, isolatesCount: max(Platform.numberOfProcessors - 1, 5)); await migrateDatabaseIfNeeded(isar, drift); HttpSSLOptions.apply(); diff --git a/mobile/lib/providers/app_life_cycle.provider.dart b/mobile/lib/providers/app_life_cycle.provider.dart index 604f1c8d0d..11037e12cd 100644 --- a/mobile/lib/providers/app_life_cycle.provider.dart +++ b/mobile/lib/providers/app_life_cycle.provider.dart @@ -160,7 +160,7 @@ class AppLifeCycleNotifier extends StateNotifier { _resumeBackup(); }), _resumeBackup(), - backgroundManager.syncCloudIds(), + _safeRun(backgroundManager.syncCloudIds(), "syncCloudIds"), ]); } else { await _safeRun(backgroundManager.hashAssets(), "hashAssets"); @@ -218,7 +218,14 @@ class AppLifeCycleNotifier extends StateNotifier { try { if (Store.isBetaTimelineEnabled) { - unawaited(_ref.read(backgroundWorkerLockServiceProvider).unlock()); + unawaited( + Future.wait([ + _ref.read(backgroundWorkerLockServiceProvider).unlock(), + _ref.read(nativeSyncApiProvider).cancelHashing(), + _ref.read(backgroundSyncProvider).cancel(immediate: true), + _ref.read(backgroundSyncProvider).cancelLocal(immediate: true), + ]), + ); } await _performPause(); } catch (e, stackTrace) { diff --git a/mobile/lib/utils/isolate.dart b/mobile/lib/utils/isolate.dart index 491e1bf107..b5c6bc0fdb 100644 --- a/mobile/lib/utils/isolate.dart +++ b/mobile/lib/utils/isolate.dart @@ -1,95 +1,298 @@ import 'dart:async'; +import 'dart:isolate'; import 'dart:ui'; import 'package:flutter/services.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/domain/services/log.service.dart'; import 'package:immich_mobile/entities/store.entity.dart'; +import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; +import 'package:immich_mobile/infrastructure/repositories/logger_db.repository.dart'; import 'package:immich_mobile/providers/db.provider.dart'; import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart'; import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; import 'package:immich_mobile/utils/bootstrap.dart'; import 'package:immich_mobile/utils/debug_print.dart'; import 'package:immich_mobile/utils/http_ssl_options.dart'; -import 'package:immich_mobile/wm_executor.dart'; +import 'package:isar/isar.dart'; import 'package:logging/logging.dart'; -import 'package:worker_manager/worker_manager.dart'; -class InvalidIsolateUsageException implements Exception { - const InvalidIsolateUsageException(); +class CancellableTask { + final Future future; + final void Function({bool immediate}) cancel; - @override - String toString() => "IsolateHelper should only be used from the root isolate"; -} + const CancellableTask({required this.future, required this.cancel}); -// !! Should be used only from the root isolate -Cancelable runInIsolateGentle({ - required Future Function(ProviderContainer ref) computation, - String? debugLabel, -}) { - final token = RootIsolateToken.instance; - if (token == null) { - throw const InvalidIsolateUsageException(); + CancellableTask whenComplete(void Function() action) { + return CancellableTask(future: future.whenComplete(action), cancel: cancel); } - return workerManagerPatch.executeGentle((cancelledChecker) async { - T? result; + CancellableTask catchError(Function onError) { + return CancellableTask(future: future.catchError(onError), cancel: cancel); + } + + CancellableTask then(FutureOr Function(T?) onValue) { + return CancellableTask(future: future.then(onValue), cancel: cancel); + } +} + +sealed class _IsolateMessage { + const _IsolateMessage(); +} + +class _InitMessage extends _IsolateMessage { + final SendPort sendPort; + const _InitMessage(this.sendPort); +} + +class _CancelMessage extends _IsolateMessage { + const _CancelMessage(); +} + +class _ResultMessage extends _IsolateMessage { + final dynamic data; + const _ResultMessage(this.data); +} + +class _ErrorMessage extends _IsolateMessage { + final Object? error; + final StackTrace? stackTrace; + const _ErrorMessage(this.error, this.stackTrace); +} + +class _DoneMessage extends _IsolateMessage { + const _DoneMessage(); +} + +class _IsolateTaskConfig { + final Future Function(ProviderContainer ref) computation; + final SendPort mainSendPort; + final RootIsolateToken rootToken; + final String debugLabel; + + const _IsolateTaskConfig({ + required this.computation, + required this.mainSendPort, + required this.rootToken, + required this.debugLabel, + }); +} + +class _IsolateTaskRunner { + final Completer _completer = Completer(); + final ReceivePort _receivePort = ReceivePort(); + final String debugLabel; + + Isolate? _isolate; + SendPort? _isolateSendPort; + bool _isCancelled = false; + bool _isCleanedUp = false; + Timer? _cleanupTimeoutTimer; + + _IsolateTaskRunner({required this.debugLabel}); + + Future start(Future Function(ProviderContainer ref) computation) async { + final token = RootIsolateToken.instance; + if (token == null) { + _completer.completeError(Exception("RootIsolateToken is not available. Isolate cannot be started.")); + return; + } + + _receivePort.listen(_handleMessage); + + final config = _IsolateTaskConfig( + computation: computation, + mainSendPort: _receivePort.sendPort, + rootToken: token, + debugLabel: debugLabel, + ); + + try { + _isolate = await Isolate.spawn(_isolateEntryPoint, config, debugName: debugLabel); + } catch (error, stack) { + _completer.completeError(error, stack); + _cleanup(); + } + } + + void cancel({bool immediate = false}) { + if (_isCancelled || _isCleanedUp) return; + + _isCancelled = true; + dPrint(() => "[$debugLabel] Cancellation requested"); + + if (immediate) { + _isolate?.kill(priority: Isolate.immediate); + if (!_completer.isCompleted) { + _completer.completeError(Exception("Isolate task cancelled immediately")); + } + dPrint(() => "[$debugLabel] Isolate killed immediately"); + _cleanup(); + return; + } + + _isolateSendPort?.send(const _CancelMessage()); + _cleanupTimeoutTimer = Timer(const Duration(seconds: 2), () { + if (!_isCleanedUp) { + dPrint(() => "[$debugLabel] Cleanup timeout - force killing isolate"); + _isolate?.kill(priority: Isolate.immediate); + if (!_completer.isCompleted) { + _completer.completeError(Exception("Isolate cleanup timed out for task: $debugLabel")); + } + _cleanup(); + } + }); + } + + void _handleMessage(dynamic message) { + if (message is! _IsolateMessage) return; + + switch (message) { + case _InitMessage(:var sendPort): + _isolateSendPort = sendPort; + dPrint(() => "[$debugLabel] Isolate initialized"); + break; + + case _ResultMessage(:var data): + if (!_completer.isCompleted) { + _completer.complete(data as T?); + dPrint(() => "[$debugLabel] Isolate task completed with result - $data"); + } + _cleanup(); + break; + + case _ErrorMessage(:var error, :var stackTrace): + if (!_completer.isCompleted) { + dPrint(() => "[$debugLabel] Isolate task completed with error - $error"); + _completer.completeError(error ?? Exception("Unknown error in isolate"), stackTrace ?? StackTrace.current); + } + _cleanup(); + break; + + case _DoneMessage(): + dPrint(() => "[$debugLabel] Isolate cleanup completed"); + _cleanup(); + break; + + case _CancelMessage(): + // Not expected to receive cancel from isolate + break; + } + } + + void _cleanup() { + if (_isCleanedUp) return; + _isCleanedUp = true; + + _cleanupTimeoutTimer?.cancel(); + _receivePort.close(); + _isolate?.kill(priority: Isolate.beforeNextEvent); + _isolate = null; + _isolateSendPort = null; + + dPrint(() => "[$debugLabel] Isolate cleaned up"); + } + + Future get future => _completer.future; +} + +Future _isolateEntryPoint(_IsolateTaskConfig config) async { + final receivePort = ReceivePort(); + config.mainSendPort.send(_InitMessage(receivePort.sendPort)); + + bool isCancelled = false; + final subscription = receivePort.listen((message) { + if (message is _CancelMessage) { + isCancelled = true; + } + }); + + ProviderContainer? ref; + final Isar isar; + final Drift drift; + final DriftLogger logDb; + + try { + BackgroundIsolateBinaryMessenger.ensureInitialized(config.rootToken); + DartPluginRegistrant.ensureInitialized(); + final (bootIsar, bootDrift, bootLogDb) = await Bootstrap.initDB(); + await Bootstrap.initDomain(bootIsar, bootDrift, bootLogDb, shouldBufferLogs: false, listenStoreUpdates: false); + isar = bootIsar; + drift = bootDrift; + logDb = bootLogDb; + } catch (error, stack) { + dPrint(() => "[$config.debugLabel] Error during isolate bootstrap: $error"); + config.mainSendPort.send(_ErrorMessage(error, stack)); + return; + } + + final log = Logger("IsolateWorker[${config.debugLabel}]"); + try { await runZonedGuarded( () async { - BackgroundIsolateBinaryMessenger.ensureInitialized(token); - DartPluginRegistrant.ensureInitialized(); - - final (isar, drift, logDb) = await Bootstrap.initDB(); - await Bootstrap.initDomain(isar, drift, logDb, shouldBufferLogs: false, listenStoreUpdates: false); - final ref = ProviderContainer( + ref = ProviderContainer( overrides: [ - // TODO: Remove once isar is removed dbProvider.overrideWithValue(isar), isarProvider.overrideWithValue(isar), - cancellationProvider.overrideWithValue(cancelledChecker), + cancellationProvider.overrideWithValue(() => isCancelled), driftProvider.overrideWith(driftOverride(drift)), ], ); - Logger log = Logger("IsolateLogger"); + HttpSSLOptions.apply(applyNative: false); + final result = await config.computation(ref!); - try { - HttpSSLOptions.apply(applyNative: false); - result = await computation(ref); - } on CanceledError { - log.warning("Computation cancelled ${debugLabel == null ? '' : ' for $debugLabel'}"); - } catch (error, stack) { - log.severe("Error in runInIsolateGentle ${debugLabel == null ? '' : ' for $debugLabel'}", error, stack); - } finally { - try { - ref.dispose(); - - await Store.dispose(); - await LogService.I.dispose(); - await logDb.close(); - await drift.close(); - - // Close Isar safely - try { - if (isar.isOpen) { - await isar.close(); - } - } catch (e) { - dPrint(() => "Error closing Isar: $e"); - } - } catch (error, stack) { - dPrint(() => "Error closing resources in isolate: $error, $stack"); - } finally { - ref.dispose(); - // Delay to ensure all resources are released - await Future.delayed(const Duration(seconds: 2)); - } + if (!isCancelled) { + config.mainSendPort.send(_ResultMessage(result)); + } else { + log.fine("Task completed but was cancelled - not sending result"); } }, (error, stack) { - dPrint(() => "Error in isolate $debugLabel zone: $error, $stack"); + log.severe("Uncaught error in isolate zone", error, stack); + config.mainSendPort.send(_ErrorMessage(error, stack)); }, ); - return result; - }); + } catch (error, stack) { + log.severe("Error in isolate execution", error, stack); + config.mainSendPort.send(_ErrorMessage(error, stack)); + } finally { + try { + receivePort.close(); + final cleanupFutures = [ + Store.dispose(), + LogService.I.dispose(), + logDb.close(), + drift.close(), + subscription.cancel(), + if (isar.isOpen) isar.close().catchError((_) => false), + ]; + + ref?.dispose(); + + await Future.wait(cleanupFutures).timeout( + const Duration(seconds: 2), + onTimeout: () { + dPrint(() => "Cleanup timeout - some resources may not be closed"); + return []; + }, + ); + } catch (error, stack) { + dPrint(() => "Error during isolate cleanup: $error with stack: $stack"); + } finally { + unawaited(subscription.cancel()); + config.mainSendPort.send(const _DoneMessage()); + } + } +} + +CancellableTask runInIsolateGentle({ + required Future Function(ProviderContainer ref) computation, + String? debugLabel, +}) { + final runner = _IsolateTaskRunner( + debugLabel: debugLabel ?? 'isolate-task-${DateTime.now().millisecondsSinceEpoch}', + )..start(computation); + + return CancellableTask(future: runner.future, cancel: runner.cancel); } diff --git a/mobile/lib/wm_executor.dart b/mobile/lib/wm_executor.dart deleted file mode 100644 index 73e882e8e6..0000000000 --- a/mobile/lib/wm_executor.dart +++ /dev/null @@ -1,251 +0,0 @@ -// part of 'package:worker_manager/worker_manager.dart'; -// ignore_for_file: implementation_imports, avoid_print - -import 'dart:async'; -import 'dart:math'; - -import 'package:collection/collection.dart'; -import 'package:flutter/foundation.dart'; -import 'package:worker_manager/src/number_of_processors/processors_io.dart'; -import 'package:worker_manager/src/worker/worker.dart'; -import 'package:worker_manager/worker_manager.dart'; - -final workerManagerPatch = _Executor(); - -// [-2^54; 2^53] is compatible with dart2js, see core.int doc -const _minId = -9007199254740992; -const _maxId = 9007199254740992; - -class Mixinable { - late final itSelf = this as T; -} - -mixin _ExecutorLogger on Mixinable<_Executor> { - var log = false; - - @mustCallSuper - void init() { - logMessage("${itSelf._isolatesCount} workers have been spawned and initialized"); - } - - void logTaskAdded(String uid) { - logMessage("added task with number $uid"); - } - - @mustCallSuper - void dispose() { - logMessage("worker_manager have been disposed"); - } - - @mustCallSuper - void _cancel(Task task) { - logMessage("Task ${task.id} have been canceled"); - } - - void logMessage(String message) { - if (log) print(message); - } -} - -class _Executor extends Mixinable<_Executor> with _ExecutorLogger { - final _queue = PriorityQueue(); - final _pool = []; - var _nextTaskId = _minId; - var _dynamicSpawning = false; - var _isolatesCount = numberOfProcessors; - - @override - Future init({int? isolatesCount, bool? dynamicSpawning}) async { - if (_pool.isNotEmpty) { - print("worker_manager already warmed up, init is ignored. Dispose before init"); - return; - } - if (isolatesCount != null) { - if (isolatesCount < 0) { - throw Exception("isolatesCount must be greater than 0"); - } - - _isolatesCount = isolatesCount; - } - _dynamicSpawning = dynamicSpawning ?? false; - await _ensureWorkersInitialized(); - super.init(); - } - - @override - Future dispose() async { - _queue.clear(); - for (final worker in _pool) { - worker.kill(); - } - _pool.clear(); - super.dispose(); - } - - Cancelable execute(Execute execution, {WorkPriority priority = WorkPriority.immediately}) { - return _createCancelable(execution: execution, priority: priority); - } - - Cancelable executeNow(ExecuteGentle execution) { - final task = TaskGentle( - id: "", - workPriority: WorkPriority.immediately, - execution: execution, - completer: Completer(), - ); - - Future run() async { - try { - final result = await execution(() => task.canceled); - task.complete(result, null, null); - } catch (error, st) { - task.complete(null, error, st); - } - } - - run(); - return Cancelable(completer: task.completer, onCancel: () => _cancel(task)); - } - - Cancelable executeWithPort( - ExecuteWithPort execution, { - WorkPriority priority = WorkPriority.immediately, - required void Function(T value) onMessage, - }) { - return _createCancelable( - execution: execution, - priority: priority, - onMessage: (message) => onMessage(message as T), - ); - } - - Cancelable executeGentle(ExecuteGentle execution, {WorkPriority priority = WorkPriority.immediately}) { - return _createCancelable(execution: execution, priority: priority); - } - - Cancelable executeGentleWithPort( - ExecuteGentleWithPort execution, { - WorkPriority priority = WorkPriority.immediately, - required void Function(T value) onMessage, - }) { - return _createCancelable( - execution: execution, - priority: priority, - onMessage: (message) => onMessage(message as T), - ); - } - - void _createWorkers() { - for (var i = 0; i < _isolatesCount; i++) { - _pool.add(Worker()); - } - } - - Future _initializeWorkers() async { - await Future.wait(_pool.map((e) => e.initialize())); - } - - Cancelable _createCancelable({ - required Function execution, - WorkPriority priority = WorkPriority.immediately, - void Function(Object value)? onMessage, - }) { - if (_nextTaskId + 1 == _maxId) { - _nextTaskId = _minId; - } - final id = _nextTaskId.toString(); - _nextTaskId++; - late final Task task; - final completer = Completer(); - if (execution is Execute) { - task = TaskRegular(id: id, workPriority: priority, execution: execution, completer: completer); - } else if (execution is ExecuteWithPort) { - task = TaskWithPort( - id: id, - workPriority: priority, - execution: execution, - completer: completer, - onMessage: onMessage!, - ); - } else if (execution is ExecuteGentle) { - task = TaskGentle(id: id, workPriority: priority, execution: execution, completer: completer); - } else if (execution is ExecuteGentleWithPort) { - task = TaskGentleWithPort( - id: id, - workPriority: priority, - execution: execution, - completer: completer, - onMessage: onMessage!, - ); - } - _queue.add(task); - _schedule(); - logTaskAdded(task.id); - return Cancelable(completer: task.completer, onCancel: () => _cancel(task)); - } - - Future _ensureWorkersInitialized() async { - if (_pool.isEmpty) { - _createWorkers(); - if (!_dynamicSpawning) { - await _initializeWorkers(); - final poolSize = _pool.length; - final queueSize = _queue.length; - for (int i = 0; i <= min(poolSize, queueSize); i++) { - _schedule(); - } - } - } - if (_pool.every((worker) => worker.taskId != null)) { - return; - } - if (_dynamicSpawning) { - final freeWorker = _pool.firstWhereOrNull( - (worker) => worker.taskId == null && !worker.initialized && !worker.initializing, - ); - await freeWorker?.initialize(); - _schedule(); - } - } - - void _schedule() { - final availableWorker = _pool.firstWhereOrNull((worker) => worker.taskId == null && worker.initialized); - if (availableWorker == null) { - _ensureWorkersInitialized(); - return; - } - if (_queue.isEmpty) return; - final task = _queue.removeFirst(); - - availableWorker - .work(task) - .then( - (value) { - //could be completed already by cancel and it is normal. - //Assuming that worker finished with error and cleaned gracefully - task.complete(value, null, null); - }, - onError: (error, st) { - task.complete(null, error, st); - }, - ) - .whenComplete(() { - if (_dynamicSpawning && _queue.isEmpty) availableWorker.kill(); - _schedule(); - }); - } - - @override - void _cancel(Task task) { - task.cancel(); - _queue.remove(task); - final targetWorker = _pool.firstWhereOrNull((worker) => worker.taskId == task.id); - if (task is Gentle) { - targetWorker?.cancelGentle(); - } else { - targetWorker?.kill(); - if (!_dynamicSpawning) targetWorker?.initialize(); - } - super._cancel(task); - } -} diff --git a/mobile/pubspec.lock b/mobile/pubspec.lock index 3179d71bd1..1bde0c34b2 100644 --- a/mobile/pubspec.lock +++ b/mobile/pubspec.lock @@ -2154,14 +2154,6 @@ packages: url: "https://pub.dev" source: hosted version: "0.0.3" - worker_manager: - dependency: "direct main" - description: - name: worker_manager - sha256: "1bce9f894a0c187856f5fc0e150e7fe1facce326f048ca6172947754dac3d4f3" - url: "https://pub.dev" - source: hosted - version: "7.2.7" xdg_directories: dependency: transitive description: diff --git a/mobile/pubspec.yaml b/mobile/pubspec.yaml index 64d7205444..25881e303c 100644 --- a/mobile/pubspec.yaml +++ b/mobile/pubspec.yaml @@ -85,7 +85,6 @@ dependencies: url_launcher: ^6.3.2 uuid: ^4.5.1 wakelock_plus: ^1.3.0 - worker_manager: ^7.2.7 dev_dependencies: auto_route_generator: ^9.0.0