Skip to content

Commit c3cb9ae

Browse files
committed
#282 - Patching documents in parallel to change their folder makes some disappear
1 parent 3e0ecbf commit c3cb9ae

File tree

12 files changed

+654
-288
lines changed

12 files changed

+654
-288
lines changed

aws-dynamodb/src/main/java/com/formkiq/aws/dynamodb/DynamoDbService.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,4 +264,25 @@ Map<String, AttributeValue> updateItem(AttributeValue pk, AttributeValue sk,
264264
*/
265265
Map<String, AttributeValue> updateValues(AttributeValue pk, AttributeValue sk,
266266
Map<String, AttributeValue> updateValues);
267+
268+
/**
269+
* Aquire Lock.
270+
*
271+
* @param pk {@link AttributeValue}
272+
* @param sk {@link AttributeValue}
273+
* @param aquireLockTimeoutInMs long
274+
* @param lockExpirationInMs long
275+
* @return boolean
276+
*/
277+
boolean acquireLock(AttributeValue pk, AttributeValue sk, long aquireLockTimeoutInMs,
278+
long lockExpirationInMs);
279+
280+
/**
281+
* Release Lock.
282+
*
283+
* @param pk {@link AttributeValue}
284+
* @param sk {@link AttributeValue}
285+
* @return boolean
286+
*/
287+
boolean releaseLock(AttributeValue pk, AttributeValue sk);
267288
}

aws-dynamodb/src/main/java/com/formkiq/aws/dynamodb/DynamoDbServiceImpl.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,16 @@
2525

2626
import static com.formkiq.aws.dynamodb.DbKeys.PK;
2727
import static com.formkiq.aws.dynamodb.DbKeys.SK;
28+
29+
import java.time.Instant;
2830
import java.util.ArrayList;
2931
import java.util.Collection;
3032
import java.util.Collections;
3133
import java.util.HashMap;
3234
import java.util.List;
3335
import java.util.Map;
3436
import java.util.Objects;
37+
import java.util.concurrent.TimeUnit;
3538
import java.util.stream.Collectors;
3639

