Skip to content

Commit 0b7c4bb

Browse files
authored
Support retrying Datastore operations. (dart-archive/gcloud#168)
* Support retrying Datastore operations. * Expose only maxAttempts * add more errors
1 parent bdd4244 commit 0b7c4bb

File tree

5 files changed

+185
-4
lines changed

5 files changed

+185
-4
lines changed

pkgs/gcloud/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
## 0.8.10-wip
1+
## 0.8.10
22

33
- Widen the SDK constraint to support Dart 3.0
4+
- Support retrying Datastore operations.
45

56
## 0.8.9
67

pkgs/gcloud/lib/datastore.dart

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ library;
1212
import 'dart:async';
1313

1414
import 'package:http/http.dart' as http;
15+
import 'package:retry/retry.dart';
1516

1617
import 'common.dart' show Page;
1718
import 'service_scope.dart' as ss;
1819
import 'src/datastore_impl.dart' show DatastoreImpl;
20+
import 'src/retry_datastore_impl.dart';
1921

2022
const Symbol _datastoreKey = #gcloud.datastore;
2123

@@ -391,6 +393,22 @@ abstract class Datastore {
391393
return DatastoreImpl(client, project);
392394
}
393395

396+
/// Retry Datastore operations where the issue seems to be transient.
397+
///
398+
/// The [delegate] is the configured [Datastore] implementation that will be
399+
/// used.
400+
///
401+
/// The operations will be retried at maximum of [maxAttempts].
402+
factory Datastore.withRetry(
403+
Datastore delegate, {
404+
int? maxAttempts,
405+
}) {
406+
return RetryDatastoreImpl(
407+
delegate,
408+
RetryOptions(maxAttempts: maxAttempts ?? 3),
409+
);
410+
}
411+
394412
/// Allocate integer IDs for the partially populated [keys] given as argument.
395413
///
396414
/// The returned [Key]s will be fully populated with the allocated IDs.
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'package:retry/retry.dart';
6+
7+
import '../common.dart';
8+
import '../datastore.dart' as datastore;
9+
10+
/// Datastore implementation which retries most operations
11+
class RetryDatastoreImpl implements datastore.Datastore {
12+
final datastore.Datastore _delegate;
13+
final RetryOptions _retryOptions;
14+
15+
RetryDatastoreImpl(this._delegate, this._retryOptions);
16+
17+
@override
18+
Future<List<datastore.Key>> allocateIds(List<datastore.Key> keys) async {
19+
return await _retryOptions.retry(
20+
() => _delegate.allocateIds(keys),
21+
retryIf: _retryIf,
22+
);
23+
}
24+
25+
@override
26+
Future<datastore.Transaction> beginTransaction({
27+
bool crossEntityGroup = false,
28+
}) async {
29+
return await _retryOptions.retry(
30+
() => _delegate.beginTransaction(crossEntityGroup: crossEntityGroup),
31+
retryIf: _retryIf,
32+
);
33+
}
34+
35+
@override
36+
Future<datastore.CommitResult> commit({
37+
List<datastore.Entity> inserts = const [],
38+
List<datastore.Entity> autoIdInserts = const [],
39+
List<datastore.Key> deletes = const [],
40+
datastore.Transaction? transaction,
41+
}) async {
42+
Future<datastore.CommitResult> fn() async {
43+
if (transaction == null) {
44+
return await _delegate.commit(
45+
inserts: inserts,
46+
autoIdInserts: autoIdInserts,
47+
deletes: deletes,
48+
);
49+
} else {
50+
return await _delegate.commit(
51+
inserts: inserts,
52+
autoIdInserts: autoIdInserts,
53+
deletes: deletes,
54+
transaction: transaction,
55+
);
56+
}
57+
}
58+
59+
final shouldNotRetry = autoIdInserts.isNotEmpty && transaction == null;
60+
if (shouldNotRetry) {
61+
return await fn();
62+
} else {
63+
return await _retryOptions.retry(fn, retryIf: _retryIf);
64+
}
65+
}
66+
67+
@override
68+
Future<List<datastore.Entity?>> lookup(
69+
List<datastore.Key> keys, {
70+
datastore.Transaction? transaction,
71+
}) async {
72+
return await _retryOptions.retry(
73+
() async {
74+
if (transaction == null) {
75+
return await _delegate.lookup(keys);
76+
} else {
77+
return await _delegate.lookup(keys, transaction: transaction);
78+
}
79+
},
80+
retryIf: _retryIf,
81+
);
82+
}
83+
84+
@override
85+
Future<Page<datastore.Entity>> query(
86+
datastore.Query query, {
87+
datastore.Partition? partition,
88+
datastore.Transaction? transaction,
89+
}) async {
90+
Future<Page<datastore.Entity>> fn() async {
91+
if (partition != null && transaction != null) {
92+
return await _delegate.query(
93+
query,
94+
partition: partition,
95+
transaction: transaction,
96+
);
97+
} else if (partition != null) {
98+
return await _delegate.query(query, partition: partition);
99+
} else if (transaction != null) {
100+
return await _delegate.query(
101+
query,
102+
transaction: transaction,
103+
);
104+
} else {
105+
return await _delegate.query(query);
106+
}
107+
}
108+
109+
return await _retryOptions.retry(
110+
() async => _RetryPage(await fn(), _retryOptions),
111+
retryIf: _retryIf,
112+
);
113+
}
114+
115+
@override
116+
Future rollback(datastore.Transaction transaction) async {
117+
return await _retryOptions.retry(
118+
() => _delegate.rollback(transaction),
119+
retryIf: _retryIf,
120+
);
121+
}
122+
}
123+
124+
class _RetryPage<K> implements Page<K> {
125+
final Page<K> _delegate;
126+
final RetryOptions _retryOptions;
127+
128+
_RetryPage(this._delegate, this._retryOptions);
129+
130+
@override
131+
bool get isLast => _delegate.isLast;
132+
133+
@override
134+
List<K> get items => _delegate.items;
135+
136+
@override
137+
Future<Page<K>> next({int? pageSize}) async {
138+
return await _retryOptions.retry(
139+
() async {
140+
if (pageSize == null) {
141+
return await _delegate.next();
142+
} else {
143+
return await _delegate.next(pageSize: pageSize);
144+
}
145+
},
146+
retryIf: _retryIf,
147+
);
148+
}
149+
}
150+
151+
bool _retryIf(Exception e) {
152+
if (e is datastore.TransactionAbortedError ||
153+
e is datastore.NeedIndexError ||
154+
e is datastore.QuotaExceededError ||
155+
e is datastore.PermissionDeniedError) {
156+
return false;
157+
}
158+
return true;
159+
}

pkgs/gcloud/pubspec.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: gcloud
2-
version: 0.8.10-wip
2+
version: 0.8.10
33
description: >-
44
High level idiomatic Dart API for Google Cloud Storage, Pub-Sub and Datastore.
55
repository: https://github.com/dart-lang/gcloud
@@ -16,6 +16,7 @@ dependencies:
1616
googleapis: '>=3.0.0 <12.0.0'
1717
http: '>=0.13.5 <2.0.0'
1818
meta: ^1.3.0
19+
retry: ^3.1.1
1920

2021
dev_dependencies:
2122
dart_flutter_team_lints: ^1.0.0

pkgs/gcloud/test/db_all_e2e_test.dart

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ library gcloud.test.db_all_test;
1010
import 'dart:async';
1111
import 'dart:io';
1212

13+
import 'package:gcloud/datastore.dart';
1314
import 'package:gcloud/db.dart' as db;
1415
import 'package:gcloud/src/datastore_impl.dart' as datastore_impl;
1516
import 'package:http/http.dart';
@@ -25,12 +26,13 @@ Future main() async {
2526
var now = DateTime.now().millisecondsSinceEpoch;
2627
var namespace = '${Platform.operatingSystem}$now';
2728

28-
late datastore_impl.DatastoreImpl datastore;
29+
late Datastore datastore;
2930
late db.DatastoreDB datastoreDB;
3031
Client? client;
3132

3233
await withAuthClient(scopes, (String project, httpClient) async {
33-
datastore = datastore_impl.DatastoreImpl(httpClient, project);
34+
datastore =
35+
Datastore.withRetry(datastore_impl.DatastoreImpl(httpClient, project));
3436
datastoreDB = db.DatastoreDB(datastore);
3537
client = httpClient;
3638
});

0 commit comments

Comments
 (0)