Skip to content

Make watch queries easier to cancel #92

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 22, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions packages/sqlite_async/lib/src/sqlite_queries.dart
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,23 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection {
Stream<sqlite.ResultSet> watch(String sql,
{List<Object?> parameters = const [],
Duration throttle = const Duration(milliseconds: 30),
Iterable<String>? triggerOnTables}) async* {
Iterable<String>? triggerOnTables}) {
assert(updates != null,
'updates stream must be provided to allow query watching');
final tables =
triggerOnTables ?? await getSourceTables(this, sql, parameters);
final filteredStream =
updates!.transform(UpdateNotification.filterTablesTransformer(tables));
final throttledStream = UpdateNotification.throttleStream(
filteredStream, throttle,
addOne: UpdateNotification.empty());

// FIXME:
// When the subscription is cancelled, this performs a final query on the next
// update.
// The loop only stops once the "yield" is reached.
// Using asyncMap instead of a generator would solve it, but then the body
// here can't be async for getSourceTables().
await for (var _ in throttledStream) {
yield await getAll(sql, parameters);
Stream<sqlite.ResultSet> watchInner(Iterable<String> trigger) {
return onChange(
trigger,
throttle: throttle,
triggerImmediately: true,
).asyncMap((_) => getAll(sql, parameters));
}

if (triggerOnTables case final knownTrigger?) {
return watchInner(knownTrigger);
} else {
return Stream.fromFuture(getSourceTables(this, sql, parameters))
.asyncExpand(watchInner);
}
}

Expand Down
163 changes: 111 additions & 52 deletions packages/sqlite_async/lib/src/update_notification.dart
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,13 @@ class UpdateNotification {
static Stream<UpdateNotification> throttleStream(
Stream<UpdateNotification> input, Duration timeout,
{UpdateNotification? addOne}) {
return _throttleStream(input, timeout, addOne: addOne, throttleFirst: true,
add: (a, b) {
return a.union(b);
});
return _throttleStream(
input: input,
timeout: timeout,
throttleFirst: true,
add: (a, b) => a.union(b),
addOne: addOne,
);
}

/// Filter an update stream by specific tables.
Expand All @@ -67,62 +70,118 @@ class UpdateNotification {
}
}

/// Given a broadcast stream, return a singular throttled stream that is throttled.
/// This immediately starts listening.
/// Throttles an [input] stream to not emit events more often than with a
/// frequency of 1/[timeout].
///
/// Behaviour:
/// If there was no event in "timeout", and one comes in, it is pushed immediately.
/// Otherwise, we wait until the timeout is over.
Stream<T> _throttleStream<T extends Object>(Stream<T> input, Duration timeout,
{bool throttleFirst = false, T Function(T, T)? add, T? addOne}) async* {
var nextPing = Completer<void>();
var done = false;
T? lastData;

var listener = input.listen((data) {
if (lastData != null && add != null) {
lastData = add(lastData!, data);
} else {
lastData = data;
/// When an event is received and no timeout window is active, it is forwarded
/// downstream and a timeout window is started. For events received within a
/// timeout window, [add] is called to fold events. Then when the window
/// expires, pending events are emitted.
/// The subscription to the [input] stream is never paused.
///
/// When the returned stream is paused, an active timeout window is reset and
/// restarts after the stream is resumed.
///
/// If [addOne] is not null, that event will always be added when the stream is
/// subscribed to.
/// When [throttleFirst] is true, a timeout window begins immediately after
/// listening (so that the first event, apart from [addOne], is emitted no
/// earlier than after [timeout]).
Stream<T> _throttleStream<T extends Object>({
required Stream<T> input,
required Duration timeout,
required bool throttleFirst,
required T Function(T, T) add,
required T? addOne,
}) {
return Stream.multi((listener) {
T? pendingData;
Timer? activeTimeoutWindow;
var needsTimeoutWindowAfterResume = false;

/// Add pending data, bypassing the active timeout window.
///
/// This is used to forward error and done events immediately.
bool addPendingEvents() {
if (pendingData case final data?) {
pendingData = null;
listener.addSync(data);
activeTimeoutWindow?.cancel();
activeTimeoutWindow = null;
return true;
} else {
return false;
}
}
if (!nextPing.isCompleted) {
nextPing.complete();

/// Emits [pendingData] if no timeout window is active, and then starts a
/// timeout window if necessary.
void maybeEmit() {
if (activeTimeoutWindow == null && !listener.isPaused) {
final didAdd = addPendingEvents();
if (didAdd) {
if (listener.isPaused) {
needsTimeoutWindowAfterResume = true;
} else {
activeTimeoutWindow = Timer(timeout, () {
activeTimeoutWindow = null;
maybeEmit();
});
}
}
}
}
}, onDone: () {
if (!nextPing.isCompleted) {
nextPing.complete();

void setTimeout() {
activeTimeoutWindow = Timer(timeout, () {
activeTimeoutWindow = null;
maybeEmit();
});
}

done = true;
});
void onData(T data) {
pendingData = switch (pendingData) {
null => data,
final pending => add(pending, data),
};
maybeEmit();
}

try {
if (addOne != null) {
yield addOne;
void onError(Object error, StackTrace trace) {
addPendingEvents();
listener.addErrorSync(error, trace);
}
if (throttleFirst) {
await Future.delayed(timeout);

void onDone() {
addPendingEvents();
listener.closeSync();
}
while (!done) {
// If a value is available now, we'll use it immediately.
// If not, this waits for it.
await nextPing.future;
if (done) break;

// Capture any new values coming in while we wait.
nextPing = Completer<void>();
T data = lastData as T;
// Clear before we yield, so that we capture new changes while yielding
lastData = null;
yield data;
// Wait a minimum of this duration between tasks
await Future.delayed(timeout);

final subscription = input.listen(onData, onError: onError, onDone: onDone);

listener.onPause = () {
needsTimeoutWindowAfterResume = activeTimeoutWindow != null;
activeTimeoutWindow?.cancel();
activeTimeoutWindow = null;
};
listener.onResume = () {
if (needsTimeoutWindowAfterResume) {
setTimeout();
} else {
maybeEmit();
}
};
listener.onCancel = () async {
activeTimeoutWindow?.cancel();
return subscription.cancel();
};

if (addOne != null) {
// This must not be sync, we're doing this directly in onListen
listener.add(addOne);
}
} finally {
if (lastData case final data?) {
yield data;
if (throttleFirst) {
setTimeout();
}

await listener.cancel();
}
});
}
55 changes: 55 additions & 0 deletions packages/sqlite_async/test/update_notification_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,61 @@ void main() {
});
});

test('increases delay after pause', () {
fakeAsync((control) {
final source = StreamController<UpdateNotification>(sync: true);
final events = <UpdateNotification>[];

final sub = UpdateNotification.throttleStream(source.stream, timeout)
.listen(null);
sub.onData((event) {
events.add(event);
sub.pause();
});

source.add(UpdateNotification({'a'}));
control.elapse(timeout);
expect(events, hasLength(1));

// Assume the stream stays paused for the timeout window that would
// be created after emitting the notification.
control.elapse(timeout * 2);
source.add(UpdateNotification({'b'}));
control.elapse(timeout * 2);

// A full timeout needs to pass after resuming before a new item is
// emitted.
sub.resume();
expect(events, hasLength(1));

control.elapse(halfTimeout);
expect(events, hasLength(1));
control.elapse(halfTimeout);
expect(events, hasLength(2));
});
});

test('does not introduce artificial delay in pause', () {
fakeAsync((control) {
final source = StreamController<UpdateNotification>(sync: true);
final events = <UpdateNotification>[];

final sub = UpdateNotification.throttleStream(source.stream, timeout)
.listen(events.add);

// Await the initial delay
control.elapse(timeout);

sub.pause();
source.add(UpdateNotification({'a'}));
// Resuming should not introduce a timeout window because no window
// was active when the stream was paused.
sub.resume();
control.flushMicrotasks();
expect(events, hasLength(1));
});
});

test('merges events', () {
fakeAsync((control) {
final source = StreamController<UpdateNotification>(sync: true);
Expand Down
8 changes: 5 additions & 3 deletions packages/sqlite_async/test/watch_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,10 @@ void main() {
lastCount = count;
}

// The number of read queries must not be greater than the number of writes overall.
expect(numberOfQueries, lessThanOrEqualTo(results.last.first['count']));
// The number of read queries must not be greater than the number of
// writes overall, plus one for the initial stream emission.
expect(numberOfQueries,
lessThanOrEqualTo(results.last.first['count'] + 1));

DateTime? lastTime;
for (var r in times) {
Expand Down Expand Up @@ -283,7 +285,7 @@ void main() {
});
await Future.delayed(delay);

subscription.cancel();
await subscription.cancel();

expect(
counts,
Expand Down
Loading