3740
import com.formkiq.aws.dynamodb.objects.Strings;
@@ -43,13 +46,17 @@
4346
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
4447
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
4548
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
49+
import software.amazon.awssdk.services.dynamodb.model.Put;
4650
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
4751
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
4852
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
4953
import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
5054
import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
5155
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
5256
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
57+
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
58+
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
59+
import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException;
5360
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
5461
import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse;
5562
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
@@ -61,6 +68,13 @@
6168
*/
6269
public final class DynamoDbServiceImpl implements DynamoDbService {
6370

71+
/** Max Backoff in MS. */
72+
private static final int MAX_BACKOFF_IN_MS = 2000;
73+
/** Default Backoff In Ms. */
74+
private static final int DEFAULT_BACKOFF_IN_MS = 200;
75+
/** Thousand constant. */
76+
private static final int TS = 1000;
77+
6478
/** {@link DynamoDbClient}. */
6579
private final DynamoDbClient dbClient;
6680
/** Table Name. */
@@ -435,4 +449,61 @@ public Map<String, AttributeValue> updateValues(final AttributeValue pk, final A
435449

436450
return updateItem(pk, sk, values);
437451
}
452+
453+
@Override
454+
public boolean acquireLock(final AttributeValue pk, final AttributeValue sk,
455+
final long aquireLockTimeoutInMs, final long lockExpirationInMs) {
456+
457+
boolean lock = false;
458+
long expirationTime = Instant.now().getEpochSecond() + lockExpirationInMs / TS;
459+
460+
Map<String, AttributeValue> item = new HashMap<>();
461+
item.put(PK, pk);
462+
item.put(SK, getLock(sk));
463+
item.put("ExpirationTime", AttributeValue.builder().n(Long.toString(expirationTime)).build());
464+
465+
Put.Builder put = Put.builder().tableName(tableName).item(item).conditionExpression(
466+
"(attribute_not_exists(PK) and attribute_not_exists(SK)) OR ExpirationTime < :currentTime");
467+
468+
long startTime = System.currentTimeMillis();
469+
long waitInterval = DEFAULT_BACKOFF_IN_MS;
470+
471+
while (System.currentTimeMillis() - startTime < aquireLockTimeoutInMs) {
472+
473+
try {
474+
475+
put.expressionAttributeValues(Map.of(":currentTime",
476+
AttributeValue.builder().n(Long.toString(Instant.now().getEpochSecond())).build()));
477+
478+
TransactWriteItemsRequest tx = TransactWriteItemsRequest.builder()
479+
.transactItems(TransactWriteItem.builder().put(put.build()).build()).build();
480+
481+
this.dbClient.transactWriteItems(tx);
482+
lock = true;
483+
break;
484+
485+
} catch (TransactionCanceledException e) {
486+
// Lock is already held or transaction was canceled, wait and retry with exponential backoff
487+
try {
488+
TimeUnit.MILLISECONDS.sleep(waitInterval);
489+
} catch (InterruptedException ex) {
490+
throw new RuntimeException(ex);
491+
}
492+
493+
// Cap backoff at 1 second
494+
waitInterval = Math.min(waitInterval * 2, MAX_BACKOFF_IN_MS);
495+
}
496+
}
497+
498+
return lock;
499+
}
500+
501+
@Override
502+
public boolean releaseLock(final AttributeValue pk, final AttributeValue sk) {
503+
return deleteItem(pk, getLock(sk));
504+
}
505+
506+
private AttributeValue getLock(final AttributeValue sk) {
507+
return AttributeValue.fromS(sk.s() + ".lock");
508+
}
438509
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/**
2+
* MIT License
3+
*
4+
* Copyright (c) 2018 - 2020 FormKiQ
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
package com.formkiq.aws.dynamodb;
25+
26+
import com.formkiq.testutils.aws.DynamoDbExtension;
27+
import com.formkiq.testutils.aws.DynamoDbTestServices;
28+
import org.junit.jupiter.api.BeforeEach;
29+
import org.junit.jupiter.api.Test;
30+
import org.junit.jupiter.api.Timeout;
31+
import org.junit.jupiter.api.extension.ExtendWith;
32+
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
33+
34+
import static com.formkiq.testutils.aws.DynamoDbExtension.DOCUMENTS_TABLE;
35+
import static org.junit.jupiter.api.Assertions.assertFalse;
36+
import static org.junit.jupiter.api.Assertions.assertTrue;
37+
38+
/** Unit Tests for {@link DynamoDbService}. */
39+
@ExtendWith(DynamoDbExtension.class)
40+
public class DynamoDbServiceTest {
41+
42+
/** Timeout. */
43+
private static final long TIMEOUT = 20;
44+
45+
/** {@link DynamoDbService}. */
46+
private DynamoDbService service;
47+
48+
/**
49+
* Before Test.
50+
*
51+
* @throws Exception Exception
52+
*/
53+
@BeforeEach
54+
public void before() throws Exception {
55+
this.service =
56+
new DynamoDbServiceImpl(DynamoDbTestServices.getDynamoDbConnection(), DOCUMENTS_TABLE);
57+
}
58+
59+
/**
60+
* Test aquire lock / release lock.
61+
*
62+
*/
63+
@Test
64+
@Timeout(TIMEOUT)
65+
public void testAcquireLock01() {
66+
// given
67+
final long timeout = 2000;
68+
final long lockExpiry = 10000;
69+
AttributeValue pk = AttributeValue.fromS("test");
70+
AttributeValue sk = AttributeValue.fromS("test1");
71+
72+
// when
73+
boolean locked = this.service.acquireLock(pk, sk, timeout, lockExpiry);
74+
75+
// then
76+
assertTrue(locked);
77+
assertTrue(this.service.releaseLock(pk, sk));
78+
79+
// when
80+
locked = this.service.acquireLock(pk, sk, timeout, lockExpiry);
81+
82+
// then
83+
assertTrue(locked);
84+
assertTrue(this.service.releaseLock(pk, sk));
85+
}
86+
87+
/**
88+
* Test aquire lock when already locked.
89+
*/
90+
@Test
91+
@Timeout(TIMEOUT)
92+
public void testAcquireLock02() {
93+
// given
94+
final long timeout = 2000;
95+
final long lockExpiry = 10000;
96+
AttributeValue pk = AttributeValue.fromS("test");
97+
AttributeValue sk = AttributeValue.fromS("test1");
98+
99+
// when
100+
boolean lock1 = this.service.acquireLock(pk, sk, timeout, lockExpiry);
101+
boolean lock2 = this.service.acquireLock(pk, sk, timeout, lockExpiry);
102+
103+
// then
104+
assertFalse(lock2);
105+
assertTrue(lock1);
106+
}
107+
108+
/**
109+
* Test aquire lock after expiration.
110+
*/
111+
@Test
112+
@Timeout(TIMEOUT)
113+
public void testAcquireLock03() {
114+
// given
115+
final long timeout = 5000;
116+
final long lockExpiry = 1000;
117+
AttributeValue pk = AttributeValue.fromS("test");
118+
AttributeValue sk = AttributeValue.fromS("test1");
119+
120+
// when
121+
boolean lock1 = this.service.acquireLock(pk, sk, timeout, lockExpiry);
122+
boolean lock2 = this.service.acquireLock(pk, sk, timeout, lockExpiry);
123+
124+
// then
125+
assertTrue(lock1);
126+
assertTrue(lock2);
127+
}
128+
}

dynamodb-documents/src/main/java/com/formkiq/stacks/dynamodb/DocumentServiceImpl.java

Lines changed: 46 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,34 @@ private void insertAttributeIfExists(final Collection<String> attributeKeys, fin
384384
}
385385
}
386386

