Skip to content

Commit b3d20af

Browse files
authored
DataStore Model Sync Parallelization (#2808)
1 parent 497e19a commit b3d20af

File tree

8 files changed

+801
-8
lines changed

8 files changed

+801
-8
lines changed

aws-datastore/api/aws-datastore.api

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public final class com/amplifyframework/datastore/DataStoreConfiguration {
6464
public fun getSyncExpressions ()Ljava/util/Map;
6565
public fun getSyncIntervalInMinutes ()Ljava/lang/Long;
6666
public fun getSyncIntervalMs ()Ljava/lang/Long;
67+
public fun getSyncMaxConcurrentModels ()Ljava/lang/Integer;
6768
public fun getSyncMaxRecords ()Ljava/lang/Integer;
6869
public fun getSyncPageSize ()Ljava/lang/Integer;
6970
public fun hashCode ()I
@@ -80,6 +81,7 @@ public final class com/amplifyframework/datastore/DataStoreConfiguration$Builder
8081
public fun syncExpression (Ljava/lang/Class;Lcom/amplifyframework/datastore/DataStoreSyncExpression;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder;
8182
public fun syncExpression (Ljava/lang/String;Lcom/amplifyframework/datastore/DataStoreSyncExpression;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder;
8283
public fun syncInterval (JLjava/util/concurrent/TimeUnit;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder;
84+
public fun syncMaxConcurrentModels (Ljava/lang/Integer;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder;
8385
public fun syncMaxRecords (Ljava/lang/Integer;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder;
8486
public fun syncPageSize (Ljava/lang/Integer;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder;
8587
}

aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ public final class DataStoreConfiguration {
4848
static final int DEFAULT_SYNC_PAGE_SIZE = 1_000;
4949
@VisibleForTesting
5050
static final boolean DEFAULT_DO_SYNC_RETRY = false;
51+
@VisibleForTesting
52+
static final int DEFAULT_SYNC_MAX_CONCURRENT_MODELS = 1;
5153
static final int MAX_RECORDS = 1000;
5254
static final long MAX_TIME_SEC = 2;
5355

@@ -58,6 +60,7 @@ public final class DataStoreConfiguration {
5860
private final Integer syncMaxRecords;
5961
private final Integer syncPageSize;
6062
private final boolean doSyncRetry;
63+
private final Integer syncMaxConcurrentModels;
6164
private final Map<String, DataStoreSyncExpression> syncExpressions;
6265
private final Long syncIntervalInMinutes;
6366
private final Long maxTimeLapseForObserveQuery;
@@ -71,6 +74,8 @@ private DataStoreConfiguration(Builder builder) {
7174
this.syncIntervalInMinutes = builder.syncIntervalInMinutes;
7275
this.syncExpressions = builder.syncExpressions;
7376
this.doSyncRetry = builder.doSyncRetry;
77+
this.syncMaxConcurrentModels = builder.syncMaxConcurrentModels != null ?
78+
builder.syncMaxConcurrentModels : DEFAULT_SYNC_MAX_CONCURRENT_MODELS;
7479
this.maxTimeLapseForObserveQuery = builder.maxTimeLapseForObserveQuery;
7580
this.observeQueryMaxRecords = builder.observeQueryMaxRecords;
7681
}
@@ -126,9 +131,10 @@ public static DataStoreConfiguration defaults() throws DataStoreException {
126131
.syncInterval(DEFAULT_SYNC_INTERVAL_MINUTES, TimeUnit.MINUTES)
127132
.syncPageSize(DEFAULT_SYNC_PAGE_SIZE)
128133
.syncMaxRecords(DEFAULT_SYNC_MAX_RECORDS)
129-
.doSyncRetry(DEFAULT_DO_SYNC_RETRY)
130-
.observeQueryMaxTime(MAX_TIME_SEC)
131-
.observeQueryMaxRecords(MAX_RECORDS)
134+
.doSyncRetry(DEFAULT_DO_SYNC_RETRY)
135+
.observeQueryMaxTime(MAX_TIME_SEC)
136+
.observeQueryMaxRecords(MAX_RECORDS)
137+
.syncMaxConcurrentModels(DEFAULT_SYNC_MAX_CONCURRENT_MODELS)
132138
.build();
133139
}
134140

@@ -201,6 +207,23 @@ public Boolean getDoSyncRetry() {
201207
return this.doSyncRetry;
202208
}
203209

210+
/**
211+
* Gets the number of models that are allowed to concurrently sync.
212+
* NOTE: This value will not be used if any models have associations, instead, the default (1)
213+
* will be used.
214+
* Setting this number to a high value requires that the developer ensure app memory is not a
215+
* concern. If the expected sync data contains a large number of models, with a large number
216+
* of records per model, the concurrency limit should be set to a conservative value. However,
217+
* if the expected sync data contains a large number of models, with a small amount of data in
218+
* each model, setting this limit to a high value will greatly improve sync speeds.
219+
* @return Limit to the number of models that can sync concurrently
220+
*/
221+
@IntRange(from = 1)
222+
@NonNull
223+
public Integer getSyncMaxConcurrentModels() {
224+
return syncMaxConcurrentModels;
225+
}
226+
204227
/**
205228
* Returns the Map of all {@link DataStoreSyncExpression}s used to filter data received from AppSync, either during
206229
* a sync or over the real-time subscription.
@@ -247,6 +270,9 @@ public boolean equals(@Nullable Object thatObject) {
247270
if (!ObjectsCompat.equals(getObserveQueryMaxRecords(), that.getObserveQueryMaxRecords())) {
248271
return false;
249272
}
273+
if (!ObjectsCompat.equals(getSyncMaxConcurrentModels(), that.getSyncMaxConcurrentModels())) {
274+
return false;
275+
}
250276
return true;
251277
}
252278

@@ -261,6 +287,7 @@ public int hashCode() {
261287
result = 31 * result + getDoSyncRetry().hashCode();
262288
result = 31 * result + (getObserveQueryMaxRecords() != null ? getObserveQueryMaxRecords().hashCode() : 0);
263289
result = 31 * result + getMaxTimeLapseForObserveQuery().hashCode();
290+
result = 31 * result + getSyncMaxConcurrentModels().hashCode();
264291
return result;
265292
}
266293

@@ -273,9 +300,10 @@ public String toString() {
273300
", syncPageSize=" + syncPageSize +
274301
", syncIntervalInMinutes=" + syncIntervalInMinutes +
275302
", syncExpressions=" + syncExpressions +
276-
", doSyncRetry=" + doSyncRetry +
277-
", maxTimeRelapseForObserveQuery=" + maxTimeLapseForObserveQuery +
278-
", observeQueryMaxRecords=" + observeQueryMaxRecords +
303+
", doSyncRetry=" + doSyncRetry +
304+
", maxTimeRelapseForObserveQuery=" + maxTimeLapseForObserveQuery +
305+
", observeQueryMaxRecords=" + observeQueryMaxRecords +
306+
", syncMaxConcurrentModels=" + syncMaxConcurrentModels +
279307
'}';
280308
}
281309

@@ -309,6 +337,7 @@ public static final class Builder {
309337
private Integer syncMaxRecords;
310338
private Integer syncPageSize;
311339
private boolean doSyncRetry;
340+
private Integer syncMaxConcurrentModels;
312341
private Map<String, DataStoreSyncExpression> syncExpressions;
313342
private boolean ensureDefaults;
314343
private JSONObject pluginJson;
@@ -429,6 +458,24 @@ public Builder syncPageSize(@IntRange(from = 0) Integer syncPageSize) {
429458
return Builder.this;
430459
}
431460

461+
/**
462+
* Sets the max concurrency limit for model syncing. Default is 1
463+
* NOTE: If any sync models have associations, this value will be unused and the default (1)
464+
* will be used.
465+
* Setting this number to a high value requires that the developer ensure app memory is not a
466+
* concern. If the expected sync data contains a large number of models, with a large number
467+
* of records per model, the concurrency limit should be set to a conservative value. However,
468+
* if the expected sync data contains a large number of models, with a small amount of data in
469+
* each model, setting this limit to a high value will greatly improve sync speeds.
470+
* @param syncMaxConcurrentModels Number of models that can sync concurrently
471+
* @return Current builder
472+
*/
473+
@NonNull
474+
public Builder syncMaxConcurrentModels(@IntRange(from = 1) Integer syncMaxConcurrentModels) {
475+
this.syncMaxConcurrentModels = syncMaxConcurrentModels;
476+
return Builder.this;
477+
}
478+
432479
/**
433480
* Sets a sync expression for a particular model to filter which data is synced locally.
434481
* The expression is evaluated each time DataStore is started.
@@ -518,6 +565,10 @@ private void applyUserProvidedConfiguration() {
518565
syncPageSize = getValueOrDefault(userProvidedConfiguration.getSyncPageSize(), syncPageSize);
519566
syncExpressions = userProvidedConfiguration.getSyncExpressions();
520567
doSyncRetry = getValueOrDefault(userProvidedConfiguration.getDoSyncRetry(), doSyncRetry);
568+
syncMaxConcurrentModels = getValueOrDefault(
569+
userProvidedConfiguration.getSyncMaxConcurrentModels(),
570+
syncMaxConcurrentModels
571+
);
521572
observeQueryMaxRecords = getValueOrDefault(userProvidedConfiguration.getObserveQueryMaxRecords(),
522573
observeQueryMaxRecords);
523574
maxTimeLapseForObserveQuery = userProvidedConfiguration.getMaxTimeLapseForObserveQuery()
@@ -548,6 +599,10 @@ public DataStoreConfiguration build() throws DataStoreException {
548599
syncIntervalInMinutes = getValueOrDefault(syncIntervalInMinutes, DEFAULT_SYNC_INTERVAL_MINUTES);
549600
syncMaxRecords = getValueOrDefault(syncMaxRecords, DEFAULT_SYNC_MAX_RECORDS);
550601
syncPageSize = getValueOrDefault(syncPageSize, DEFAULT_SYNC_PAGE_SIZE);
602+
syncMaxConcurrentModels = getValueOrDefault(
603+
syncMaxConcurrentModels,
604+
DEFAULT_SYNC_MAX_CONCURRENT_MODELS
605+
);
551606
observeQueryMaxRecords = getValueOrDefault(observeQueryMaxRecords, MAX_RECORDS);
552607
maxTimeLapseForObserveQuery = maxTimeLapseForObserveQuery == 0 ? MAX_TIME_SEC :
553608
maxTimeLapseForObserveQuery;

aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,16 +126,43 @@ Completable hydrate() {
126126
TopologicalOrdering.forRegisteredModels(schemaRegistry, modelProvider);
127127
Collections.sort(modelSchemas, ordering::compare);
128128
ArrayList<String> toBeSyncedModelArray = new ArrayList<>();
129+
boolean canSyncConcurrently = true;
129130
for (ModelSchema schema : modelSchemas) {
130131
//Check to see if query predicate for this schema is not equal to none. This means customer does
131132
// not want to sync the data for this model.
132133
if (!QueryPredicates.none().equals(queryPredicateProvider.getPredicate(schema.getName()))) {
133134
hydrationTasks.add(createHydrationTask(schema));
134135
toBeSyncedModelArray.add(schema.getName());
136+
if (!schema.getAssociations().isEmpty()) {
137+
canSyncConcurrently = false;
138+
}
135139
}
136140
}
137141

138-
return Completable.concat(hydrationTasks)
142+
int syncMaxConcurrentModels;
143+
try {
144+
syncMaxConcurrentModels = dataStoreConfigurationProvider
145+
.getConfiguration()
146+
.getSyncMaxConcurrentModels();
147+
} catch (DataStoreException exception) {
148+
syncMaxConcurrentModels = 1;
149+
}
150+
151+
Completable syncCompletable;
152+
if (canSyncConcurrently && syncMaxConcurrentModels > 1) {
153+
syncCompletable = Completable.mergeDelayError(
154+
Flowable.fromIterable(hydrationTasks),
155+
syncMaxConcurrentModels
156+
);
157+
} else {
158+
// The reason we don't do mergeDelayError here with maxConcurrency = 1 is because it would create a
159+
// behavioral difference. If a failure is encountered in concat, sync immediately stops. This would be
160+
// the wrong behavior when concurrency is enabled, but in the single concurrency use case, this matches
161+
// previous behavior
162+
syncCompletable = Completable.concat(hydrationTasks);
163+
}
164+
165+
return syncCompletable
139166
.doOnSubscribe(ignore -> {
140167
// This is where we trigger the syncQueriesStarted event since
141168
// doOnSubscribe means that all upstream hydration tasks

aws-datastore/src/test/java/com/amplifyframework/datastore/DataStoreConfigurationTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ public void testDefaultConfiguration() throws DataStoreException {
6262
dataStoreConfiguration.getSyncMaxRecords().intValue());
6363
assertEquals(DataStoreConfiguration.DEFAULT_SYNC_PAGE_SIZE,
6464
dataStoreConfiguration.getSyncPageSize().intValue());
65+
assertEquals(DataStoreConfiguration.DEFAULT_SYNC_MAX_CONCURRENT_MODELS,
66+
dataStoreConfiguration.getSyncMaxConcurrentModels().intValue());
6567

6668
assertTrue(dataStoreConfiguration.getConflictHandler() instanceof AlwaysApplyRemoteHandler);
6769
assertTrue(dataStoreConfiguration.getErrorHandler() instanceof DefaultDataStoreErrorHandler);
@@ -107,6 +109,7 @@ public void testDefaultOverriddenFromConfigurationAndObject()
107109
long expectedSyncIntervalMinutes = 6L;
108110
Long expectedSyncIntervalMs = TimeUnit.MINUTES.toMillis(expectedSyncIntervalMinutes);
109111
Integer expectedSyncMaxRecords = 3;
112+
Integer expectedSyncMaxConcurrentModels = 5;
110113
DummyConflictHandler dummyConflictHandler = new DummyConflictHandler();
111114
DataStoreErrorHandler errorHandler = DefaultDataStoreErrorHandler.instance();
112115

@@ -121,7 +124,8 @@ public void testDefaultOverriddenFromConfigurationAndObject()
121124
.errorHandler(errorHandler)
122125
.syncExpression(BlogOwner.class, ownerSyncExpression)
123126
.syncExpression("Post", postSyncExpression)
124-
.doSyncRetry(true)
127+
.doSyncRetry(true)
128+
.syncMaxConcurrentModels(expectedSyncMaxConcurrentModels)
125129
.build();
126130

127131
JSONObject jsonConfigFromFile = new JSONObject()
@@ -132,6 +136,7 @@ public void testDefaultOverriddenFromConfigurationAndObject()
132136

133137
assertEquals(expectedSyncIntervalMs, dataStoreConfiguration.getSyncIntervalMs());
134138
assertEquals(expectedSyncMaxRecords, dataStoreConfiguration.getSyncMaxRecords());
139+
assertEquals(expectedSyncMaxConcurrentModels, dataStoreConfiguration.getSyncMaxConcurrentModels());
135140
assertEquals(DataStoreConfiguration.DEFAULT_SYNC_PAGE_SIZE,
136141
dataStoreConfiguration.getSyncPageSize().longValue());
137142
assertTrue(dataStoreConfiguration.getDoSyncRetry());

0 commit comments

Comments
 (0)