Skip to content

Commit b292fa4

Browse files
Merge pull request #88 from shiftcode/#83-improve-batch-requests
#83 improve batch requests
2 parents 24017b9 + 36655ce commit b292fa4

17 files changed

+533
-401
lines changed

src/dynamo/batchget/batch-get-full.response.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ export interface BatchGetFullResponse {
66
/**
77
* A map of table name to a list of items. Each object in Responses consists of a table name, along with a map of attribute data consisting of the data type and attribute value.
88
*/
9-
Responses?: BatchGetResponse
9+
Responses: BatchGetResponse
1010
/**
1111
* A map of tables and their respective keys that were not processed with the current response. The UnprocessedKeys value is in the same form as RequestItems, so the value can be provided directly to a subsequent BatchGetItem operation. For more information, see RequestItems in the Request Parameters section. Each element consists of: Keys - An array of primary key attribute values that define specific items in the table. ProjectionExpression - One or more attributes to be retrieved from the table or index. By default, all attributes are returned. If a requested attribute is not found, it does not appear in the result. ConsistentRead - The consistency of a read operation. If set to true, then a strongly consistent read is used; otherwise, an eventually consistent read is used. If there are no unprocessed keys remaining, the response contains an empty UnprocessedKeys map.
1212
*/
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
import { DynamoDB } from 'aws-sdk'
2+
import { of } from 'rxjs'
3+
import { DynamoRx } from '../dynamo-rx'
4+
import { batchGetItemsFetchAll, combineBatchGetResponses, hasUnprocessedKeys } from './batch-get-utils'
5+
6+
describe('batch-get utils', () => {
7+
8+
describe('hasUnprocessedKeys', () => {
9+
it('should return bool according to given object', () => {
10+
expect(hasUnprocessedKeys({})).toBeFalsy()
11+
expect(hasUnprocessedKeys({ Responses: {} })).toBeFalsy()
12+
expect(hasUnprocessedKeys({ UnprocessedKeys: {} })).toBeFalsy()
13+
expect(hasUnprocessedKeys({ UnprocessedKeys: { 'aTableName': { Keys: [] } } })).toBeFalsy()
14+
expect(hasUnprocessedKeys({ UnprocessedKeys: { 'aTableName': { Keys: [{ id: { S: 'id' } }] } } })).toBeTruthy()
15+
})
16+
})
17+
18+
describe('combineBatchGetResponses', () => {
19+
const resp1: DynamoDB.BatchGetItemOutput = {
20+
Responses: {
21+
'tableA': [
22+
{ id: { S: 'id-a1' } },
23+
],
24+
'tableB': [
25+
{ id: { S: 'id-b' } },
26+
],
27+
},
28+
UnprocessedKeys: {
29+
'tableA:': { Keys: [{ id: { S: 'id-a2' } }] },
30+
'tableC:': { Keys: [{ id: { S: 'id-c' } }] },
31+
'tableD:': { Keys: [{ id: { S: 'id-d' } }] },
32+
},
33+
}
34+
const resp2: DynamoDB.BatchGetItemOutput = {
35+
Responses: {
36+
'tableA': [
37+
{ id: { S: 'id-a2' } },
38+
],
39+
'tableC': [
40+
{ id: { S: 'id-c' } },
41+
],
42+
},
43+
UnprocessedKeys: {
44+
'tableD:': { Keys: [{ id: { S: 'id-d' } }] },
45+
},
46+
}
47+
const expectedOutput: DynamoDB.BatchGetItemOutput = {
48+
Responses: {
49+
'tableA': [
50+
{ id: { S: 'id-a1' } },
51+
{ id: { S: 'id-a2' } },
52+
],
53+
'tableB': [
54+
{ id: { S: 'id-b' } },
55+
],
56+
'tableC': [
57+
{ id: { S: 'id-c' } },
58+
],
59+
},
60+
UnprocessedKeys: {
61+
'tableD:': { Keys: [{ id: { S: 'id-d' } }] },
62+
},
63+
}
64+
it('should combine correctly', () => {
65+
expect(combineBatchGetResponses(resp1)(resp2)).toEqual(expectedOutput)
66+
})
67+
68+
})
69+
70+
describe('batchGetItemsFetchAll', () => {
71+
let batchGetItemsSpy: jasmine.Spy
72+
let dynamoRx: DynamoRx
73+
let backoffTimerMock: { next: jasmine.Spy }
74+
75+
const output1: DynamoDB.BatchGetItemOutput = {
76+
Responses: {
77+
'tableA': [{ id: { S: 'id-A' } }],
78+
},
79+
UnprocessedKeys: {
80+
'tableA': {
81+
Keys: [{ id: { S: 'id-A' } }],
82+
},
83+
},
84+
}
85+
const output2: DynamoDB.BatchGetItemOutput = {
86+
Responses: {
87+
'tableA': [{ id: { S: 'id-A' } }],
88+
},
89+
}
90+
91+
beforeEach(async () => {
92+
batchGetItemsSpy = jasmine.createSpy().and.returnValues(of(output1), of(output2))
93+
dynamoRx = <any>{ batchGetItems: batchGetItemsSpy }
94+
backoffTimerMock = { next: jasmine.createSpy().and.returnValue({ value: 0 }) }
95+
96+
await batchGetItemsFetchAll(
97+
dynamoRx,
98+
<any>{},
99+
<IterableIterator<number>><any>backoffTimerMock,
100+
0,
101+
).toPromise()
102+
})
103+
104+
105+
it('should use UnprocessedKeys for next request', () => {
106+
expect(batchGetItemsSpy).toHaveBeenCalledTimes(2)
107+
expect(batchGetItemsSpy.calls.mostRecent().args[0]).toBeDefined()
108+
expect(batchGetItemsSpy.calls.mostRecent().args[0].RequestItems).toBeDefined()
109+
expect(batchGetItemsSpy.calls.mostRecent().args[0].RequestItems).toEqual(output1.UnprocessedKeys)
110+
})
111+
112+
it('should backoff when UnprocessedItems', () => {
113+
expect(backoffTimerMock.next).toHaveBeenCalledTimes(1)
114+
})
115+
116+
})
117+
118+
})
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import { DynamoDB } from 'aws-sdk'
2+
import { BatchGetRequestMap } from 'aws-sdk/clients/dynamodb'
3+
import { Observable, of } from 'rxjs'
4+
import { delay, map, mergeMap } from 'rxjs/operators'
5+
import { DynamoRx } from '../dynamo-rx'
6+
7+
export function batchGetItemsFetchAll(
8+
dynamoRx: DynamoRx,
9+
params: DynamoDB.BatchGetItemInput,
10+
backoffTimer: IterableIterator<number>,
11+
throttleTimeSlot: number,
12+
): Observable<DynamoDB.BatchGetItemOutput> {
13+
return dynamoRx.batchGetItems(params)
14+
.pipe(
15+
mergeMap(response => {
16+
if (hasUnprocessedKeys(response)) {
17+
return of(response.UnprocessedKeys)
18+
.pipe(
19+
delay(backoffTimer.next().value * throttleTimeSlot),
20+
mergeMap((UnprocessedKeys: DynamoDB.BatchGetRequestMap) => {
21+
const nextParams = { ...params, RequestItems: UnprocessedKeys }
22+
return batchGetItemsFetchAll(dynamoRx, nextParams, backoffTimer, throttleTimeSlot)
23+
}),
24+
map(combineBatchGetResponses(response)),
25+
)
26+
}
27+
return of(response)
28+
}),
29+
)
30+
}
31+
32+
export type ResponseWithUnprocessedKeys = DynamoDB.BatchGetItemOutput & { UnprocessedKeys: BatchGetRequestMap }
33+
34+
export function hasUnprocessedKeys(response: DynamoDB.BatchGetItemOutput): response is ResponseWithUnprocessedKeys {
35+
if (!response.UnprocessedKeys) {
36+
return false
37+
}
38+
return Object.values(response.UnprocessedKeys)
39+
.some(t => !!t && t.Keys && t.Keys.length > 0)
40+
}
41+
42+
/**
43+
* combines a first with a second response. ConsumedCapacity is always from the latter.
44+
* @param response1
45+
*/
46+
export function combineBatchGetResponses(response1: DynamoDB.BatchGetItemOutput) {
47+
return (response2: DynamoDB.BatchGetItemOutput): DynamoDB.BatchGetItemOutput => {
48+
const tableNames: string[] = Object.keys(response1.Responses || {})
49+
50+
Object.keys(response2.Responses || {})
51+
.filter(tn => !tableNames.includes(tn))
52+
.forEach(tn => tableNames.push(tn))
53+
54+
const Responses = tableNames
55+
.reduce((u, tableName) => ({
56+
...u,
57+
[tableName]: [
58+
...(response1.Responses && response1.Responses[tableName] || []),
59+
...(response2.Responses && response2.Responses[tableName] || []),
60+
],
61+
}), {})
62+
return {
63+
...response2,
64+
Responses,
65+
}
66+
}
67+
}

src/dynamo/batchget/batch-get.request.spec.ts

Lines changed: 126 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
1+
// tslint:disable:no-non-null-assertion
12
import * as DynamoDB from 'aws-sdk/clients/dynamodb'
23
import { of } from 'rxjs'
34
import { getTableName } from '../../../test/helper'
4-
import { SimpleWithCompositePartitionKeyModel, SimpleWithPartitionKeyModel } from '../../../test/models'
5-
import { Organization } from '../../../test/models/organization.model'
6-
import { Attributes } from '../../mapper'
5+
import { Organization, SimpleWithCompositePartitionKeyModel, SimpleWithPartitionKeyModel } from '../../../test/models'
6+
import { Attributes, toDb } from '../../mapper'
77
import { DynamoRx } from '../dynamo-rx'
88
import { BatchGetRequest } from './batch-get.request'
99

1010
describe('batch get', () => {
1111
let request: BatchGetRequest
1212

13-
1413
describe('params', () => {
1514

1615
beforeEach(() => request = new BatchGetRequest())
@@ -21,39 +20,150 @@ describe('batch get', () => {
2120
})
2221

2322
it('key', () => {
24-
request.forModel(Organization, ['idValue'])
23+
const o: Partial<Organization> = {
24+
id: 'idValue',
25+
createdAtDate: new Date(),
26+
}
27+
request.forModel(Organization, [o])
2528
const params = request.params
2629
expect(params.RequestItems).toBeDefined()
2730
expect(params.RequestItems.Organization).toBeDefined()
28-
expect(params.RequestItems.Organization).toEqual({ Keys: [{ id: { S: 'idValue' } }] })
31+
expect(params.RequestItems.Organization.Keys).toBeDefined()
32+
expect(params.RequestItems.Organization.Keys).toEqual([{
33+
id: { S: 'idValue' },
34+
createdAtDate: { S: o.createdAtDate!.toISOString() },
35+
}])
2936
})
3037
})
3138

32-
3339
describe('forModel', () => {
3440
beforeEach(() => request = new BatchGetRequest())
3541

3642
it('should throw when same table is used 2 times', () => {
37-
request.forModel(SimpleWithPartitionKeyModel, ['idVal'])
38-
expect(() => request.forModel(SimpleWithPartitionKeyModel, ['otherVal'])).toThrow()
43+
request.forModel(SimpleWithPartitionKeyModel, [{ id: 'idVal' }])
44+
expect(() => request.forModel(SimpleWithPartitionKeyModel, [{ id: 'otherVal' }])).toThrow()
45+
})
46+
47+
it('should throw when sortKey is missing but necessary', () => {
48+
expect(() => request.forModel(SimpleWithCompositePartitionKeyModel, [{ id: 'idVal' }]))
49+
})
50+
51+
it('should throw when modelClazz is not @Model decorated', () => {
52+
class X {id: string}
53+
54+
expect(() => request.forModel(X, [{ id: 'ok' }])).toThrow()
3955
})
4056

4157
it('should throw when providing null value ', () => {
4258
expect(() => request.forModel(SimpleWithPartitionKeyModel, [<any>null])).toThrow()
4359
})
4460

45-
it('should throw when sortKey is missing', () => {
46-
expect(() => request.forModel(SimpleWithCompositePartitionKeyModel, [{ partitionKey: 'idVal' }]))
61+
it('should allow ConsistentRead', () => {
62+
request.forModel(SimpleWithPartitionKeyModel, [{ id: 'myId' }], true)
63+
expect(request.params).toBeDefined()
64+
expect(request.params.RequestItems).toBeDefined()
65+
const keysOfTable = request.params.RequestItems[getTableName(SimpleWithPartitionKeyModel)]
66+
expect(keysOfTable).toBeDefined()
67+
expect(keysOfTable.ConsistentRead).toBeTruthy()
4768
})
4869

49-
it('should throw when partitionKey is neither string nor object', () => {
50-
expect(() => request.forModel(SimpleWithCompositePartitionKeyModel, [<any>78]))
51-
expect(() => request.forModel(SimpleWithCompositePartitionKeyModel, [<any>true]))
52-
expect(() => request.forModel(SimpleWithCompositePartitionKeyModel, [<any>new Date()]))
70+
it('should throw when more than 100 items are added', () => {
71+
const items55: Array<Partial<SimpleWithPartitionKeyModel>> = new Array(55)
72+
.map((x, i) => ({ id: `id-${i}` }))
73+
const items60: Array<Partial<Organization>> = new Array(60)
74+
.map((x, i) => ({ id: `id-${i}`, createdAtDate: new Date() }))
75+
76+
// at once
77+
expect(() => request.forModel(SimpleWithPartitionKeyModel, [...items55, ...items55])).toThrow()
78+
79+
// in two steps
80+
expect(() => {
81+
request.forModel(SimpleWithPartitionKeyModel, items55)
82+
request.forModel(Organization, items60)
83+
}).toThrow()
5384
})
5485

5586
})
5687

88+
describe('execNoMap, execFullResponse, exec', () => {
89+
const jsItem1: SimpleWithPartitionKeyModel = { id: 'id-1', age: 21 }
90+
const jsItem2: SimpleWithPartitionKeyModel = { id: 'id-2', age: 22 }
91+
92+
const output1: DynamoDB.BatchGetItemOutput = {
93+
Responses: {
94+
[getTableName(SimpleWithPartitionKeyModel)]: [toDb(jsItem1, SimpleWithPartitionKeyModel)],
95+
},
96+
UnprocessedKeys: {
97+
[getTableName(SimpleWithPartitionKeyModel)]: {
98+
Keys: [toDb(jsItem1, SimpleWithPartitionKeyModel)],
99+
},
100+
},
101+
}
102+
const output2: DynamoDB.BatchGetItemOutput = {
103+
Responses: {
104+
[getTableName(SimpleWithPartitionKeyModel)]: [toDb(jsItem2, SimpleWithPartitionKeyModel)],
105+
},
106+
}
107+
108+
let batchGetItemsSpy: jasmine.Spy
109+
let nextSpyFn: () => { value: number }
110+
111+
const generatorMock = () => <any>{ next: nextSpyFn }
112+
113+
beforeEach(() => {
114+
request = new BatchGetRequest()
115+
request.forModel(SimpleWithPartitionKeyModel, [jsItem1, jsItem2])
116+
117+
batchGetItemsSpy = jasmine.createSpy().and.returnValues(of(output1), of(output2))
118+
const dynamoRx: DynamoRx = <any>{ batchGetItems: batchGetItemsSpy }
119+
120+
Object.assign(request, { dynamoRx })
121+
122+
nextSpyFn = jest.fn().mockImplementation(() => ({ value: 0 }))
123+
})
124+
125+
it('[execNoMap] should backoff and retry when UnprocessedItems are returned', async () => {
126+
const result = await request.execNoMap(generatorMock).toPromise()
127+
expect(nextSpyFn).toHaveBeenCalledTimes(1)
128+
expect(batchGetItemsSpy).toHaveBeenCalledTimes(2)
129+
expect(result).toBeDefined()
130+
expect(result.Responses).toBeDefined()
131+
132+
const resultItems = result.Responses![getTableName(SimpleWithPartitionKeyModel)]
133+
expect(resultItems).toBeDefined()
134+
expect(resultItems.length).toBe(2)
135+
expect(resultItems[0]).toEqual(toDb(jsItem1, SimpleWithPartitionKeyModel))
136+
expect(resultItems[1]).toEqual(toDb(jsItem2, SimpleWithPartitionKeyModel))
137+
})
138+
139+
it('[execFullResponse] should backoff and retry when UnprocessedItems are returned', async () => {
140+
const result = await request.execFullResponse(generatorMock).toPromise()
141+
expect(nextSpyFn).toHaveBeenCalledTimes(1)
142+
expect(batchGetItemsSpy).toHaveBeenCalledTimes(2)
143+
expect(result).toBeDefined()
144+
expect(result.Responses).toBeDefined()
145+
146+
const resultItems = result.Responses![getTableName(SimpleWithPartitionKeyModel)]
147+
expect(resultItems).toBeDefined()
148+
expect(resultItems.length).toBe(2)
149+
expect(resultItems[0]).toEqual(jsItem1)
150+
expect(resultItems[1]).toEqual(jsItem2)
151+
})
152+
153+
it('[exec] should backoff and retry when UnprocessedItems are returned', async () => {
154+
const result = await request.exec(generatorMock).toPromise()
155+
expect(nextSpyFn).toHaveBeenCalledTimes(1)
156+
expect(batchGetItemsSpy).toHaveBeenCalledTimes(2)
157+
expect(result).toBeDefined()
158+
159+
const resultItems = result[getTableName(SimpleWithPartitionKeyModel)]
160+
expect(resultItems).toBeDefined()
161+
expect(resultItems.length).toBe(2)
162+
expect(resultItems[0]).toEqual(jsItem1)
163+
expect(resultItems[1]).toEqual(jsItem2)
164+
})
165+
166+
})
57167

58168
describe('should map the result items', () => {
59169
let batchGetItemsSpy: jasmine.Spy
@@ -74,7 +184,7 @@ describe('batch get', () => {
74184
const dynamoRx: DynamoRx = <any>{ batchGetItems: batchGetItemsSpy }
75185
request = new BatchGetRequest()
76186
Object.assign(request, { dynamoRx })
77-
request.forModel(SimpleWithPartitionKeyModel, ['idVal'])
187+
request.forModel(SimpleWithPartitionKeyModel, [{ id: 'idVal' }])
78188
})
79189

80190
it('exec', async () => {
@@ -93,5 +203,4 @@ describe('batch get', () => {
93203
})
94204

95205
})
96-
97206
})

0 commit comments

Comments
 (0)