Skip to content

Commit ca9a289

Browse files
authored
Merge pull request #22 from vinted/fix/retry-logic
Fix/retry logic
2 parents 4df9abb + c78b2b2 commit ca9a289

File tree

8 files changed

+191
-90
lines changed

8 files changed

+191
-90
lines changed

build.gradle

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,29 @@ publishing {
8080
}
8181
}
8282

83+
tasks.withType(Test).configureEach {
84+
doFirst {
85+
jvmArgs = [
86+
'--add-exports=java.base/sun.net.util=ALL-UNNAMED',
87+
'--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED',
88+
'--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED',
89+
'--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED',
90+
'--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED',
91+
'--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED',
92+
'--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED',
93+
'--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED',
94+
'--add-opens=java.base/java.lang=ALL-UNNAMED',
95+
'--add-opens=java.base/java.net=ALL-UNNAMED',
96+
'--add-opens=java.base/java.io=ALL-UNNAMED',
97+
'--add-opens=java.base/java.nio=ALL-UNNAMED',
98+
'--add-opens=java.base/sun.nio.ch=ALL-UNNAMED',
99+
'--add-opens=java.base/java.lang.reflect=ALL-UNNAMED',
100+
'--add-opens=java.base/java.text=ALL-UNNAMED',
101+
'--add-opens=java.base/java.time=ALL-UNNAMED',
102+
'--add-opens=java.base/java.util=ALL-UNNAMED',
103+
'--add-opens=java.base/java.util.concurrent=ALL-UNNAMED',
104+
'--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED',
105+
'--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED'
106+
]
107+
}
108+
}

src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.vinted.flink.bigquery.client;
22

33
import com.google.api.gax.core.FixedExecutorProvider;
4+
import com.google.api.gax.retrying.RetrySettings;
45
import com.google.cloud.bigquery.BigQueryOptions;
56
import com.google.cloud.bigquery.TableId;
67
import com.google.cloud.bigquery.storage.v1.*;
@@ -10,6 +11,7 @@
1011
import com.vinted.flink.bigquery.model.config.WriterSettings;
1112
import com.vinted.flink.bigquery.schema.SchemaTransformer;
1213
import com.vinted.flink.bigquery.serializer.RowValueSerializer;
14+
import org.threeten.bp.Duration;
1315

1416
import java.io.IOException;
1517
import java.util.Optional;
@@ -46,13 +48,25 @@ public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowVa
4648
var executorProvider = this.writerSettings.getWriterThreads() > 1 ?
4749
FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) :
4850
BigQueryWriteSettings.defaultExecutorProviderBuilder().build();
49-
var writer = JsonStreamWriter
51+
var writerBuilder = JsonStreamWriter
5052
.newBuilder(streamName, getTableSchema(table), this.getClient())
5153
.setEnableConnectionPool(this.writerSettings.getEnableConnectionPool())
52-
.setExecutorProvider(executorProvider)
53-
.build();
54+
.setExecutorProvider(executorProvider);
55+
56+
if (writerSettings.getRetrySettings() != null) {
57+
var settings = writerSettings.getRetrySettings();
58+
var retrySettings =
59+
RetrySettings.newBuilder()
60+
.setInitialRetryDelay(Duration.ofMillis(settings.getInitialRetryDelay().toMillis()))
61+
.setRetryDelayMultiplier(settings.getRetryDelayMultiplier())
62+
.setMaxAttempts(settings.getMaxRetryAttempts())
63+
.setMaxRetryDelay(Duration.ofMillis(settings.getMaxRetryDelay().toMillis()))
64+
.build();
65+
66+
writerBuilder.setRetrySettings(retrySettings);
67+
}
5468
JsonStreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime());
55-
return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writer);
69+
return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writerBuilder.build());
5670
} catch (Descriptors.DescriptorValidationException | IOException | InterruptedException e) {
5771
throw new RuntimeException(e);
5872
}

src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.vinted.flink.bigquery.client;
22

