diff --git a/packages/sqlite_async/lib/src/sqlite_queries.dart b/packages/sqlite_async/lib/src/sqlite_queries.dart index dc91dd1..80bc568 100644 --- a/packages/sqlite_async/lib/src/sqlite_queries.dart +++ b/packages/sqlite_async/lib/src/sqlite_queries.dart @@ -44,25 +44,23 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { Stream watch(String sql, {List parameters = const [], Duration throttle = const Duration(milliseconds: 30), - Iterable? triggerOnTables}) async* { + Iterable? triggerOnTables}) { assert(updates != null, 'updates stream must be provided to allow query watching'); - final tables = - triggerOnTables ?? await getSourceTables(this, sql, parameters); - final filteredStream = - updates!.transform(UpdateNotification.filterTablesTransformer(tables)); - final throttledStream = UpdateNotification.throttleStream( - filteredStream, throttle, - addOne: UpdateNotification.empty()); - // FIXME: - // When the subscription is cancelled, this performs a final query on the next - // update. - // The loop only stops once the "yield" is reached. - // Using asyncMap instead of a generator would solve it, but then the body - // here can't be async for getSourceTables(). - await for (var _ in throttledStream) { - yield await getAll(sql, parameters); + Stream watchInner(Iterable trigger) { + return onChange( + trigger, + throttle: throttle, + triggerImmediately: true, + ).asyncMap((_) => getAll(sql, parameters)); + } + + if (triggerOnTables case final knownTrigger?) { + return watchInner(knownTrigger); + } else { + return Stream.fromFuture(getSourceTables(this, sql, parameters)) + .asyncExpand(watchInner); } } diff --git a/packages/sqlite_async/lib/src/update_notification.dart b/packages/sqlite_async/lib/src/update_notification.dart index 0c8f2c6..21f3541 100644 --- a/packages/sqlite_async/lib/src/update_notification.dart +++ b/packages/sqlite_async/lib/src/update_notification.dart @@ -52,10 +52,13 @@ class UpdateNotification { static Stream throttleStream( Stream input, Duration timeout, {UpdateNotification? addOne}) { - return _throttleStream(input, timeout, addOne: addOne, throttleFirst: true, - add: (a, b) { - return a.union(b); - }); + return _throttleStream( + input: input, + timeout: timeout, + throttleFirst: true, + add: (a, b) => a.union(b), + addOne: addOne, + ); } /// Filter an update stream by specific tables. @@ -67,62 +70,120 @@ class UpdateNotification { } } -/// Given a broadcast stream, return a singular throttled stream that is throttled. -/// This immediately starts listening. +/// Throttles an [input] stream to not emit events more often than with a +/// frequency of 1/[timeout]. /// -/// Behaviour: -/// If there was no event in "timeout", and one comes in, it is pushed immediately. -/// Otherwise, we wait until the timeout is over. -Stream _throttleStream(Stream input, Duration timeout, - {bool throttleFirst = false, T Function(T, T)? add, T? addOne}) async* { - var nextPing = Completer(); - var done = false; - T? lastData; - - var listener = input.listen((data) { - if (lastData != null && add != null) { - lastData = add(lastData!, data); - } else { - lastData = data; +/// When an event is received and no timeout window is active, it is forwarded +/// downstream and a timeout window is started. For events received within a +/// timeout window, [add] is called to fold events. Then when the window +/// expires, pending events are emitted. +/// The subscription to the [input] stream is never paused. +/// +/// When the returned stream is paused, an active timeout window is reset and +/// restarts after the stream is resumed. +/// +/// If [addOne] is not null, that event will always be added when the stream is +/// subscribed to. +/// When [throttleFirst] is true, a timeout window begins immediately after +/// listening (so that the first event, apart from [addOne], is emitted no +/// earlier than after [timeout]). +Stream _throttleStream({ + required Stream input, + required Duration timeout, + required bool throttleFirst, + required T Function(T, T) add, + required T? addOne, +}) { + return Stream.multi((listener) { + T? pendingData; + Timer? activeTimeoutWindow; + var needsTimeoutWindowAfterResume = false; + + /// Add pending data, bypassing the active timeout window. + /// + /// This is used to forward error and done events immediately. + bool addPendingEvents() { + if (pendingData case final data?) { + pendingData = null; + listener.addSync(data); + activeTimeoutWindow?.cancel(); + activeTimeoutWindow = null; + return true; + } else { + return false; + } } - if (!nextPing.isCompleted) { - nextPing.complete(); + + late void Function() setTimeout; + + /// Emits [pendingData] if no timeout window is active, and then starts a + /// timeout window if necessary. + void maybeEmit() { + if (activeTimeoutWindow == null && !listener.isPaused) { + final didAdd = addPendingEvents(); + if (didAdd) { + // Schedule a pause after resume if the subscription was paused + // directly in response to receiving the event. Otherwise, begin the + // timeout window immediately. + if (listener.isPaused) { + needsTimeoutWindowAfterResume = true; + } else { + setTimeout(); + } + } + } } - }, onDone: () { - if (!nextPing.isCompleted) { - nextPing.complete(); + + setTimeout = () { + activeTimeoutWindow = Timer(timeout, () { + activeTimeoutWindow = null; + maybeEmit(); + }); + }; + + void onData(T data) { + pendingData = switch (pendingData) { + null => data, + final pending => add(pending, data), + }; + maybeEmit(); } - done = true; - }); + void onError(Object error, StackTrace trace) { + addPendingEvents(); + listener.addErrorSync(error, trace); + } + + void onDone() { + addPendingEvents(); + listener.closeSync(); + } + + final subscription = input.listen(onData, onError: onError, onDone: onDone); + + listener.onPause = () { + needsTimeoutWindowAfterResume = activeTimeoutWindow != null; + activeTimeoutWindow?.cancel(); + activeTimeoutWindow = null; + }; + listener.onResume = () { + if (needsTimeoutWindowAfterResume) { + setTimeout(); + } else { + maybeEmit(); + } + }; + listener.onCancel = () async { + activeTimeoutWindow?.cancel(); + return subscription.cancel(); + }; - try { if (addOne != null) { - yield addOne; + // This must not be sync, we're doing this directly in onListen + listener.add(addOne); } if (throttleFirst) { - await Future.delayed(timeout); - } - while (!done) { - // If a value is available now, we'll use it immediately. - // If not, this waits for it. - await nextPing.future; - if (done) break; - - // Capture any new values coming in while we wait. - nextPing = Completer(); - T data = lastData as T; - // Clear before we yield, so that we capture new changes while yielding - lastData = null; - yield data; - // Wait a minimum of this duration between tasks - await Future.delayed(timeout); + setTimeout(); } - } finally { - if (lastData case final data?) { - yield data; - } - - await listener.cancel(); - } + }); } diff --git a/packages/sqlite_async/test/update_notification_test.dart b/packages/sqlite_async/test/update_notification_test.dart index 0a00ccb..05c94c6 100644 --- a/packages/sqlite_async/test/update_notification_test.dart +++ b/packages/sqlite_async/test/update_notification_test.dart @@ -47,6 +47,61 @@ void main() { }); }); + test('increases delay after pause', () { + fakeAsync((control) { + final source = StreamController(sync: true); + final events = []; + + final sub = UpdateNotification.throttleStream(source.stream, timeout) + .listen(null); + sub.onData((event) { + events.add(event); + sub.pause(); + }); + + source.add(UpdateNotification({'a'})); + control.elapse(timeout); + expect(events, hasLength(1)); + + // Assume the stream stays paused for the timeout window that would + // be created after emitting the notification. + control.elapse(timeout * 2); + source.add(UpdateNotification({'b'})); + control.elapse(timeout * 2); + + // A full timeout needs to pass after resuming before a new item is + // emitted. + sub.resume(); + expect(events, hasLength(1)); + + control.elapse(halfTimeout); + expect(events, hasLength(1)); + control.elapse(halfTimeout); + expect(events, hasLength(2)); + }); + }); + + test('does not introduce artificial delay in pause', () { + fakeAsync((control) { + final source = StreamController(sync: true); + final events = []; + + final sub = UpdateNotification.throttleStream(source.stream, timeout) + .listen(events.add); + + // Await the initial delay + control.elapse(timeout); + + sub.pause(); + source.add(UpdateNotification({'a'})); + // Resuming should not introduce a timeout window because no window + // was active when the stream was paused. + sub.resume(); + control.flushMicrotasks(); + expect(events, hasLength(1)); + }); + }); + test('merges events', () { fakeAsync((control) { final source = StreamController(sync: true); diff --git a/packages/sqlite_async/test/watch_test.dart b/packages/sqlite_async/test/watch_test.dart index 7e74790..dc28add 100644 --- a/packages/sqlite_async/test/watch_test.dart +++ b/packages/sqlite_async/test/watch_test.dart @@ -113,8 +113,10 @@ void main() { lastCount = count; } - // The number of read queries must not be greater than the number of writes overall. - expect(numberOfQueries, lessThanOrEqualTo(results.last.first['count'])); + // The number of read queries must not be greater than the number of + // writes overall, plus one for the initial stream emission. + expect(numberOfQueries, + lessThanOrEqualTo(results.last.first['count'] + 1)); DateTime? lastTime; for (var r in times) { @@ -283,7 +285,7 @@ void main() { }); await Future.delayed(delay); - subscription.cancel(); + await subscription.cancel(); expect( counts,