Skip to content

Commit a3e9983

Browse files
noahsilasacinader
andauthored
Add query.eachBatch() to Parse.Query API (#1114)
When a query has a large enough result set that we don't want to materialize it at once, the SDK provides `query.each()`, which yields each matching object to a processor, one at a time. This is a handy tool, but if the number of records to process is really so large, we also can take advantage of batching the processing. Compare the following operations: ``` // Processing N items involves N calls to Parse Server new Parse.Query('Item').each((item) => { item.set('foo', 'bar'); return item.save(); }) // Processing N items involves ceil(N / batchSize) calls to Parse Server const batchSize = 200; new Parse.Query('Item').eachBatch((items) => { items.forEach(item => item.set('foo', 'bar')); return Parse.Object.saveAll(items, { batchSize }); }, { batchSize }); ``` The `.each()` method is already written to do fetch the objects in batches; we effectively are splitting it out into two: - `.eachBatch()` does the work to fetch objects in batches and yield each batch - `.each()` calls `.eachBatch()` and handles invoking the callback for every item in the batch Aside: I considered adding the undocumented `batchSize` attribute already accepted by `.each()`, `.filter()`, `.map()` and `.reduce()` to the public API, but I suspect that at the time that you are performance sensitive enough to tune that parameter you are better served by switching to `eachBatch()`; the current implementation of `.each()` is to construct a promise chain with a node for every value in the batch, and my experience with very long promise chains has been a bit frustrating. Co-authored-by: Arthur Cinader <700572+acinader@users.noreply.github.com>
1 parent bc9d6af commit a3e9983

File tree

2 files changed

+166
-16
lines changed

2 files changed

+166
-16
lines changed

src/ParseQuery.js

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -856,15 +856,16 @@ class ParseQuery {
856856
}
857857

858858
/**
859-
* Iterates over each result of a query, calling a callback for each one. If
860-
* the callback returns a promise, the iteration will not continue until
859+
* Iterates over objects matching a query, calling a callback for each batch.
860+
* If the callback returns a promise, the iteration will not continue until
861861
* that promise has been fulfilled. If the callback returns a rejected
862-
* promise, then iteration will stop with that error. The items are
863-
* processed in an unspecified order. The query may not have any sort order,
864-
* and may not use limit or skip.
862+
* promise, then iteration will stop with that error. The items are processed
863+
* in an unspecified order. The query may not have any sort order, and may
864+
* not use limit or skip.
865865
* @param {Function} callback Callback that will be called with each result
866866
* of the query.
867867
* @param {Object} options Valid options are:<ul>
868+
* <li>batchSize: How many objects to yield in each batch (default: 100)
868869
* <li>useMasterKey: In Cloud Code and Node only, causes the Master Key to
869870
* be used for this request.
870871
* <li>sessionToken: A valid session token, used for making a request on
@@ -873,7 +874,7 @@ class ParseQuery {
873874
* @return {Promise} A promise that will be fulfilled once the
874875
* iteration has completed.
875876
*/
876-
each(callback: (obj: ParseObject) => any, options?: BatchOptions): Promise<Array<ParseObject>> {
877+
eachBatch(callback: (objs: Array<ParseObject>) => Promise<*>, options?: BatchOptions): Promise<void> {
877878
options = options || {};
878879

879880
if (this._order || this._skip || (this._limit >= 0)) {
@@ -882,8 +883,6 @@ class ParseQuery {
882883
}
883884

884885
const query = new ParseQuery(this.className);
885-
// We can override the batch size from the options.
886-
// This is undocumented, but useful for testing.
887886
query._limit = options.batchSize || 100;
888887
query._include = this._include.map((i) => {
889888
return i;
@@ -927,14 +926,7 @@ class ParseQuery {
927926
return !finished;
928927
}, () => {
929928
return query.find(findOptions).then((results) => {
930-
let callbacksDone = Promise.resolve();
931-
results.forEach((result) => {
932-
callbacksDone = callbacksDone.then(() => {
933-
return callback(result);
934-
});
935-
});
936-
937-
return callbacksDone.then(() => {
929+
return Promise.resolve(callback(results)).then(() => {
938930
if (results.length >= query._limit) {
939931
query.greaterThan('objectId', results[results.length - 1].id);
940932
} else {
@@ -945,6 +937,36 @@ class ParseQuery {
945937
});
946938
}
947939

940+
/**
941+
* Iterates over each result of a query, calling a callback for each one. If
942+
* the callback returns a promise, the iteration will not continue until
943+
* that promise has been fulfilled. If the callback returns a rejected
944+
* promise, then iteration will stop with that error. The items are
945+
* processed in an unspecified order. The query may not have any sort order,
946+
* and may not use limit or skip.
947+
* @param {Function} callback Callback that will be called with each result
948+
* of the query.
949+
* @param {Object} options Valid options are:<ul>
950+
* <li>useMasterKey: In Cloud Code and Node only, causes the Master Key to
951+
* be used for this request.
952+
* <li>sessionToken: A valid session token, used for making a request on
953+
* behalf of a specific user.
954+
* </ul>
955+
* @return {Promise} A promise that will be fulfilled once the
956+
* iteration has completed.
957+
*/
958+
each(callback: (obj: ParseObject) => any, options?: BatchOptions): Promise<void> {
959+
return this.eachBatch((results) => {
960+
let callbacksDone = Promise.resolve();
961+
results.forEach((result) => {
962+
callbacksDone = callbacksDone.then(() => {
963+
return callback(result);
964+
});
965+
});
966+
return callbacksDone;
967+
}, options);
968+
}
969+
948970
/**
949971
* Adds a hint to force index selection. (https://docs.mongodb.com/manual/reference/operator/meta/hint/)
950972
*

src/__tests__/ParseQuery-test.js

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1503,6 +1503,134 @@ describe('ParseQuery', () => {
15031503
});
15041504
});
15051505

1506+
describe('iterating over batches with .eachBatch()', () => {
1507+
let findMock;
1508+
beforeEach(() => {
1509+
findMock = jest.fn();
1510+
findMock.mockReturnValueOnce(Promise.resolve({
1511+
results: [
1512+
{ objectId: 'I55', size: 'medium', name: 'Product 55' },
1513+
{ objectId: 'I89', size: 'small', name: 'Product 89' },
1514+
]
1515+
}));
1516+
findMock.mockReturnValueOnce(Promise.resolve({
1517+
results: [
1518+
{ objectId: 'I91', size: 'small', name: 'Product 91' },
1519+
]
1520+
}));
1521+
CoreManager.setQueryController({
1522+
aggregate() {},
1523+
find: findMock,
1524+
});
1525+
});
1526+
1527+
it('passes query attributes through to the REST API', async () => {
1528+
const q = new ParseQuery('Item');
1529+
q.containedIn('size', ['small', 'medium']);
1530+
q.matchesKeyInQuery(
1531+
'name',
1532+
'productName',
1533+
new ParseQuery('Review').equalTo('stars', 5)
1534+
);
1535+
q.equalTo('valid', true);
1536+
q.select('size', 'name');
1537+
q.includeAll();
1538+
q.hint('_id_');
1539+
1540+
await q.eachBatch(() => {});
1541+
1542+
expect(findMock).toHaveBeenCalledTimes(1);
1543+
const [className, params, options] = findMock.mock.calls[0];
1544+
expect(className).toBe('Item')
1545+
expect(params).toEqual({
1546+
limit: 100,
1547+
order: 'objectId',
1548+
keys: 'size,name',
1549+
include: '*',
1550+
hint: '_id_',
1551+
where: {
1552+
size: {
1553+
$in: ['small', 'medium']
1554+
},
1555+
name: {
1556+
$select: {
1557+
key: 'productName',
1558+
query: {
1559+
className: 'Review',
1560+
where: {
1561+
stars: 5
1562+
}
1563+
}
1564+
}
1565+
},
1566+
valid: true
1567+
}
1568+
});
1569+
expect(options.requestTask).toBeDefined();
1570+
});
1571+
1572+
it('passes options through to the REST API', async () => {
1573+
const batchOptions = {
1574+
useMasterKey: true,
1575+
sessionToken: '1234',
1576+
batchSize: 50,
1577+
};
1578+
const q = new ParseQuery('Item');
1579+
await q.eachBatch(() => {}, batchOptions);
1580+
expect(findMock).toHaveBeenCalledTimes(1);
1581+
const [className, params, options] = findMock.mock.calls[0];
1582+
expect(className).toBe('Item');
1583+
expect(params).toEqual({
1584+
limit: 50,
1585+
order: 'objectId',
1586+
where: {},
1587+
});
1588+
expect(options.useMasterKey).toBe(true);
1589+
expect(options.sessionToken).toEqual('1234');
1590+
});
1591+
1592+
it('only makes one request when the results fit in one page', async () => {
1593+
const q = new ParseQuery('Item');
1594+
await q.eachBatch(() => {});
1595+
expect(findMock).toHaveBeenCalledTimes(1);
1596+
});
1597+
1598+
it('makes more requests when the results do not fit in one page', async () => {
1599+
const q = new ParseQuery('Item');
1600+
await q.eachBatch(() => {}, { batchSize: 2 });
1601+
expect(findMock).toHaveBeenCalledTimes(2);
1602+
})
1603+
1604+
it('stops iteration when the callback returns a promise that rejects', async () => {
1605+
let callCount = 0;
1606+
const callback = () => {
1607+
callCount++;
1608+
return Promise.reject(new Error('Callback rejecting'));
1609+
};
1610+
const q = new ParseQuery('Item');
1611+
await q.eachBatch(callback, { batchSize: 2 }).catch(() => {});
1612+
expect(callCount).toBe(1);
1613+
});
1614+
1615+
it('handles a synchronous callback', async () => {
1616+
const results = [];
1617+
const q = new ParseQuery('Item');
1618+
await q.eachBatch((items) => {
1619+
items.map(item => results.push(item.attributes.size))
1620+
});
1621+
expect(results).toEqual(['medium', 'small']);
1622+
});
1623+
1624+
it('handles an asynchronous callback', async () => {
1625+
const results = [];
1626+
const q = new ParseQuery('Item');
1627+
await q.eachBatch((items) => {
1628+
items.map(item => results.push(item.attributes.size))
1629+
return new Promise(resolve => setImmediate(resolve));
1630+
});
1631+
expect(results).toEqual(['medium', 'small']);
1632+
});
1633+
});
15061634

15071635
it('can iterate over results with each()', (done) => {
15081636
CoreManager.setQueryController({

0 commit comments

Comments
 (0)