33
import com.google.api.gax.core.FixedExecutorProvider;
4+
import com.google.api.gax.retrying.RetrySettings;
45
import com.google.cloud.bigquery.BigQueryOptions;
56
import com.google.cloud.bigquery.TableId;
67
import com.google.cloud.bigquery.storage.v1.*;
@@ -9,6 +10,7 @@
910
import com.vinted.flink.bigquery.model.config.WriterSettings;
1011
import com.vinted.flink.bigquery.schema.SchemaTransformer;
1112
import com.vinted.flink.bigquery.serializer.RowValueSerializer;
13+
import org.threeten.bp.Duration;
1214

1315
import java.io.IOException;
1416
import java.util.concurrent.Executors;
@@ -45,6 +47,9 @@ public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowVa
4547
var executorProvider = this.writerSettings.getWriterThreads() > 1 ?
4648
FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) :
4749
BigQueryWriteSettings.defaultExecutorProviderBuilder().build();
50+
51+
52+
4853
var streamWriterBuilder = StreamWriter
4954
.newBuilder(streamName, getClient())
5055
.setMaxInflightRequests(this.writerSettings.getMaxInflightRequests())
@@ -56,6 +61,19 @@ public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowVa
5661
.setLocation(table.getProject())
5762
.setWriterSchema(protoSchema);
5863