387+
private FolderIndexRecord createDocumentPath(final String siteId, final DocumentItem document,
388+
final boolean isPathChanged) {
389+
390+
FolderIndexRecord folderIndexRecord = null;
391+
392+
if (isPathChanged) {
393+
List<FolderIndexRecord> folders =
394+
this.folderIndexProcessor.createFolders(siteId, document.getPath(), document.getUserId());
395+
396+
folderIndexRecord = this.folderIndexProcessor.addFileToFolder(siteId,
397+
document.getDocumentId(), Objects.last(folders), document.getPath());
398+
399+
String filename = Strings.getFilename(document.getPath());
400+
if (!filename.contains(folderIndexRecord.path())) {
401+
String path = createPath(folders, folderIndexRecord);
402+
document.setPath(path);
403+
}
404+
}
405+
406+
return folderIndexRecord;
407+
}
408+
409+
private String createPath(final List<FolderIndexRecord> folders,
410+
final FolderIndexRecord folderIndexRecord) {
411+
return String.join("/", folders.stream().map(FolderIndexRecord::path).toList()) + "/"
412+
+ folderIndexRecord.path();
413+
}
414+
387415
private List<SchemaAttributes> getSchemaAttributes(final String siteId,
388416
final Collection<DocumentAttributeRecord> documentAttributeRecords) {
389417

@@ -1552,14 +1580,14 @@ private boolean isNextDayPagination(final String siteId, final String dateKey,
15521580
* Is Document Path Changed.
15531581
*
15541582
* @param previous {@link Map}
1555-
* @param current {@link Map}
1583+
* @param item {@link DocumentItem}
15561584
* @return boolean
15571585
*/
15581586
private boolean isPathChanges(final Map<String, AttributeValue> previous,
1559-
final Map<String, AttributeValue> current) {
1587+
final DocumentItem item) {
15601588
String path0 = previous.containsKey("path") ? previous.get("path").s() : "";
1561-
String path1 = current.containsKey("path") ? current.get("path").s() : "";
1562-
return !path1.equals(path0) && !"".equals(path0);
1589+
String path1 = item.getPath();
1590+
return !path1.equals(path0);
15631591
}
15641592

15651593
@Override
@@ -1796,11 +1824,10 @@ public boolean restoreSoftDeletedDocument(final String siteId, final String docu
17961824
DocumentItem item = new DocumentItemDynamoDb(documentId, new Date(), userId);
17971825
item.setPath(path);
17981826

1799-
List<Map<String, AttributeValue>> folderIndex =
1800-
this.folderIndexProcessor.generateIndex(siteId, item);
1827+
FolderIndexRecord record = createDocumentPath(siteId, item, true);
18011828

18021829
WriteRequestBuilder writeBuilder =
1803-
new WriteRequestBuilder().appends(this.documentTableName, folderIndex);
1830+
new WriteRequestBuilder().append(this.documentTableName, record.getAttributes(siteId));
18041831

18051832
writeBuilder.batchWriteItem(this.dbClient);
18061833
}
@@ -1905,16 +1932,9 @@ private void saveDocument(final Map<String, AttributeValue> keys, final String s
19051932

19061933
removeNullMetadata(document, documentValues);
19071934

1908-
// String documentVersionsTableName = this.versionsService.getDocumentVersionsTableName();
1909-
// boolean hasDocumentChanged = documentVersionsTableName != null && !previous.isEmpty()
1910-
// && isChangedMatching(previous, current);
1911-
1912-
// if (hasDocumentChanged) {
1913-
// this.versionsService.addDocumentVersionAttributes(previous, documentValues);
1914-
// }
1935+
boolean isPathChanged = isPathChanges(previous, document);
19151936

1916-
List<Map<String, AttributeValue>> folderIndex =
1917-
this.folderIndexProcessor.generateIndex(siteId, document);
1937+
FolderIndexRecord folderIndexRecord = createDocumentPath(siteId, document, isPathChanged);
19181938
if (!isEmpty(document.getPath())) {
19191939
documentValues.put("path", AttributeValue.fromS(document.getPath()));
19201940
}
@@ -1923,23 +1943,25 @@ private void saveDocument(final Map<String, AttributeValue> keys, final String s
19231943
getSaveTagsAttributes(siteId, document.getDocumentId(), tags, options.timeToLive());
19241944

19251945
WriteRequestBuilder writeBuilder = new WriteRequestBuilder()
1926-
.append(this.documentTableName, documentValues).appends(this.documentTableName, tagValues)
1927-
.appends(this.documentTableName, folderIndex);
1946+
.append(this.documentTableName, documentValues).appends(this.documentTableName, tagValues);
1947+
1948+
if (folderIndexRecord != null) {
1949+
writeBuilder.append(this.documentTableName, folderIndexRecord.getAttributes(siteId));
1950+
}
19281951

19291952
appendDocumentAttributes(writeBuilder, siteId, document.getDocumentId(), attributes,
19301953
documentExists, AttributeValidation.FULL, options.getValidationAccess());
19311954

1932-
// if (hasDocumentChanged) {
1933-
// writeBuilder = writeBuilder.appends(documentVersionsTableName, List.of(previous));
1934-
// }
1935-
19361955
if (writeBuilder.batchWriteItem(this.dbClient)) {
19371956

19381957
String documentId = document.getDocumentId();
19391958
saveDocumentInterceptor(siteId, documentId, current, previous);
19401959

1941-
if (isPathChanges(previous, documentValues)) {
1942-
this.folderIndexProcessor.deletePath(siteId, documentId, previous.get("path").s());
1960+
if (isPathChanged) {
1961+
String path = previous.containsKey("path") ? previous.get("path").s() : null;
1962+
if (!Strings.isEmpty(path)) {
1963+
this.folderIndexProcessor.deletePath(siteId, documentId, previous.get("path").s());
1964+
}
19431965
}
19441966

19451967
List<String> tagKeys =

0 commit comments

Comments
 (0)