Skip to content

Commit e8f1f9e

Browse files
committed
demonstrate new publisher
depends on #3784 The proposed new publisher does not use the event loop to manage AsyncRecord dependencies => and so if multiple items within a stream are released from publishing because their parent has just been published, they are all released at once. Another difference is that different sets are used to store the AsyncRecords that are pending vs ready for publishing, etc. This provides a performance benefit in that on a call to next, the set of all AsyncRecords is not inspected. As a side-effect of this change, the incremental array is ordered by which items are ready for delivery first, and not by the initial document. The subscribe algorithm used does not use Promise.race -- this may also be beneficial as the implementation of Promise.race within V8 has a known memory leak for long-running promises. (see https://bugs.chromium.org/p/v8/issues/detail?id=9858)
1 parent 26002c8 commit e8f1f9e

File tree

4 files changed

+166
-159
lines changed

4 files changed

+166
-159
lines changed

src/execution/__tests__/defer-test.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -605,11 +605,6 @@ describe('Execute: defer directive', () => {
605605
data: { slowField: 'slow', friends: [{}, {}, {}] },
606606
path: ['hero'],
607607
},
608-
],
609-
hasNext: true,
610-
},
611-
{
612-
incremental: [
613608
{ data: { name: 'Han' }, path: ['hero', 'friends', 0] },
614609
{ data: { name: 'Leia' }, path: ['hero', 'friends', 1] },
615610
{ data: { name: 'C-3PO' }, path: ['hero', 'friends', 2] },
@@ -653,11 +648,6 @@ describe('Execute: defer directive', () => {
653648
},
654649
path: ['hero'],
655650
},
656-
],
657-
hasNext: true,
658-
},
659-
{
660-
incremental: [
661651
{ data: { name: 'Han' }, path: ['hero', 'friends', 0] },
662652
{ data: { name: 'Leia' }, path: ['hero', 'friends', 1] },
663653
{ data: { name: 'C-3PO' }, path: ['hero', 'friends', 2] },

src/execution/__tests__/stream-test.ts

Lines changed: 15 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,10 @@ describe('Execute: stream directive', () => {
151151
hasNext: true,
152152
},
153153
{
154-
incremental: [{ items: ['banana'], path: ['scalarList', 1] }],
155-
hasNext: true,
156-
},
157-
{
158-
incremental: [{ items: ['coconut'], path: ['scalarList', 2] }],
154+
incremental: [
155+
{ items: ['banana'], path: ['scalarList', 1] },
156+
{ items: ['coconut'], path: ['scalarList', 2] },
157+
],
159158
hasNext: false,
160159
},
161160
]);
@@ -173,15 +172,11 @@ describe('Execute: stream directive', () => {
173172
hasNext: true,
174173
},
175174
{
176-
incremental: [{ items: ['apple'], path: ['scalarList', 0] }],
177-
hasNext: true,
178-
},
179-
{
180-
incremental: [{ items: ['banana'], path: ['scalarList', 1] }],
181-
hasNext: true,
182-
},
183-
{
184-
incremental: [{ items: ['coconut'], path: ['scalarList', 2] }],
175+
incremental: [
176+
{ items: ['apple'], path: ['scalarList', 0] },
177+
{ items: ['banana'], path: ['scalarList', 1] },
178+
{ items: ['coconut'], path: ['scalarList', 2] },
179+
],
185180
hasNext: false,
186181
},
187182
]);
@@ -230,11 +225,6 @@ describe('Execute: stream directive', () => {
230225
path: ['scalarList', 1],
231226
label: 'scalar-stream',
232227
},
233-
],
234-
hasNext: true,
235-
},
236-
{
237-
incremental: [
238228
{
239229
items: ['coconut'],
240230
path: ['scalarList', 2],
@@ -296,11 +286,6 @@ describe('Execute: stream directive', () => {
296286
items: [['banana', 'banana', 'banana']],
297287
path: ['scalarListList', 1],
298288
},
299-
],
300-
hasNext: true,
301-
},
302-
{
303-
incremental: [
304289
{
305290
items: [['coconut', 'coconut', 'coconut']],
306291
path: ['scalarListList', 2],
@@ -379,20 +364,10 @@ describe('Execute: stream directive', () => {
379364
items: [{ name: 'Luke', id: '1' }],
380365
path: ['friendList', 0],
381366
},
382-
],
383-
hasNext: true,
384-
},
385-
{
386-
incremental: [
387367
{
388368
items: [{ name: 'Han', id: '2' }],
389369
path: ['friendList', 1],
390370
},
391-
],
392-
hasNext: true,
393-
},
394-
{
395-
incremental: [
396371
{
397372
items: [{ name: 'Leia', id: '3' }],
398373
path: ['friendList', 2],
@@ -483,11 +458,6 @@ describe('Execute: stream directive', () => {
483458
},
484459
],
485460
},
486-
],
487-
hasNext: true,
488-
},
489-
{
490-
incremental: [
491461
{
492462
items: [{ name: 'Leia', id: '3' }],
493463
path: ['friendList', 2],
@@ -585,9 +555,6 @@ describe('Execute: stream directive', () => {
585555
path: ['friendList', 2],
586556
},
587557
],
588-
hasNext: true,
589-
},
590-
{
591558
hasNext: false,
592559
},
593560
]);
@@ -627,7 +594,7 @@ describe('Execute: stream directive', () => {
627594
}
628595
}
629596
`);
630-
const result = await completeAsync(document, 3, {
597+
const result = await completeAsync(document, 2, {
631598
async *friendList() {
632599
yield await Promise.resolve(friends[0]);
633600
yield await Promise.resolve(friends[1]);
@@ -656,10 +623,9 @@ describe('Execute: stream directive', () => {
656623
path: ['friendList', 2],
657624
},
658625
],
659-
hasNext: true,
626+
hasNext: false,
660627
},
661628
},
662-
{ done: false, value: { hasNext: false } },
663629
{ done: true, value: undefined },
664630
]);
665631
});
@@ -887,11 +853,6 @@ describe('Execute: stream directive', () => {
887853
},
888854
],
889855
},
890-
],
891-
hasNext: true,
892-
},
893-
{
894-
incremental: [
895856
{
896857
items: [{ nonNullName: 'Han' }],
897858
path: ['friendList', 2],
@@ -980,19 +941,11 @@ describe('Execute: stream directive', () => {
980941
},
981942
],
982943
},
983-
],
984-
hasNext: true,
985-
},
986-
{
987-
incremental: [
988944
{
989945
items: [{ nonNullName: 'Han' }],
990946
path: ['friendList', 2],
991947
},
992948
],
993-
hasNext: true,
994-
},
995-
{
996949
hasNext: false,
997950
},
998951
]);
@@ -1140,6 +1093,10 @@ describe('Execute: stream directive', () => {
11401093
},
11411094
{
11421095
incremental: [
1096+
{
1097+
items: [{ name: 'Luke' }],
1098+
path: ['nestedObject', 'nestedFriendList', 0],
1099+
},
11431100
{
11441101
data: { scalarField: null },
11451102
path: ['otherNestedObject'],
@@ -1151,10 +1108,6 @@ describe('Execute: stream directive', () => {
11511108
},
11521109
],
11531110
},
1154-
{
1155-
items: [{ name: 'Luke' }],
1156-
path: ['nestedObject', 'nestedFriendList', 0],
1157-
},
11581111
],
11591112
hasNext: false,
11601113
},
@@ -1258,9 +1211,6 @@ describe('Execute: stream directive', () => {
12581211
],
12591212
},
12601213
],
1261-
hasNext: true,
1262-
},
1263-
{
12641214
hasNext: false,
12651215
},
12661216
]);
@@ -1407,9 +1357,6 @@ describe('Execute: stream directive', () => {
14071357
path: ['friendList', 2],
14081358
},
14091359
],
1410-
hasNext: true,
1411-
},
1412-
{
14131360
hasNext: false,
14141361
},
14151362
]);
@@ -1463,28 +1410,10 @@ describe('Execute: stream directive', () => {
14631410
data: { scalarField: 'slow', nestedFriendList: [] },
14641411
path: ['nestedObject'],
14651412
},
1466-
],
1467-
hasNext: true,
1468-
},
1469-
done: false,
1470-
});
1471-
const result3 = await iterator.next();
1472-
expectJSON(result3).toDeepEqual({
1473-
value: {
1474-
incremental: [
14751413
{
14761414
items: [{ name: 'Luke' }],
14771415
path: ['nestedObject', 'nestedFriendList', 0],
14781416
},
1479-
],
1480-
hasNext: true,
1481-
},
1482-
done: false,
1483-
});
1484-
const result4 = await iterator.next();
1485-
expectJSON(result4).toDeepEqual({
1486-
value: {
1487-
incremental: [
14881417
{
14891418
items: [{ name: 'Han' }],
14901419
path: ['nestedObject', 'nestedFriendList', 1],

src/execution/execute.ts

Lines changed: 28 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2150,12 +2150,13 @@ class DeferredFragmentRecord {
21502150
errors: Array<GraphQLError>;
21512151
label: string | undefined;
21522152
path: Array<string | number>;
2153-
promise: Promise<void>;
21542153
data: ObjMap<unknown> | null;
21552154
parentContext: AsyncPayloadRecord | undefined;
2156-
isCompleted: boolean;
2157-
_exeContext: ExecutionContext;
2158-
_resolve?: (arg: PromiseOrValue<ObjMap<unknown> | null>) => void;
2155+
_publisher: Publisher<
2156+
AsyncPayloadRecord,
2157+
IncrementalResult,
2158+
SubsequentIncrementalExecutionResult
2159+
>;
21592160
constructor(opts: {
21602161
label: string | undefined;
21612162
path: Path | undefined;
@@ -2167,27 +2168,21 @@ class DeferredFragmentRecord {
21672168
this.path = pathToArray(opts.path);
21682169
this.parentContext = opts.parentContext;
21692170
this.errors = [];
2170-
this._exeContext = opts.exeContext;
2171-
this._exeContext.publisher.add(this);
2172-
this.isCompleted = false;
2171+
this._publisher = opts.exeContext.publisher;
2172+
this._publisher.add(this);
21732173
this.data = null;
2174-
this.promise = new Promise<ObjMap<unknown> | null>((resolve) => {
2175-
this._resolve = (promiseOrValue) => {
2176-
resolve(promiseOrValue);
2177-
};
2178-
}).then((data) => {
2179-
this.data = data;
2180-
this.isCompleted = true;
2181-
});
21822174
}
21832175

21842176
addData(data: PromiseOrValue<ObjMap<unknown> | null>) {
2185-
const parentData = this.parentContext?.promise;
2186-
if (parentData) {
2187-
this._resolve?.(parentData.then(() => data));
2177+
if (isPromise(data)) {
2178+
data.then((resolved) => {
2179+
this.data = resolved;
2180+
this._publisher.complete(this);
2181+
});
21882182
return;
21892183
}
2190-
this._resolve?.(data);
2184+
this.data = data;
2185+
this._publisher.complete(this);
21912186
}
21922187
}
21932188

@@ -2197,13 +2192,14 @@ class StreamRecord {
21972192
label: string | undefined;
21982193
path: Array<string | number>;
21992194
items: Array<unknown> | null;
2200-
promise: Promise<void>;
22012195
parentContext: AsyncPayloadRecord | undefined;
22022196
iterator: AsyncIterator<unknown> | undefined;
22032197
isCompletedIterator?: boolean;
2204-
isCompleted: boolean;
2205-
_exeContext: ExecutionContext;
2206-
_resolve?: (arg: PromiseOrValue<Array<unknown> | null>) => void;
2198+
_publisher: Publisher<
2199+
AsyncPayloadRecord,
2200+
IncrementalResult,
2201+
SubsequentIncrementalExecutionResult
2202+
>;
22072203
constructor(opts: {
22082204
label: string | undefined;
22092205
path: Path | undefined;
@@ -2218,27 +2214,21 @@ class StreamRecord {
22182214
this.parentContext = opts.parentContext;
22192215
this.iterator = opts.iterator;
22202216
this.errors = [];
2221-
this._exeContext = opts.exeContext;
2222-
this._exeContext.publisher.add(this);
2223-
this.isCompleted = false;
2217+
this._publisher = opts.exeContext.publisher;
2218+
this._publisher.add(this);
22242219
this.items = null;
2225-
this.promise = new Promise<Array<unknown> | null>((resolve) => {
2226-
this._resolve = (promiseOrValue) => {
2227-
resolve(promiseOrValue);
2228-
};
2229-
}).then((items) => {
2230-
this.items = items;
2231-
this.isCompleted = true;
2232-
});
22332220
}
22342221

22352222
addItems(items: PromiseOrValue<Array<unknown> | null>) {
2236-
const parentData = this.parentContext?.promise;
2237-
if (parentData) {
2238-
this._resolve?.(parentData.then(() => items));
2223+
if (isPromise(items)) {
2224+
items.then((resolved) => {
2225+
this.items = resolved;
2226+
this._publisher.complete(this);
2227+
});
22392228
return;
22402229
}
2241-
this._resolve?.(items);
2230+
this.items = items;
2231+
this._publisher.complete(this);
22422232
}
22432233

22442234
setIsCompletedIterator() {

0 commit comments

Comments
 (0)