64+
if (writerSettings.getRetrySettings() != null) {
65+
var settings = writerSettings.getRetrySettings();
66+
var retrySettings =
67+
RetrySettings.newBuilder()
68+
.setInitialRetryDelay(Duration.ofMillis(settings.getInitialRetryDelay().toMillis()))
69+
.setRetryDelayMultiplier(settings.getRetryDelayMultiplier())
70+
.setMaxAttempts(settings.getMaxRetryAttempts())
71+
.setMaxRetryDelay(Duration.ofMillis(settings.getMaxRetryDelay().toMillis()))
72+
.build();
73+
74+
streamWriterBuilder.setRetrySettings(retrySettings);
75+
}
76+
5977
StreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime());
6078
return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build());
6179
} catch (IOException | Descriptors.DescriptorValidationException e) {
src/main/java/com/vinted/flink/bigquery/model/config/WriterRetrySettings.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package com.vinted.flink.bigquery.model.config;
2+
3+
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
4+
5+
import java.io.IOException;
6+
import java.io.Serializable;
7+
import java.time.Duration;
8+
9+
public class WriterRetrySettings implements Serializable {
10+
11+
private Duration initialRetryDelay;
12+
private double retryDelayMultiplier;
13+
14+
private int maxRetryAttempts;
15+
16+
private Duration maxRetryDelay;
17+
18+
public Duration getInitialRetryDelay() {
19+
return initialRetryDelay;
20+
}
21+
22+
public void setInitialRetryDelay(Duration initialRetryDelay) {
23+
this.initialRetryDelay = initialRetryDelay;
24+
}
25+
26+
public double getRetryDelayMultiplier() {
27+
return retryDelayMultiplier;
28+
}
29+
30+
public void setRetryDelayMultiplier(double retryDelayMultiplier) {
31+
this.retryDelayMultiplier = retryDelayMultiplier;
32+
}
33+
34+
public int getMaxRetryAttempts() {
35+
return maxRetryAttempts;
36+
}
37+
38+
public void setMaxRetryAttempts(int maxRetryAttempts) {
39+
this.maxRetryAttempts = maxRetryAttempts;
40+
}
41+
42+
public Duration getMaxRetryDelay() {
43+
return maxRetryDelay;
44+
}
45+
46+
public void setMaxRetryDelay(Duration maxRetryDelay) {
47+
this.maxRetryDelay = maxRetryDelay;
48+
}
49+
public static WriterRetrySettingsBuilder newBuilder() {
50+
return new WriterRetrySettingsBuilder();
51+
}
52+
53+
public static final class WriterRetrySettingsBuilder implements Serializable {
54+
private Duration initialRetryDelay = Duration.ofMillis(500);
55+
private double retryDelayMultiplier = 1.1;
56+
57+
private int maxRetryAttempts = 5;
58+
59+
private Duration maxRetryDelay = Duration.ofMinutes(1);
60+
private WriterRetrySettingsBuilder() {
61+
}
62+
63+
public WriterRetrySettingsBuilder withInitialRetryDelay(Duration initialRetryDelay) {
64+
this.initialRetryDelay = initialRetryDelay;
65+
return this;
66+
}
67+
68+
public WriterRetrySettingsBuilder withRetryDelayMultiplier(double retryDelayMultiplier) {
69+
this.retryDelayMultiplier = retryDelayMultiplier;
70+
return this;
71+
}
72+
73+
public WriterRetrySettingsBuilder withMaxRetryAttempts(int maxRetryAttempts) {
74+
this.maxRetryAttempts = maxRetryAttempts;
75+
return this;
76+
}
77+
78+
public WriterRetrySettingsBuilder withMaxRetryDelay(Duration maxRetryDelay) {
79+
this.maxRetryDelay = maxRetryDelay;
80+
return this;
81+
}
82+
83+
public WriterRetrySettings build() {
84+
WriterRetrySettings retrySettings = new WriterRetrySettings();
85+
retrySettings.initialRetryDelay = this.initialRetryDelay;
86+
retrySettings.retryDelayMultiplier = this.retryDelayMultiplier;
87+
retrySettings.maxRetryAttempts = this.maxRetryAttempts;
88+
retrySettings.maxRetryDelay = this.maxRetryDelay;
89+
return retrySettings;
90+
}
91+
}
92+
}
93+
94+

src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ public class WriterSettings implements Serializable {
1313
private Duration timeout;
1414
private int retryCount;
1515
private Duration retryPause;
16-
private Long maxInflightRequests;
17-
private Long maxInflightBytes;
16+
private long maxInflightRequests;
17+
private long maxInflightBytes;
1818
private Duration maxRetryDuration;
1919

2020
private Duration maxRequestWaitCallbackTime;
21-
private Boolean enableConnectionPool;
21+
private boolean enableConnectionPool;
22+
23+
private WriterRetrySettings retrySettings;
2224

2325
public int getStreamsPerTable() {
2426
return streamsPerTable;
@@ -79,6 +81,14 @@ public void setMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) {
7981
this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime;
8082
}
8183

84+
public WriterRetrySettings getRetrySettings() {
85+
return retrySettings;
86+
}
87+
88+
public void setRetrySettings(WriterRetrySettings retrySettings) {
89+
this.retrySettings = retrySettings;
90+
}
91+
8292
public static final class WriterSettingsBuilder implements Serializable {
8393
private int streamsPerTable = 1;
8494
private int writerThreads = 1;
@@ -91,6 +101,8 @@ public static final class WriterSettingsBuilder implements Serializable {
91101
private Duration maxRequestWaitCallbackTime = Duration.ofMinutes(5);
92102
private Boolean enableConnectionPool = false;
93103

104+
private WriterRetrySettings retrySettings = null;
105+
94106
private WriterSettingsBuilder() {
95107
}
96108

@@ -135,7 +147,7 @@ public WriterSettingsBuilder withMaxRetryDuration(Duration maxRetryDuration) {
135147
}
136148

137149
public WriterSettingsBuilder withMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) {
138-
this.maxRequestWaitCallbackTime = maxRetryDuration;
150+
this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime;
139151
return this;
140152
}
141153

@@ -144,6 +156,11 @@ public WriterSettingsBuilder withEnableConnectionPool(Boolean enableConnectionPo
144156
return this;
145157
}
146158

159+
public WriterSettingsBuilder withRetrySettings(WriterRetrySettings retrySettings) {
160+
this.retrySettings = retrySettings;
161+
return this;
162+
}
163+
147164
public WriterSettings build() {
148165
WriterSettings writerSettings = new WriterSettings();
149166
writerSettings.writerThreads = this.writerThreads;
@@ -156,6 +173,7 @@ public WriterSettings build() {
156173
writerSettings.retryPause = this.retryPause;
157174
writerSettings.maxRetryDuration = this.maxRetryDuration;
158175
writerSettings.maxRequestWaitCallbackTime = this.maxRequestWaitCallbackTime;
176+
writerSettings.retrySettings = this.retrySettings;
159177
return writerSettings;
160178
}
161179
}

src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java

Lines changed: 3 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ protected void writeWithRetry(String traceId, Rows<A> rows, int retryCount) thro
8888
traceId, rows.getStream(), rows.getTable(), rows.getOffset(), rows.getData().size(), retryCount
8989
);
9090
var result = append(traceId, rows);
91-
var callback = new AppendCallBack<>(this, result.writerId, traceId, rows, retryCount);
91+
var callback = new AppendCallBack<>(this, traceId, rows, retryCount);
9292
ApiFutures.addCallback(result.response, callback, appendExecutor);
9393
inflightRequestCount.register();
9494
} catch (AppendException exception) {
@@ -130,14 +130,12 @@ static class AppendCallBack<A> implements ApiFutureCallback<AppendRowsResponse>
130130
private final BigQueryDefaultSinkWriter<A> parent;
131131
private final Rows<A> rows;
132132

133-
private final String writerId;
134133
private final String traceId;
135134

136135
private final int retryCount;
137136

138-
public AppendCallBack(BigQueryDefaultSinkWriter<A> parent, String writerId, String traceId, Rows<A> rows, int retryCount) {
137+
public AppendCallBack(BigQueryDefaultSinkWriter<A> parent, String traceId, Rows<A> rows, int retryCount) {
139138
this.parent = parent;
140-
this.writerId = writerId;
141139
this.traceId = traceId;
142140
this.rows = rows;
143141
this.retryCount = retryCount;
@@ -155,79 +153,8 @@ public void onSuccess(AppendRowsResponse result) {
155153

156154
@Override
157155
public void onFailure(Throwable t) {
158-
var status = Status.fromThrowable(t);
159-
switch (status.getCode()) {
160-
case INTERNAL:
161-
case CANCELLED:
162-
case FAILED_PRECONDITION:
163-
case DEADLINE_EXCEEDED:
164-
doPauseBeforeRetry();
165-
retryWrite(t, retryCount - 1);
166-
break;
167-
case ABORTED:
168-
case UNAVAILABLE: {
169-
this.parent.recreateStreamWriter(traceId, rows.getStream(), writerId, rows.getTable());
170-
retryWrite(t, retryCount - 1);
171-
break;
172-
}
173-
case INVALID_ARGUMENT:
174-
if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) {
175-
Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount);
176-
logger.warn("Trace-id {} MessageSize is too large. Splitting batch", traceId);
177-
var data = rows.getData();
178-
var first = data.subList(0, data.size() / 2);
179-
var second = data.subList(data.size() / 2, data.size());
180-
try {
181-
this.parent.writeWithRetry(traceId, rows.updateBatch(first, rows.getOffset()), retryCount - 1);
182-
this.parent.writeWithRetry(traceId, rows.updateBatch(second, rows.getOffset() + first.size()), retryCount - 1);
183-
} catch (Throwable e) {
184-
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
185-
}
186-
} else {
187-
logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
188-
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
189-
}
190-
break;
191-
case UNKNOWN:
192-
if (t instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException || t.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) {
193-
logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage());
194-
Optional.ofNullable(this.parent.metrics.get(rows.getStream()))
195-
.ifPresent(BigQueryStreamMetrics::incrementTimeoutCount);
196-
this.parent.recreateStreamWriter(traceId, rows.getStream(), writerId, rows.getTable());
197-
retryWrite(t, retryCount - 1);
198-
} else {
199-
logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
200-
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
201-
}
202-
break;
203-
default:
204-
logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
205-
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
206-
}
156+
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
207157
this.parent.inflightRequestCount.arriveAndDeregister();
208158
}
209-
210-
private void retryWrite(Throwable t, int newRetryCount) {
211-
var status = Status.fromThrowable(t);
212-
try {
213-
if (newRetryCount > 0) {
214-
logger.warn("Trace-id {} Recoverable error {}. Retrying {} ...", traceId, status.getCode(), retryCount);
215-
this.parent.writeWithRetry(traceId, rows, newRetryCount);
216-
} else {
217-
logger.error("Trace-id {} Recoverable error {}. No more retries left", traceId, status.getCode(), t);
218-
this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, t);
219-
}
220-
} catch (Throwable e) {
221-
this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, e);
222-
}
223-
}
224-
225-
private void doPauseBeforeRetry() {
226-
try {
227-
Thread.sleep(parent.clientProvider.writeSettings().getRetryPause().toMillis());
228-
} catch (InterruptedException e) {
229-
throw new RuntimeException(e);
230-
}
231-
}
232159
}
233160
}

0 commit comments

Comments
 (0)