Commit 4064af8

fix: retry idle timeout (#4209)
SeqFileWriter#write(), which writes to the sequence file buffer, can be slow, and the latency depends on the size of the data. The Watchdog may then throw idle timeout exceptions, which should not bubble up to the pipeline. This change retries on idle timeout exceptions.
1 parent 5b27310 commit 4064af8
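
The retry is on by default for CloudBigtableIO reads and can be switched off per job through the scan configuration. A minimal sketch of opting out, with placeholder resource names; it assumes the withProjectId/withInstanceId/withTableId builder methods inherited from CloudBigtableTableConfiguration, and spells out the key string because the RETRY_IDLE_TIMEOUT constant added below is package-private:

    // A sketch, not part of the commit: disable the idle-timeout retry for one job.
    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId("my-project")   // placeholder
            .withInstanceId("my-instance") // placeholder
            .withTableId("my-table")       // placeholder
            // literal value of CloudBigtableIO.Reader.RETRY_IDLE_TIMEOUT
            .withConfiguration("google.cloud.bigtable.retry.idle.timeout", "false")
            .build();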

File tree

6 files changed: +380 -1 lines changed

bigtable-client-core-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/BigtableOptionsFactory.java

Lines changed: 5 additions & 0 deletions

@@ -17,6 +17,7 @@
 
 import com.google.api.core.BetaApi;
 import com.google.api.core.InternalExtensionOnly;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Define {@link org.apache.hadoop.conf.Configuration} names for setting {@link
@@ -332,4 +333,8 @@ public class BigtableOptionsFactory {
    */
   public static final String BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL =
       "google.bigtable.enable.bulk.mutation.flow.control";
+
+  /** Override idle timeout, for testing only. */
+  @VisibleForTesting
+  public static final String BIGTABLE_TEST_IDLE_TIMEOUT_MS = "google.bigtable.idle.timeout.ms";
 }
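
Because this key is only honored when present, a test can shorten the idle timeout far enough that the Watchdog fires quickly. A rough test-only sketch, assuming the standard BigtableConfiguration.configure(...) entry point and placeholder IDs:

    // Test-only sketch: force Watchdog idle-timeout exceptions after 100 ms of idleness.
    // Not intended for production configurations.
    Configuration conf = BigtableConfiguration.configure("my-project", "my-instance");
    conf.set(BigtableOptionsFactory.BIGTABLE_TEST_IDLE_TIMEOUT_MS, "100");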

bigtable-client-core-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/wrappers/veneer/BigtableHBaseVeneerSettings.java

Lines changed: 6 additions & 0 deletions

@@ -43,6 +43,7 @@
 import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_SERVICE_ACCOUNT_JSON_KEYFILE_LOCATION_KEY;
 import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_SERVICE_ACCOUNT_JSON_VALUE_KEY;
 import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_SERVICE_ACCOUNT_P12_KEYFILE_LOCATION_KEY;
+import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_TEST_IDLE_TIMEOUT_MS;
 import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_USE_BATCH;
 import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_USE_CACHED_DATA_CHANNEL_POOL;
 import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_USE_PLAINTEXT_NEGOTIATION;
@@ -728,6 +729,11 @@ private void configureReadRowsSettings(
           .retrySettings()
           .setTotalTimeout(operationTimeouts.getOperationTimeout().get());
     }
+
+    String idleTimeout = configuration.get(BIGTABLE_TEST_IDLE_TIMEOUT_MS);
+    if (idleTimeout != null) {
+      readRowsSettings.setIdleTimeout(Duration.ofMillis(Long.parseLong(idleTimeout)));
+    }
   }
 
   private void configureRetryableCallSettings(
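
For reference, the block above amounts to the following override made directly on the veneer data client. A sketch only, assuming the java-bigtable BigtableDataSettings builder API and gax's org.threeten.bp.Duration, with placeholder IDs:

    // Sketch: shorten the ReadRows idle timeout on the veneer settings directly,
    // bypassing the HBase Configuration key.
    BigtableDataSettings.Builder settings =
        BigtableDataSettings.newBuilder()
            .setProjectId("my-project")    // placeholder
            .setInstanceId("my-instance"); // placeholder
    settings.stubSettings().readRowsSettings().setIdleTimeout(Duration.ofMillis(100));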

bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/TemplateUtils.java

Lines changed: 3 additions & 1 deletion

@@ -87,13 +87,15 @@ public static CloudBigtableScanConfiguration buildExportConfig(ExportOptions opt
         .withAppProfileId(options.getBigtableAppProfileId())
         .withConfiguration(
             BigtableOptionsFactory.CUSTOM_USER_AGENT_KEY, "SequenceFileExportJob")
+        .withConfiguration(
+            CloudBigtableIO.Reader.RETRY_IDLE_TIMEOUT,
+            String.valueOf(options.getRetryIdleTimeout()))
         .withScan(
             new ScanValueProvider(
                 options.getBigtableStartRow(),
                 options.getBigtableStopRow(),
                 options.getBigtableMaxVersions(),
                 options.getBigtableFilter()));
-
     return configBuilder.build();
   }
 }

bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/ExportJob.java

Lines changed: 7 additions & 0 deletions

@@ -168,6 +168,13 @@ public interface ExportOptions extends GcpOptions {
 
     @SuppressWarnings("unused")
     void setWait(boolean wait);
+
+    @Description("Whether to retry on idle timeout.")
+    @Default.Boolean(true)
+    boolean getRetryIdleTimeout();
+
+    @SuppressWarnings("unused")
+    void setRetryIdleTimeout(boolean retryIdleTimeout);
   }
 
   public static void main(String[] args) {
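
Beam derives a --retryIdleTimeout pipeline flag from this getter/setter pair, and @Default.Boolean(true) keeps the retry on unless the flag is set. A small sketch of the flag parsing, with the job's other required options omitted:

    // Sketch: the flag maps onto getRetryIdleTimeout()/setRetryIdleTimeout().
    ExportJob.ExportOptions opts =
        PipelineOptionsFactory.fromArgs("--retryIdleTimeout=false")
            .as(ExportJob.ExportOptions.class);
    boolean retry = opts.getRetryIdleTimeout(); // false; true when the flag is absent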

bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java

Lines changed: 72 additions & 0 deletions

@@ -19,6 +19,7 @@
 
 import com.google.bigtable.repackaged.com.google.api.core.InternalApi;
 import com.google.bigtable.repackaged.com.google.api.core.InternalExtensionOnly;
+import com.google.bigtable.repackaged.com.google.api.gax.rpc.WatchdogTimeoutException;
 import com.google.bigtable.repackaged.com.google.bigtable.v2.ReadRowsRequest;
 import com.google.bigtable.repackaged.com.google.cloud.bigtable.data.v2.models.KeyOffset;
 import com.google.bigtable.repackaged.com.google.common.annotations.VisibleForTesting;
@@ -32,11 +33,13 @@
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -647,6 +650,8 @@ Object readResolve() {
   /** Reads rows for a specific {@link Table}, usually filtered by a {@link Scan}. */
   @VisibleForTesting
   static class Reader extends BoundedReader<Result> {
+    static final String RETRY_IDLE_TIMEOUT = "google.cloud.bigtable.retry.idle.timeout";
+
     private static final Logger READER_LOG = LoggerFactory.getLogger(Reader.class);
 
     private CloudBigtableIO.AbstractSource source;
@@ -657,6 +662,9 @@ static class Reader extends BoundedReader<Result> {
     protected long workStart;
     private final AtomicLong rowsRead = new AtomicLong();
     private final ByteKeyRangeTracker rangeTracker;
+    private transient Result lastScannedRow;
+
+    private final AtomicInteger attempt = new AtomicInteger(3);
 
     @VisibleForTesting
     Reader(CloudBigtableIO.AbstractSource source) {
@@ -690,7 +698,37 @@ void initializeScanner() throws IOException {
     /** Calls {@link ResultScanner#next()}. */
     @Override
     public boolean advance() throws IOException {
+      try {
+        boolean hasMore = tryAdvance();
+        // Reset the attempt counter after a successful read.
+        attempt.set(3);
+        return hasMore;
+      } catch (Throwable e) {
+        // If retrying idle timeouts is disabled, throw the exception.
+        if (!source.getConfiguration().toHBaseConfig().getBoolean(RETRY_IDLE_TIMEOUT, true)) {
+          throw e;
+        }
+        // The exception is not an idle timeout, throw it.
+        Throwable exception = findCause(e, WatchdogTimeoutException.class);
+        if (exception == null) {
+          throw e;
+        }
+        if (exception.getMessage() == null || !exception.getMessage().contains("idle")) {
+          throw e;
+        }
+        // Ran out of retry attempts, throw the exception.
+        if (attempt.decrementAndGet() <= 0) {
+          throw e;
+        }
+        READER_LOG.warn("got idle timeout exception, will try to reset the scanner and retry", e);
+        resetScanner();
+        return tryAdvance();
+      }
+    }
+
+    private boolean tryAdvance() throws IOException {
       Result row = scanner.next();
+      lastScannedRow = row;
       if (row != null && rangeTracker.tryReturnRecordAt(true, ByteKey.copyFrom(row.getRow()))) {
         current = row;
         rowsRead.addAndGet(1l);
@@ -702,6 +740,40 @@ public boolean advance() throws IOException {
       }
     }
 
+    private void resetScanner() throws IOException {
+      CloudBigtableScanConfiguration scanConfiguration = source.getConfiguration();
+      Scan scan;
+      if (lastScannedRow != null) {
+        byte[] rowKey = lastScannedRow.getRow();
+        // ScanConfiguration always gets the start key and end key from the RowRange, and it
+        // expects the start key to be inclusive and the end key to be exclusive. Appending a
+        // zero byte yields the smallest key greater than rowKey, so the scan resumes after it.
+        byte[] newStartKey = Arrays.copyOf(rowKey, rowKey.length + 1);
+        scan =
+            scanConfiguration
+                .toBuilder()
+                .withKeys(newStartKey, scanConfiguration.getStopRow())
+                .build()
+                .getScanValueProvider()
+                .get();
+      } else {
+        scan = scanConfiguration.getScanValueProvider().get();
+        READER_LOG.info("last scanned row key is null, haven't read any row yet");
+      }
+
+      scanner =
+          connection
+              .getTable(TableName.valueOf(source.getConfiguration().getTableId()))
+              .getScanner(scan);
+    }
+
+    // Walks the cause chain of e and returns the first throwable assignable to cls, or null.
+    static Throwable findCause(Throwable e, Class<? extends Throwable> cls) {
+      Throwable throwable = e;
+      while (throwable != null && !cls.isAssignableFrom(throwable.getClass())) {
+        throwable = throwable.getCause();
+      }
+      return throwable;
+    }
+
     @Override
     public final Double getFractionConsumed() {
       if (rangeTracker.isDone()) {
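
Two details of the retry path above are easy to miss: the resume key appends a zero byte to the last scanned row key, producing the smallest key strictly greater than it (the new start key is inclusive), and findCause() walks the whole cause chain instead of checking only the top-level exception. A test-style sketch of both; the class name is hypothetical, and it sits in the com.google.cloud.bigtable.beam package only so it can reach the package-private findCause():

    package com.google.cloud.bigtable.beam; // assumption: same package as CloudBigtableIO

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class RetryHelpersSketch {
      public static void main(String[] args) {
        // Resume key: one extra 0x00 byte yields the smallest key strictly greater than
        // lastRow, so a restarted scan with an inclusive start key skips the row itself.
        byte[] lastRow = "row-0042".getBytes(StandardCharsets.UTF_8);
        byte[] resumeKey = Arrays.copyOf(lastRow, lastRow.length + 1); // lastRow + 0x00
        System.out.println(resumeKey.length); // 9

        // findCause() follows getCause() links until it reaches the requested type.
        Throwable wrapped =
            new RuntimeException(new IOException(new IllegalStateException("idle")));
        System.out.println(
            CloudBigtableIO.Reader.findCause(wrapped, IllegalStateException.class)); // the IllegalStateException
        System.out.println(CloudBigtableIO.Reader.findCause(wrapped, Error.class)); // null
      }
    }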
