Skip to content

Commit 9a08fe4

Browse files
committed
move publishing code into separate file
1 parent bd558cb commit 9a08fe4

File tree

5 files changed

+432
-346
lines changed

5 files changed

+432
-346
lines changed

src/execution/Publisher.ts

Lines changed: 331 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,331 @@
1+
import type { ObjMap } from '../jsutils/ObjMap.js';
2+
import type { Path } from '../jsutils/Path.js';
3+
import { pathToArray } from '../jsutils/Path.js';
4+
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
5+
6+
import type {
7+
GraphQLError,
8+
GraphQLFormattedError,
9+
} from '../error/GraphQLError.js';
10+
11+
export interface SubsequentIncrementalExecutionResult<
12+
TData = ObjMap<unknown>,
13+
TExtensions = ObjMap<unknown>,
14+
> {
15+
hasNext: boolean;
16+
incremental?: ReadonlyArray<IncrementalResult<TData, TExtensions>>;
17+
extensions?: TExtensions;
18+
}
19+
20+
export interface FormattedSubsequentIncrementalExecutionResult<
21+
TData = ObjMap<unknown>,
22+
TExtensions = ObjMap<unknown>,
23+
> {
24+
hasNext: boolean;
25+
incremental?: ReadonlyArray<FormattedIncrementalResult<TData, TExtensions>>;
26+
extensions?: TExtensions;
27+
}
28+
29+
export interface IncrementalDeferResult<
30+
TData = ObjMap<unknown>,
31+
TExtensions = ObjMap<unknown>,
32+
> {
33+
errors?: ReadonlyArray<GraphQLError>;
34+
data?: TData | null;
35+
path?: ReadonlyArray<string | number>;
36+
label?: string;
37+
extensions?: TExtensions;
38+
}
39+
40+
export interface FormattedIncrementalDeferResult<
41+
TData = ObjMap<unknown>,
42+
TExtensions = ObjMap<unknown>,
43+
> {
44+
errors?: ReadonlyArray<GraphQLFormattedError>;
45+
data?: TData | null;
46+
path?: ReadonlyArray<string | number>;
47+
label?: string;
48+
extensions?: TExtensions;
49+
}
50+
51+
export interface IncrementalStreamResult<
52+
TData = Array<unknown>,
53+
TExtensions = ObjMap<unknown>,
54+
> {
55+
errors?: ReadonlyArray<GraphQLError>;
56+
items?: TData | null;
57+
path?: ReadonlyArray<string | number>;
58+
label?: string;
59+
extensions?: TExtensions;
60+
}
61+
62+
export interface FormattedIncrementalStreamResult<
63+
TData = Array<unknown>,
64+
TExtensions = ObjMap<unknown>,
65+
> {
66+
errors?: ReadonlyArray<GraphQLFormattedError>;
67+
items?: TData | null;
68+
path?: ReadonlyArray<string | number>;
69+
label?: string;
70+
extensions?: TExtensions;
71+
}
72+
73+
export type IncrementalResult<
74+
TData = ObjMap<unknown>,
75+
TExtensions = ObjMap<unknown>,
76+
> =
77+
| IncrementalDeferResult<TData, TExtensions>
78+
| IncrementalStreamResult<TData, TExtensions>;
79+
80+
export type FormattedIncrementalResult<
81+
TData = ObjMap<unknown>,
82+
TExtensions = ObjMap<unknown>,
83+
> =
84+
| FormattedIncrementalDeferResult<TData, TExtensions>
85+
| FormattedIncrementalStreamResult<TData, TExtensions>;
86+
87+
export function filterSubsequentPayloads(
88+
subsequentPayloads: Set<IncrementalDataRecord>,
89+
nullPath: Path,
90+
currentIncrementalDataRecord: IncrementalDataRecord | undefined,
91+
): void {
92+
const nullPathArray = pathToArray(nullPath);
93+
subsequentPayloads.forEach((incrementalDataRecord) => {
94+
if (incrementalDataRecord === currentIncrementalDataRecord) {
95+
// don't remove payload from where error originates
96+
return;
97+
}
98+
for (let i = 0; i < nullPathArray.length; i++) {
99+
if (incrementalDataRecord.path[i] !== nullPathArray[i]) {
100+
// incrementalDataRecord points to a path unaffected by this payload
101+
return;
102+
}
103+
}
104+
// incrementalDataRecord path points to nulled error field
105+
if (
106+
isStreamItemsRecord(incrementalDataRecord) &&
107+
incrementalDataRecord.asyncIterator?.return
108+
) {
109+
incrementalDataRecord.asyncIterator.return().catch(() => {
110+
// ignore error
111+
});
112+
}
113+
subsequentPayloads.delete(incrementalDataRecord);
114+
});
115+
}
116+
117+
function getCompletedIncrementalResults(
118+
subsequentPayloads: Set<IncrementalDataRecord>,
119+
): Array<IncrementalResult> {
120+
const incrementalResults: Array<IncrementalResult> = [];
121+
for (const incrementalDataRecord of subsequentPayloads) {
122+
const incrementalResult: IncrementalResult = {};
123+
if (!incrementalDataRecord.isCompleted) {
124+
continue;
125+
}
126+
subsequentPayloads.delete(incrementalDataRecord);
127+
if (isStreamItemsRecord(incrementalDataRecord)) {
128+
const items = incrementalDataRecord.items;
129+
if (incrementalDataRecord.isCompletedAsyncIterator) {
130+
// async iterable resolver just finished but there may be pending payloads
131+
continue;
132+
}
133+
(incrementalResult as IncrementalStreamResult).items = items;
134+
} else {
135+
const data = incrementalDataRecord.data;
136+
(incrementalResult as IncrementalDeferResult).data = data ?? null;
137+
}
138+
139+
incrementalResult.path = incrementalDataRecord.path;
140+
if (incrementalDataRecord.label != null) {
141+
incrementalResult.label = incrementalDataRecord.label;
142+
}
143+
if (incrementalDataRecord.errors.length > 0) {
144+
incrementalResult.errors = incrementalDataRecord.errors;
145+
}
146+
incrementalResults.push(incrementalResult);
147+
}
148+
return incrementalResults;
149+
}
150+
151+
export function yieldSubsequentPayloads(
152+
subsequentPayloads: Set<IncrementalDataRecord>,
153+
): AsyncGenerator<SubsequentIncrementalExecutionResult, void, void> {
154+
let isDone = false;
155+
156+
async function next(): Promise<
157+
IteratorResult<SubsequentIncrementalExecutionResult, void>
158+
> {
159+
if (isDone) {
160+
return { value: undefined, done: true };
161+
}
162+
163+
await Promise.race(Array.from(subsequentPayloads).map((p) => p.promise));
164+
165+
if (isDone) {
166+
// a different call to next has exhausted all payloads
167+
return { value: undefined, done: true };
168+
}
169+
170+
const incremental = getCompletedIncrementalResults(subsequentPayloads);
171+
const hasNext = subsequentPayloads.size > 0;
172+
173+
if (!incremental.length && hasNext) {
174+
return next();
175+
}
176+
177+
if (!hasNext) {
178+
isDone = true;
179+
}
180+
181+
return {
182+
value: incremental.length ? { incremental, hasNext } : { hasNext },
183+
done: false,
184+
};
185+
}
186+
187+
function returnStreamIterators() {
188+
const promises: Array<Promise<IteratorResult<unknown>>> = [];
189+
subsequentPayloads.forEach((incrementalDataRecord) => {
190+
if (
191+
isStreamItemsRecord(incrementalDataRecord) &&
192+
incrementalDataRecord.asyncIterator?.return
193+
) {
194+
promises.push(incrementalDataRecord.asyncIterator.return());
195+
}
196+
});
197+
return Promise.all(promises);
198+
}
199+
200+
return {
201+
[Symbol.asyncIterator]() {
202+
return this;
203+
},
204+
next,
205+
async return(): Promise<
206+
IteratorResult<SubsequentIncrementalExecutionResult, void>
207+
> {
208+
await returnStreamIterators();
209+
isDone = true;
210+
return { value: undefined, done: true };
211+
},
212+
async throw(
213+
error?: unknown,
214+
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
215+
await returnStreamIterators();
216+
isDone = true;
217+
return Promise.reject(error);
218+
},
219+
};
220+
}
221+
222+
/** @internal */
223+
export class DeferredFragmentRecord {
224+
type: 'defer';
225+
errors: Array<GraphQLError>;
226+
label: string | undefined;
227+
path: Array<string | number>;
228+
promise: Promise<void>;
229+
data: ObjMap<unknown> | null;
230+
parentContext: IncrementalDataRecord | undefined;
231+
isCompleted: boolean;
232+
_subsequentPayloads: Set<IncrementalDataRecord>;
233+
_resolve?: (arg: PromiseOrValue<ObjMap<unknown> | null>) => void;
234+
constructor(opts: {
235+
label: string | undefined;
236+
path: Path | undefined;
237+
parentContext: IncrementalDataRecord | undefined;
238+
subsequentPayloads: Set<IncrementalDataRecord>;
239+
}) {
240+
this.type = 'defer';
241+
this.label = opts.label;
242+
this.path = pathToArray(opts.path);
243+
this.parentContext = opts.parentContext;
244+
this.errors = [];
245+
this._subsequentPayloads = opts.subsequentPayloads;
246+
this._subsequentPayloads.add(this);
247+
this.isCompleted = false;
248+
this.data = null;
249+
this.promise = new Promise<ObjMap<unknown> | null>((resolve) => {
250+
this._resolve = (promiseOrValue) => {
251+
resolve(promiseOrValue);
252+
};
253+
}).then((data) => {
254+
this.data = data;
255+
this.isCompleted = true;
256+
});
257+
}
258+
259+
addData(data: PromiseOrValue<ObjMap<unknown> | null>) {
260+
const parentData = this.parentContext?.promise;
261+
if (parentData) {
262+
this._resolve?.(parentData.then(() => data));
263+
return;
264+
}
265+
this._resolve?.(data);
266+
}
267+
}
268+
269+
/** @internal */
270+
export class StreamItemsRecord {
271+
type: 'stream';
272+
errors: Array<GraphQLError>;
273+
label: string | undefined;
274+
path: Array<string | number>;
275+
items: Array<unknown> | null;
276+
promise: Promise<void>;
277+
parentContext: IncrementalDataRecord | undefined;
278+
asyncIterator: AsyncIterator<unknown> | undefined;
279+
isCompletedAsyncIterator?: boolean;
280+
isCompleted: boolean;
281+
_subsequentPayloads: Set<IncrementalDataRecord>;
282+
_resolve?: (arg: PromiseOrValue<Array<unknown> | null>) => void;
283+
constructor(opts: {
284+
label: string | undefined;
285+
path: Path | undefined;
286+
asyncIterator?: AsyncIterator<unknown>;
287+
parentContext: IncrementalDataRecord | undefined;
288+
subsequentPayloads: Set<IncrementalDataRecord>;
289+
}) {
290+
this.type = 'stream';
291+
this.items = null;
292+
this.label = opts.label;
293+
this.path = pathToArray(opts.path);
294+
this.parentContext = opts.parentContext;
295+
this.asyncIterator = opts.asyncIterator;
296+
this.errors = [];
297+
this._subsequentPayloads = opts.subsequentPayloads;
298+
this._subsequentPayloads.add(this);
299+
this.isCompleted = false;
300+
this.items = null;
301+
this.promise = new Promise<Array<unknown> | null>((resolve) => {
302+
this._resolve = (promiseOrValue) => {
303+
resolve(promiseOrValue);
304+
};
305+
}).then((items) => {
306+
this.items = items;
307+
this.isCompleted = true;
308+
});
309+
}
310+
311+
addItems(items: PromiseOrValue<Array<unknown> | null>) {
312+
const parentData = this.parentContext?.promise;
313+
if (parentData) {
314+
this._resolve?.(parentData.then(() => items));
315+
return;
316+
}
317+
this._resolve?.(items);
318+
}
319+
320+
setIsCompletedAsyncIterator() {
321+
this.isCompletedAsyncIterator = true;
322+
}
323+
}
324+
325+
export type IncrementalDataRecord = DeferredFragmentRecord | StreamItemsRecord;
326+
327+
function isStreamItemsRecord(
328+
incrementalDataRecord: IncrementalDataRecord,
329+
): incrementalDataRecord is StreamItemsRecord {
330+
return incrementalDataRecord.type === 'stream';
331+
}

