From d6377eb8966861ed7692a35a58033a0b540eec63 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Tue, 9 Jul 2024 13:53:24 +0300 Subject: [PATCH] fix(incremental): pass through errors from return functions that throw Early execution may result in non-completed streams after publishing is completed -- these streams must be closed using their return methods. When this occurs, we should pass through any error that occurs in the clean-up function instead of swallowing errors. Swallowing errors is a bad practice that could lead to memory leaks. The argument in favor of swallowing the error might be that because the stream was "executed early" and the error does not impact any of the returned data, there is no "place" to forward the error. But there is a way to forward the error, and that's on the next() call that returns { value: undefined, done: true } to end the stream. We will therefore appropriately send all the data and be able to pass through an error. Servers processing our stream should be made aware of this behavior and put in place error handling procedures that allow them to forward the data to clients when they see a payload with { hasNext: false } and then filter any further errors from clients. Servers could also postpone sending { hasNext: false } until the clean-up has been performed, and then optionally send information about the error in the extensions key, if that made sense. It would be up to the server! --- src/execution/IncrementalPublisher.ts | 53 +++++++++++++++++--------- src/execution/__tests__/stream-test.ts | 21 +++++----- 2 files changed, 44 insertions(+), 30 deletions(-) diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index dd27033ed8..03f36b80cf 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -52,6 +52,19 @@ interface SubsequentIncrementalExecutionResultContext { completed: Array; } +/** + * The IncrementalPublisherState Enum tracks the state of the IncrementalPublisher, which is initialized to + * "Started". When there are no more incremental results to publish, the state is set to "Completed". On the + * next call to next, clean-up is potentially performed and the state is set to "Finished". + * + * If the IncrementalPublisher is ended early, it may be advanced directly from "Started" to "Finished". + */ +enum IncrementalPublisherState { + Started = 1, + Completed = 2, + Finished = 3, +} + /** * This class is used to publish incremental results to the client, enabling semi-concurrent * execution while preserving result order. @@ -119,14 +132,29 @@ class IncrementalPublisher { void, void > { - let isDone = false; + let incrementalPublisherState: IncrementalPublisherState = + IncrementalPublisherState.Started; + + const _finish = async (): Promise => { + incrementalPublisherState = IncrementalPublisherState.Finished; + this._incrementalGraph.abort(); + await this._returnAsyncIterators(); + }; const _next = async (): Promise< IteratorResult > => { - if (isDone) { - await this._returnAsyncIteratorsIgnoringErrors(); - return { value: undefined, done: true }; + switch (incrementalPublisherState) { + case IncrementalPublisherState.Finished: { + return { value: undefined, done: true }; + } + case IncrementalPublisherState.Completed: { + await _finish(); + return { value: undefined, done: true }; + } + case IncrementalPublisherState.Started: { + // continue + } } const context: SubsequentIncrementalExecutionResultContext = { @@ -147,7 +175,7 @@ class IncrementalPublisher { const hasNext = this._incrementalGraph.hasNext(); if (!hasNext) { - isDone = true; + incrementalPublisherState = IncrementalPublisherState.Completed; } const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult = @@ -171,25 +199,20 @@ class IncrementalPublisher { batch = await this._incrementalGraph.nextCompletedBatch(); } while (batch !== undefined); - await this._returnAsyncIteratorsIgnoringErrors(); return { value: undefined, done: true }; }; const _return = async (): Promise< IteratorResult > => { - isDone = true; - this._incrementalGraph.abort(); - await this._returnAsyncIterators(); + await _finish(); return { value: undefined, done: true }; }; const _throw = async ( error?: unknown, ): Promise> => { - isDone = true; - this._incrementalGraph.abort(); - await this._returnAsyncIterators(); + await _finish(); return Promise.reject(error); }; @@ -372,10 +395,4 @@ class IncrementalPublisher { } await Promise.all(promises); } - - private async _returnAsyncIteratorsIgnoringErrors(): Promise { - await this._returnAsyncIterators().catch(() => { - // Ignore errors - }); - } } diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index 49c8e064fe..bd73e9bd65 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -2,6 +2,7 @@ import { assert, expect } from 'chai'; import { describe, it } from 'mocha'; import { expectJSON } from '../../__testUtils__/expectJSON.js'; +import { expectPromise } from '../../__testUtils__/expectPromise.js'; import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js'; import type { PromiseOrValue } from '../../jsutils/PromiseOrValue.js'; @@ -1791,7 +1792,7 @@ describe('Execute: stream directive', () => { ]); }); - it('Returns iterator and ignores errors when stream payloads are filtered', async () => { + it('Returns iterator and passes through errors when stream payloads are filtered', async () => { let returned = false; let requested = false; const iterable = { @@ -1814,7 +1815,7 @@ describe('Execute: stream directive', () => { }, return: () => { returned = true; - // Ignores errors from return. + // This error should be passed through. return Promise.reject(new Error('Oops')); }, }), @@ -1889,8 +1890,8 @@ describe('Execute: stream directive', () => { }, }); - const result3 = await iterator.next(); - expectJSON(result3).toDeepEqual({ done: true, value: undefined }); + const result3Promise = iterator.next(); + await expectPromise(result3Promise).toRejectWith('Oops'); assert(returned); }); @@ -2339,6 +2340,8 @@ describe('Execute: stream directive', () => { }), return: () => { returned = true; + // This error should be passed through. + return Promise.reject(new Error('Oops')); }, }), }; @@ -2378,7 +2381,7 @@ describe('Execute: stream directive', () => { done: true, value: undefined, }); - await returnPromise; + await expectPromise(returnPromise).toRejectWith('Oops'); assert(returned); }); it('Can return async iterable when underlying iterable does not have a return method', async () => { @@ -2498,13 +2501,7 @@ describe('Execute: stream directive', () => { done: true, value: undefined, }); - try { - await throwPromise; /* c8 ignore start */ - // Not reachable, always throws - /* c8 ignore stop */ - } catch (e) { - // ignore error - } + await expectPromise(throwPromise).toRejectWith('bad'); assert(returned); }); });