Commit 670be43

'Version 1.5.1 of the DynamoDB Streams Kinesis Adapter'
1 parent 01793dd commit 670be43

14 files changed: 254 additions & 171 deletions


.github/PULL_REQUEST_TEMPLATE.md

Lines changed: 0 additions & 6 deletions
This file was deleted.

CODE_OF_CONDUCT.md

Lines changed: 0 additions & 4 deletions
This file was deleted.

CONTRIBUTING.md

Lines changed: 0 additions & 61 deletions
This file was deleted.

NOTICE.txt

Lines changed: 1 addition & 1 deletion
@@ -1,2 +1,2 @@
 DynamoDB Streams Kinesis Adapter for Java
-Copyright 2014-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+Copyright 2014-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.

README.md

Lines changed: 10 additions & 3 deletions
@@ -14,7 +14,13 @@
 * The KCL is designed to process streams from Amazon Kinesis, but by adding the DynamoDB Streams Kinesis Adapter, your application can process DynamoDB Streams instead, seamlessly and efficiently.

 ## Release Notes
-### Latest Release (v1.5.x)
+### Latest Release (v1.5.1)
+* Restores compile compatibility with KCL 1.13.3.
+* Fixes a performance issue that arose when using v1.5.0 with KCL 1.12 through 1.13.2.
+* Fixes a defect where `MaxLeasesForWorker` configuration was not being propagated to `StreamsLeaseTaker`.
+* Finished (SHARD_END) leases will now only be deleted after at least 6 hours have passed since the shard was created. This further reduces the chances of lineage replay.
+
+### Release (v1.5.0)
 * Introduces the implementation of periodic shard sync in conjunction with Amazon Kinesis Client Library v1.11.x (KCL). The default shard sync strategy is to discover new/child shards only when a consumer completes processing a shard. This default strategy constrains horizontal scaling of customer applications when consuming tables with 10,000+ partitions due to increased DescribeStream calls. Periodic shard sync guarantees that only a subset of the fleet (by default 10) will perform shard syncs, and decouples DescribeStream call volume from growth in fleet size.

 * Improves inconsistency handling in DescribeStream result aggregation by fixing any parent-open-child-open cases. This ensures that shard sync does not fail due to an assertion failure in KCL on this type of inconsistency.
@@ -24,8 +30,9 @@
 * Introduces `StreamsLeaseTaker` with improved load-balancing of leases among workers.
 * SHARD_END and non-SHARD_END check-pointed leases are balanced independently.
 * Leases are now stolen evenly from other workers instead of from only the most loaded worker. `MaxLeasesToStealAtOneTime` no longer needs to be specified by users. It is now determined automatically based on the number of leases held by the worker. The user-specified value for this is no longer used.
-
+
 * Users should continue using factory methods from `StreamsWorkerFactory` to create KCL Worker as specified in the guidance of Release v1.4.x.
+* We strongly recommend that you create only one worker per host in your processing fleet to get optimal performance from the DynamoDB Streams service.

 ### Release (v1.4.x)
 * This release fixes an issue of high propagation delay of streams records when processing streams on small tables. This issue occurs when KCL ShardSyncer is not discovering new shards due to server side delays in shard creation or in reporting new shard creation to internal services. The code is implemented in a new implementation of IKinesisProxy interface called DynamoDBStreamsProxy which is part of the latest release.
@@ -60,7 +67,7 @@ Add the following to your Maven pom file:
 <dependency>
     <groupId>com.amazonaws</groupId>
     <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
-    <version>1.5.0</version>
+    <version>1.5.1</version>
 </dependency>
 ```

pom.xml

Lines changed: 25 additions & 5 deletions
@@ -6,7 +6,7 @@
     <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
     <packaging>jar</packaging>
     <name>DynamoDB Streams Adapter for Java</name>
-    <version>1.5.0</version>
+    <version>1.5.1</version>
     <description>The DynamoDB Streams Adapter implements the AmazonKinesis interface so that your application can use KCL to consume and process data from a DynamoDB stream.</description>
     <url>https://aws.amazon.com/dynamodb</url>

@@ -23,10 +23,10 @@
     </licenses>

     <properties>
-        <aws-java-sdk.version>1.11.603</aws-java-sdk.version>
-        <amazon-kinesis-client.version>[1.11.0,)</amazon-kinesis-client.version>
+        <aws-java-sdk.version>1.11.728</aws-java-sdk.version>
+        <amazon-kinesis-client.version>1.13.3</amazon-kinesis-client.version>
         <powermock.version>1.6.2</powermock.version>
-        <aws.dynamodblocal.version>1.11.86</aws.dynamodblocal.version>
+        <aws.dynamodblocal.version>[1.12,2.0)</aws.dynamodblocal.version>
         <maven.dependency.version>3.0.0</maven.dependency.version>
         <sqlite4java.version>1.0.392</sqlite4java.version>
         <gpg.skip>true</gpg.skip>
@@ -62,7 +62,7 @@
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
-            <version>2.9.10.3</version>
+            <version>2.6.6</version>
         </dependency>

         <dependency>
@@ -114,6 +114,26 @@
     </dependencies>

     <developers>
+        <developer>
+            <name>Parijat Sinha</name>
+            <email>parijas@amazon.com</email>
+        </developer>
+        <developer>
+            <name>Shitanshu Aggarwal</name>
+            <email>shitansh@amazon.com</email>
+        </developer>
+        <developer>
+            <name>Abhishek Khanna</name>
+            <email>akkhanna@amazon.com</email>
+        </developer>
+        <developer>
+            <name>Debjyoti Roy</name>
+            <email>debjyotr@amazon.com</email>
+        </developer>
+        <developer>
+            <name>Shantanu Bhoyar</name>
+            <email>shbhoyar@amazon.com</email>
+        </developer>
         <developer>
             <name>Holger Jansohn</name>
             <email>holgs@live.ca</email>
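
The `aws.dynamodblocal.version` property in this file moves from a pinned version to the Maven range `[1.12,2.0)`, which is inclusive at the lower bound and exclusive at the upper bound. The following is a minimal sketch of that half-open check. `VersionRangeSketch` and its methods are hypothetical names that belong to neither Maven nor the adapter, and the comparison only handles purely numeric dotted versions rather than Maven's full version ordering.

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; not Maven's real resolver.
final class VersionRangeSketch {

    // Parses a purely numeric dotted version such as "1.12.0" into its components.
    static int[] parse(String version) {
        return Arrays.stream(version.split("\\.")).mapToInt(Integer::parseInt).toArray();
    }

    // Compares component by component, treating missing components as 0 (so "1.12" equals "1.12.0").
    static int compare(String left, String right) {
        int[] a = parse(left);
        int[] b = parse(right);
        for (int i = 0; i < Math.max(a.length, b.length); i++) {
            int x = i < a.length ? a[i] : 0;
            int y = i < b.length ? b[i] : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    // Mirrors the half-open range [lowerInclusive, upperExclusive).
    static boolean inRange(String version, String lowerInclusive, String upperExclusive) {
        return compare(version, lowerInclusive) >= 0 && compare(version, upperExclusive) < 0;
    }

    public static void main(String[] args) {
        System.out.println(inRange("1.11.86", "1.12", "2.0")); // false: the previously pinned version is below the range
        System.out.println(inRange("1.13.5", "1.12", "2.0"));  // true: arbitrary example version inside the range
        System.out.println(inRange("2.0", "1.12", "2.0"));     // false: the upper bound is excluded
    }
}
```

A range like this lets the build pick up newer 1.x releases automatically, at the cost of less reproducible builds than a pinned version.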

src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsProxy.java

Lines changed: 13 additions & 0 deletions
@@ -11,6 +11,7 @@
 import com.amazonaws.services.dynamodbv2.streamsadapter.utils.ThreadSleeper;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxyExtended;
+import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse;

 import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
@@ -258,6 +259,18 @@ public synchronized List<Shard> getShardList() {
         return listOfShardsSinceLastGet.get();
     }

+    /**
+     * This method gets invoked from ShutdownTask when the shard consumer is shutting down.
+     * Kinesis modified KCL to verify that the shard being closed has children, and this requires listing all shards.
+     * Since DynamoDB streams can have a large number of shards, this verification delays shard closure, and can severely
+     * degrade processing performance. For large streams, this can even cause stream processing to completely halt.
+     * Therefore, we skip this validation in ShutdownTask by returning a response that always reports the shard as closed.
+     */
+    @Override
+    public ShardClosureVerificationResponse verifyShardClosure(String shardId) {
+        return () -> true; // isShardClosed -> true
+    }
+
     private ShardGraphProcessingResult buildShardGraphSnapshot() {

         DescribeStreamResult response;
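
The `verifyShardClosure` override above returns the lambda `() -> true`, which only compiles if the response type has a single abstract method. The sketch below illustrates the trade-off the Javadoc describes, using a stand-in interface. `ClosureResponse`, `verifyByListing`, and `verifyWithoutListing` are hypothetical names, and the `isShardClosed()` method name is an assumption taken from the inline comment in the hunk, not from the KCL source.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: a stand-in for the KCL types, not the adapter's real code.
final class ShardClosureSketch {

    // Stand-in for ShardClosureVerificationResponse; assumed to expose one boolean method.
    @FunctionalInterface
    interface ClosureResponse {
        boolean isShardClosed();
    }

    // Counts how often the simulated (expensive) shard listing runs.
    static final AtomicInteger LIST_CALLS = new AtomicInteger();

    // Simulates the verifying behaviour: list all shards, then look for a child of shardId.
    static ClosureResponse verifyByListing(String shardId, Map<String, String> parentByShard) {
        LIST_CALLS.incrementAndGet();
        boolean hasChild = parentByShard.containsValue(shardId);
        return () -> hasChild;
    }

    // Simulates the override in the hunk: no listing, always report the shard as closed.
    static ClosureResponse verifyWithoutListing(String shardId) {
        return () -> true;
    }

    public static void main(String[] args) {
        Map<String, String> parentByShard = Collections.singletonMap("shard-b", "shard-a");
        System.out.println(verifyByListing("shard-a", parentByShard).isShardClosed()); // true, after one listing
        System.out.println(verifyWithoutListing("shard-a").isShardClosed());           // true, with no listing
        System.out.println(LIST_CALLS.get());                                          // 1
    }
}
```

The sketch only shows why skipping the listing saves work per shutdown. It does not model what the verification would otherwise protect against.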

src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardSyncer.java

Lines changed: 31 additions & 1 deletion
@@ -7,6 +7,8 @@

 import java.io.Serializable;
 import java.math.BigInteger;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -48,6 +50,15 @@
 public class DynamoDBStreamsShardSyncer implements ShardSyncer {

     private static final Log LOG = LogFactory.getLog(DynamoDBStreamsShardSyncer.class);
+    private static final String SHARD_ID_SEPARATOR = "-";
+
+    /* This retention period mainly protects against race conditions that are triggered by shards getting sealed
+     * immediately after creation. The average active lifetime of a shard is around 4 hours today, so setting the retention
+     * slightly higher helps us retain the leases for shards with an active lifetime close to the average, for investigations
+     * and visibility. Lower values on the order of minutes may also work, but reduce operational auditability.
+     */
+    private static final Duration MIN_LEASE_RETENTION = Duration.ofHours(6);
+
     private final LeaseCleanupValidator leaseCleanupValidator;

     public DynamoDBStreamsShardSyncer(final LeaseCleanupValidator leaseCleanupValidator) {
@@ -714,11 +725,20 @@ synchronized void cleanupLeaseForClosedShard(String closedShardId,
             boolean okayToDelete = true;
             for (KinesisClientLease lease : childShardLeases) {
                 if (!lease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
-                    okayToDelete = false;
+                    okayToDelete = false; // if any child is still being processed, don't delete lease for parent
                     break;
                 }
             }

+            try {
+                if (Instant.now().isBefore(getShardCreationTime(closedShardId).plus(MIN_LEASE_RETENTION))) {
+                    okayToDelete = false; // if parent was created within lease retention period, don't delete lease for parent
+                }
+            } catch (RuntimeException e) {
+                LOG.info("Could not extract creation time from ShardId [" + closedShardId + "]");
+                LOG.debug(e);
+            }
+
             if (okayToDelete) {
                 LOG.info("Deleting lease for shard " + leaseForClosedShard.getLeaseKey()
                         + " as it is eligible for cleanup - its child shard is check-pointed at SHARD_END.");
@@ -727,6 +747,16 @@ synchronized void cleanupLeaseForClosedShard(String closedShardId,
         }
     }

+    /**
+     * This method extracts the shard creation time from the ShardId
+     *
+     * @param shardId
+     * @return instant at which the shard was created
+     */
+    private Instant getShardCreationTime(String shardId) {
+        return Instant.ofEpochMilli(Long.parseLong(shardId.split(SHARD_ID_SEPARATOR)[1]));
+    }
+
     /**
      * Helper method to create a new KinesisClientLease POJO for a shard.
      * Note: Package level access only for testing purposes
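
The retention check in this file depends on the shard ID carrying its creation time as epoch milliseconds in the second `-`-separated field. The sketch below isolates that parsing and the 6-hour comparison. `LeaseRetentionSketch` and `withinRetention` are hypothetical names, the shard ID is a made-up example, and passing `now` explicitly (instead of calling `Instant.now()` as the hunk does) is a choice made here only to keep the example deterministic.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative only: mirrors the parsing and comparison from the hunks above.
final class LeaseRetentionSketch {

    static final String SHARD_ID_SEPARATOR = "-";
    static final Duration MIN_LEASE_RETENTION = Duration.ofHours(6);

    // Assumes IDs shaped like "shardId-<epochMillis>-<suffix>", as split(...)[1] in the hunk implies.
    static Instant shardCreationTime(String shardId) {
        return Instant.ofEpochMilli(Long.parseLong(shardId.split(SHARD_ID_SEPARATOR)[1]));
    }

    // True while the shard is younger than the retention period, i.e. its lease must be kept.
    static boolean withinRetention(String shardId, Instant now) {
        return now.isBefore(shardCreationTime(shardId).plus(MIN_LEASE_RETENTION));
    }

    public static void main(String[] args) {
        String shardId = "shardId-00000001580000000000-0a1b2c3d"; // made-up example ID
        Instant created = shardCreationTime(shardId);
        System.out.println(created);                                                    // 2020-01-26T00:53:20Z
        System.out.println(withinRetention(shardId, created.plus(Duration.ofHours(5)))); // true: keep the lease
        System.out.println(withinRetention(shardId, created.plus(Duration.ofHours(7)))); // false: eligible for cleanup
    }
}
```

An ID without a numeric second field makes `shardCreationTime` throw a `RuntimeException` subtype (`ArrayIndexOutOfBoundsException` or `NumberFormatException`), which is the case the `catch (RuntimeException e)` block in the hunk logs and tolerates.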

src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/StreamsWorkerFactory.java

Lines changed: 14 additions & 7 deletions
@@ -61,7 +61,8 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP
             .shardSyncer(new DynamoDBStreamsShardSyncer(new StreamsLeaseCleanupValidator()))
             .shardPrioritization(config.getShardPrioritizationStrategy())
             .leaseManager(kinesisClientLeaseManager)
-            .leaseTaker(new StreamsLeaseTaker<>(kinesisClientLeaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis()))
+            .leaseTaker(new StreamsLeaseTaker<>(kinesisClientLeaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis())
+                    .maxLeasesForWorker(config.getMaxLeasesForWorker()))
             .leaderDecider(new StreamsDeterministicShuffleShardSyncLeaderDecider(config, kinesisClientLeaseManager))
             .build();
     }
@@ -88,7 +89,8 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP
             .shardSyncer(new DynamoDBStreamsShardSyncer(new StreamsLeaseCleanupValidator()))
             .shardPrioritization(config.getShardPrioritizationStrategy())
             .leaseManager(kinesisClientLeaseManager)
-            .leaseTaker(new StreamsLeaseTaker<>(kinesisClientLeaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis()))
+            .leaseTaker(new StreamsLeaseTaker<>(kinesisClientLeaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis())
+                    .maxLeasesForWorker(config.getMaxLeasesForWorker()))
             .leaderDecider(new StreamsDeterministicShuffleShardSyncLeaderDecider(config, kinesisClientLeaseManager))
             .build();
     }
@@ -118,7 +120,8 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP
             .shardSyncer(new DynamoDBStreamsShardSyncer(new StreamsLeaseCleanupValidator()))
             .shardPrioritization(config.getShardPrioritizationStrategy())
             .leaseManager(kinesisClientLeaseManager)
-            .leaseTaker(new StreamsLeaseTaker<>(kinesisClientLeaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis()))
+            .leaseTaker(new StreamsLeaseTaker<>(kinesisClientLeaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis())
+                    .maxLeasesForWorker(config.getMaxLeasesForWorker()))
             .leaderDecider(new StreamsDeterministicShuffleShardSyncLeaderDecider(config, kinesisClientLeaseManager))
             .build();
     }
@@ -148,7 +151,8 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP
             .shardSyncer(new DynamoDBStreamsShardSyncer(new StreamsLeaseCleanupValidator()))
             .shardPrioritization(config.getShardPrioritizationStrategy())
             .leaseManager(kinesisClientLeaseManager)
-            .leaseTaker(new StreamsLeaseTaker<>(kinesisClientLeaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis()))
+            .leaseTaker(new StreamsLeaseTaker<>(kinesisClientLeaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis())
+                    .maxLeasesForWorker(config.getMaxLeasesForWorker()))
             .leaderDecider(new StreamsDeterministicShuffleShardSyncLeaderDecider(config, kinesisClientLeaseManager))
             .build();
     }
@@ -175,7 +179,8 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP
             .shardSyncer(new DynamoDBStreamsShardSyncer(new StreamsLeaseCleanupValidator()))
             .shardPrioritization(config.getShardPrioritizationStrategy())
             .leaseManager(kinesisClientLeaseManager)
-            .leaseTaker(new StreamsLeaseTaker<>(kinesisClientLeaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis()))
+            .leaseTaker(new StreamsLeaseTaker<>(kinesisClientLeaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis())
+                    .maxLeasesForWorker(config.getMaxLeasesForWorker()))
             .leaderDecider(new StreamsDeterministicShuffleShardSyncLeaderDecider(config, kinesisClientLeaseManager))
             .build();
     }
@@ -205,7 +210,8 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP
             .shardSyncer(new DynamoDBStreamsShardSyncer(new StreamsLeaseCleanupValidator()))
             .shardPrioritization(config.getShardPrioritizationStrategy())
             .leaseManager(kinesisClientLeaseManager)
-            .leaseTaker(new StreamsLeaseTaker<>(kinesisClientLeaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis()))
+            .leaseTaker(new StreamsLeaseTaker<>(kinesisClientLeaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis())
+                    .maxLeasesForWorker(config.getMaxLeasesForWorker()))
             .leaderDecider(new StreamsDeterministicShuffleShardSyncLeaderDecider(config, kinesisClientLeaseManager))
             .build();
     }
@@ -235,7 +241,8 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP
             .shardSyncer(new DynamoDBStreamsShardSyncer(new StreamsLeaseCleanupValidator()))
             .shardPrioritization(config.getShardPrioritizationStrategy())
             .leaseManager(kinesisClientLeaseManager)
-            .leaseTaker(new StreamsLeaseTaker<>(kinesisClientLeaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis()))
+            .leaseTaker(new StreamsLeaseTaker<>(kinesisClientLeaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis())
+                    .maxLeasesForWorker(config.getMaxLeasesForWorker()))
             .leaderDecider(new StreamsDeterministicShuffleShardSyncLeaderDecider(config, kinesisClientLeaseManager))
             .build();
     }
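
Each factory method in this file now chains `.maxLeasesForWorker(config.getMaxLeasesForWorker())` onto the newly constructed lease taker, so the configured cap actually reaches it. The sketch below shows why the missing call mattered, using a toy class with a fluent setter. `LeaseTakerSketch`, `leasesAllowed`, and the unlimited default are assumptions made for illustration and do not reproduce the real `StreamsLeaseTaker` logic.

```java
// Illustrative only: a toy lease taker with a fluent setter, not the adapter's StreamsLeaseTaker.
final class LeaseTakerSketch {

    // Assumed default: effectively unlimited when the setter is never called.
    private int maxLeasesForWorker = Integer.MAX_VALUE;

    // Fluent setter: returns this so it can be chained inside a builder argument.
    LeaseTakerSketch maxLeasesForWorker(int max) {
        if (max <= 0) {
            throw new IllegalArgumentException("maxLeasesForWorker must be positive");
        }
        this.maxLeasesForWorker = max;
        return this;
    }

    // How many more leases this worker may take, given what it holds and what is available.
    int leasesAllowed(int currentlyHeld, int available) {
        int headroom = Math.max(0, maxLeasesForWorker - currentlyHeld);
        return Math.min(headroom, available);
    }

    public static void main(String[] args) {
        int configuredMax = 5; // stands in for config.getMaxLeasesForWorker()

        // Before the fix: the cap is configured but never handed to the lease taker.
        System.out.println(new LeaseTakerSketch().leasesAllowed(3, 20)); // 20

        // After the fix: the cap is propagated through the fluent setter.
        System.out.println(new LeaseTakerSketch().maxLeasesForWorker(configuredMax).leasesAllowed(3, 20)); // 2
    }
}
```

Because the setter returns the same instance, the chained expression can be passed directly as the `leaseTaker(...)` argument without a temporary variable.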