src/execution/__tests__/defer-test.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,9 @@ import {
1616
import { GraphQLID, GraphQLString } from '../../type/scalars.js';
1717
import { GraphQLSchema } from '../../type/schema.js';
1818

19-
import type {
20-
InitialIncrementalExecutionResult,
21-
SubsequentIncrementalExecutionResult,
22-
} from '../execute.js';
19+
import type { InitialIncrementalExecutionResult } from '../execute.js';
2320
import { execute, experimentalExecuteIncrementally } from '../execute.js';
21+
import type { SubsequentIncrementalExecutionResult } from '../Publisher.js';
2422

2523
const friendType = new GraphQLObjectType({
2624
fields: {

src/execution/__tests__/stream-test.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,9 @@ import {
1616
import { GraphQLID, GraphQLString } from '../../type/scalars.js';
1717
import { GraphQLSchema } from '../../type/schema.js';
1818

19-
import type {
20-
InitialIncrementalExecutionResult,
21-
SubsequentIncrementalExecutionResult,
22-
} from '../execute.js';
19+
import type { InitialIncrementalExecutionResult } from '../execute.js';
2320
import { experimentalExecuteIncrementally } from '../execute.js';
21+
import type { SubsequentIncrementalExecutionResult } from '../Publisher.js';
2422

2523
const friendType = new GraphQLObjectType({
2624
fields: {

0 commit comments

Comments
 (0)