Skip to content

[dvc][server] Add exponential backoff to other HelixUtils methods with retries #1784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
this.kafkaFetchQuotaUnorderedRecordPerSecond =
clusterProps.getLong(KAFKA_FETCH_QUOTA_UNORDERED_RECORDS_PER_SECOND, -1);

this.refreshAttemptsForZkReconnect = clusterProps.getInt(REFRESH_ATTEMPTS_FOR_ZK_RECONNECT, 3);
this.refreshAttemptsForZkReconnect = clusterProps.getInt(REFRESH_ATTEMPTS_FOR_ZK_RECONNECT, 9);
this.refreshIntervalForZkReconnectInMs =
clusterProps.getLong(REFRESH_INTERVAL_FOR_ZK_RECONNECT_MS, TimeUnit.SECONDS.toMillis(10));
this.kafkaReadCycleDelayMs = clusterProps.getLong(KAFKA_READ_CYCLE_DELAY_MS, 1000);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.linkedin.davinci.helix;

import static com.linkedin.venice.ConfigKeys.REFRESH_ATTEMPTS_FOR_ZK_RECONNECT;

import com.linkedin.davinci.blobtransfer.BlobTransferManager;
import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.config.VeniceServerConfig;
Expand Down Expand Up @@ -69,9 +71,6 @@ public class HelixParticipationService extends AbstractVeniceService
implements StatusMessageHandler<KillOfflinePushMessage> {
private static final Logger LOGGER = LogManager.getLogger(HelixParticipationService.class);

private static final int MAX_RETRY = 30;
private static final int RETRY_INTERVAL_SEC = 1;

private final Instance instance;
private final String clusterName;
private final String participantName;
Expand Down Expand Up @@ -305,7 +304,10 @@ private void checkBeforeJoinInCluster() {
HelixAdmin admin = new ZKHelixAdmin(zkAddress);
try {
// Check whether the cluster is ready or not at first to prevent zk no node exception.
HelixUtils.checkClusterSetup(admin, clusterName, MAX_RETRY, RETRY_INTERVAL_SEC);
HelixUtils.checkClusterSetup(
admin,
clusterName,
veniceConfigLoader.getCombinedProperties().getInt(REFRESH_ATTEMPTS_FOR_ZK_RECONNECT, 9));
List<String> instances = admin.getInstancesInCluster(clusterName);
if (instances.contains(instance.getNodeId())) {
LOGGER.info("{} is not a new node to cluster: {}, skip the cleaning up.", instance.getNodeId(), clusterName);
Expand Down Expand Up @@ -365,9 +367,8 @@ private void asyncStart() {
clusterName,
zkClient,
new HelixAdapterSerializer(),
veniceConfigLoader.getVeniceClusterConfig().getRefreshAttemptsForZkReconnect(),
veniceConfigLoader.getVeniceClusterConfig().getRefreshIntervalForZkReconnectInMs(),
veniceServerConfig.getRegionName());
veniceServerConfig.getRegionName(),
veniceConfigLoader.getCombinedProperties().getInt(REFRESH_ATTEMPTS_FOR_ZK_RECONNECT, 9));

/**
* The accessor can only get created successfully after helix manager is created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,8 @@ private void initServerStoreAndSchemaRepository() {
HelixReadOnlyZKSharedSystemStoreRepository readOnlyZKSharedSystemStoreRepository =
new HelixReadOnlyZKSharedSystemStoreRepository(zkClient, adapter, systemSchemaClusterName);

HelixReadOnlyStoreRepository readOnlyStoreRepository = new HelixReadOnlyStoreRepository(
zkClient,
adapter,
clusterName,
clusterConfig.getRefreshAttemptsForZkReconnect(),
clusterConfig.getRefreshIntervalForZkReconnectInMs());
HelixReadOnlyStoreRepository readOnlyStoreRepository =
new HelixReadOnlyStoreRepository(zkClient, adapter, clusterName);

storeRepo = new HelixReadOnlyStoreRepositoryAdapter(
readOnlyZKSharedSystemStoreRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,26 @@
*/
public class CachedResourceZkStateListener implements IZkStateListener {
private final Logger logger;
public static final int DEFAULT_RETRY_LOAD_ATTEMPTS = 1;
public static final long DEFAULT_RETRY_LOAD_INTERVAL_IN_MS = TimeUnit.SECONDS.toMillis(10);
public static final int DEFAULT_REFRESH_ATTEMPTS_FOR_ZK_RECONNECT = 1;
public static final long DEFAULT_REFRESH_INTERVAL_FOR_ZK_RECONNECT_IS_MS = TimeUnit.SECONDS.toMillis(10);
private final VeniceResource resource;
private final int retryLoadAttempts;
private final long retryLoadIntervalInMs;
private final int refreshAttemptsForZkReconnect;
private final long refreshIntervalForZkReconnectInMs;
private volatile boolean disconnected = false;

public CachedResourceZkStateListener(VeniceResource resource) {
// By default, we only retry once after connection is reconnected.
this(resource, DEFAULT_RETRY_LOAD_ATTEMPTS, DEFAULT_RETRY_LOAD_INTERVAL_IN_MS);
this(resource, DEFAULT_REFRESH_ATTEMPTS_FOR_ZK_RECONNECT, DEFAULT_REFRESH_INTERVAL_FOR_ZK_RECONNECT_IS_MS);
}

public CachedResourceZkStateListener(VeniceResource resource, int retryLoadAttempts, long retryLoadIntervalInMs) {
public CachedResourceZkStateListener(
VeniceResource resource,
int refreshAttemptsForZkReconnect,
long refreshIntervalForZkReconnectInMs) {
this.resource = resource;
this.logger = LogManager.getLogger(this.getClass().getSimpleName() + " [" + getResourceName() + "]");
this.retryLoadAttempts = retryLoadAttempts;
this.retryLoadIntervalInMs = retryLoadIntervalInMs;
this.refreshAttemptsForZkReconnect = refreshAttemptsForZkReconnect;
this.refreshIntervalForZkReconnectInMs = refreshIntervalForZkReconnectInMs;
}

/**
Expand All @@ -59,25 +62,26 @@ public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception
// there is only one refresh operation on the fly.
// As we met the issue that ZK could return partial result just after connection is reconnected.
// In order to reduce the possibility that we get not-up-to-date data, we keep loading data for
// retryLoadAttempts with retryLoadIntervalInMs between each two loading.
// Sleep a random time(no more than retryLoadIntervalInMs) to avoid thunderstorm issue that all nodes are
// refreshAttemptsForZkReconnect with refreshIntervalForZkReconnectInMs between each two loading.
// Sleep a random time(no more than refreshIntervalForZkReconnectInMs) to avoid thunderstorm issue that all
// nodes are
// trying to refresh resource at the same time if there is a network issue in that DC.
Utils.sleep((long) (Math.random() * retryLoadIntervalInMs));
Utils.sleep((long) (Math.random() * refreshIntervalForZkReconnectInMs));
int attempt = 1;
while (attempt <= retryLoadAttempts) {
while (attempt <= refreshAttemptsForZkReconnect) {
logger.info(
"Attempt #{} of {}: Refresh resource after connection is reconnected.",
attempt,
retryLoadAttempts);
refreshAttemptsForZkReconnect);
try {
resource.refresh();
logger.info("Attempt #{} of {}: Refresh completed.", attempt, retryLoadAttempts);
logger.info("Attempt #{} of {}: Refresh completed.", attempt, refreshAttemptsForZkReconnect);
return;
} catch (Exception e) {
logger.error("Can not refresh resource correctly after client is reconnected", e);
if (attempt < retryLoadAttempts) {
logger.info("Will retry after {} ms", retryLoadIntervalInMs);
Utils.sleep(retryLoadIntervalInMs);
if (attempt < refreshAttemptsForZkReconnect) {
logger.info("Will retry after {} ms", refreshIntervalForZkReconnectInMs);
Utils.sleep(refreshIntervalForZkReconnectInMs);
}
attempt++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,7 @@ public HelixReadOnlySchemaRepository(
long refreshIntervalForZkReconnectInMs) {
this.storeRepository = storeRepository;
this.zkClient = zkClient;
this.accessor = new HelixSchemaAccessor(
zkClient,
adapter,
clusterName,
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
this.accessor = new HelixSchemaAccessor(zkClient, adapter, clusterName, refreshAttemptsForZkReconnect);

storeRepository.registerStoreDataChangedListener(this);
this.zkStateListener =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ public class HelixReadOnlyStoreRepository extends CachedReadOnlyStoreRepository
public HelixReadOnlyStoreRepository(
ZkClient zkClient,
HelixAdapterSerializer compositeSerializer,
String clusterName,
int refreshAttemptsForZkReconnect,
long refreshIntervalForZkReconnectInMs) {
String clusterName) {
/**
* HelixReadOnlyStoreRepository is used in router, server, fast-client, da-vinci and system store.
* Its centralized locking should NOT be shared with other classes. Create a new instance.
Expand Down Expand Up @@ -118,15 +116,12 @@ protected void onRepositoryChanged(Collection<String> newZkStoreNames) {

private final CachedResourceZkStateListener zkStateListener = new CachedResourceZkStateListener(this);

private final IZkChildListener zkStoreRepositoryListener = new IZkChildListener() {
@Override
public void handleChildChange(String path, List<String> children) {
if (!path.equals(clusterStoreRepositoryPath)) {
LOGGER.warn("Notification path mismatch, path={}, expected={}.", path, clusterStoreRepositoryPath);
return;
}
onRepositoryChanged(children);
private final IZkChildListener zkStoreRepositoryListener = (path, children) -> {
if (!path.equals(clusterStoreRepositoryPath)) {
LOGGER.warn("Notification path mismatch, path={}, expected={}.", path, clusterStoreRepositoryPath);
return;
}
onRepositoryChanged(children);
Comment on lines -121 to +124
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we change this to lambda function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to mention it in my comment:

In the docs, I found no mention of lambda functions in the style guide, but IntelliJ suggested it as a refactoring, and I thought I could propose it as a change. In retrospect, though, I should've proposed this in another PR, as it is outside the scope of the current PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have a style guide against lambda functions. I just wanted to confirm what prompted this change, and to verify if the functionality is the same.

};

private final IZkDataListener zkStoreListener = new IZkDataListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public HelixReadOnlyZKSharedSystemStoreRepository(
ZkClient zkClient,
HelixAdapterSerializer compositeSerializer,
String systemStoreClusterName) {
super(zkClient, compositeSerializer, systemStoreClusterName, 0, 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What were these 0 values for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They were for refreshAttemptsForZkReconnect and refreshIntervalForZkReconnectInMs, but they weren't used in HelixReadOnlyStoreRepository's constructor:

public HelixReadOnlyStoreRepository(
      ZkClient zkClient,
      HelixAdapterSerializer compositeSerializer,
      String clusterName,
      int refreshAttemptsForZkReconnect,
      long refreshIntervalForZkReconnectInMs) {
    /**
     * HelixReadOnlyStoreRepository is used in router, server, fast-client, da-vinci and system store.
     * Its centralized locking should NOT be shared with other classes. Create a new instance.
     */
    super(zkClient, clusterName, compositeSerializer, new ClusterLockManager(clusterName));
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. I wonder why it was set up like this in the first place.

super(zkClient, compositeSerializer, systemStoreClusterName);
// Initialize the necessary zk shared system stores
for (VeniceSystemStoreType type: VeniceSystemStoreType.values()) {
if (type.isNewMedataRepositoryAdopted()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.PathResourceRegistry;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.zookeeper.impl.client.ZkClient;
Expand All @@ -19,9 +18,6 @@
public class HelixSchemaAccessor {
private static final Logger logger = LogManager.getLogger(HelixSchemaAccessor.class);

private static final int DEFAULT_ZK_REFRESH_ATTEMPTS = 3;
private static final long DEFAULT_ZK_REFRESH_INTERVAL = TimeUnit.SECONDS.toMillis(10);

// Key schema path name
private static final String KEY_SCHEMA_PATH = "key-schema";
// Value schema path name
Expand All @@ -45,22 +41,20 @@ public class HelixSchemaAccessor {
private final String clusterName;

private final int refreshAttemptsForZkReconnect;
private final long refreshIntervalForZkReconnectInMs;

public HelixSchemaAccessor(ZkClient zkClient, HelixAdapterSerializer helixAdapterSerializer, String clusterName) {
this(zkClient, helixAdapterSerializer, clusterName, DEFAULT_ZK_REFRESH_ATTEMPTS, DEFAULT_ZK_REFRESH_INTERVAL);
// TODO: refactor hard-coded refreshAttemptsForZkReconnect
this(zkClient, helixAdapterSerializer, clusterName, 9);
}

public HelixSchemaAccessor(
ZkClient zkClient,
HelixAdapterSerializer helixAdapterSerializer,
String clusterName,
int refreshAttemptsForZkReconnect,
long refreshIntervalForZkReconnectInMs) {
int refreshAttemptsForZkReconnect) {
this.clusterName = clusterName;

this.refreshAttemptsForZkReconnect = refreshAttemptsForZkReconnect;
this.refreshIntervalForZkReconnectInMs = refreshIntervalForZkReconnectInMs;

registerSerializerForSchema(zkClient, helixAdapterSerializer);
schemaAccessor = new ZkBaseDataAccessor<>(zkClient);
Expand Down Expand Up @@ -97,11 +91,8 @@ public SchemaEntry getValueSchema(String storeName, String id) {
}

public List<SchemaEntry> getAllValueSchemas(String storeName) {
return HelixUtils.getChildren(
schemaAccessor,
getValueSchemaParentPath(storeName).toString(),
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
return HelixUtils
.getChildren(schemaAccessor, getValueSchemaParentPath(storeName).toString(), refreshAttemptsForZkReconnect);
}

public DerivedSchemaEntry getDerivedSchema(String storeName, String derivedSchemaIdPair) {
Expand All @@ -113,8 +104,7 @@ public List<DerivedSchemaEntry> getAllDerivedSchemas(String storeName) {
return HelixUtils.getChildren(
derivedSchemaAccessor,
getDerivedSchemaParentPath(storeName).toString(),
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
refreshAttemptsForZkReconnect);
}

public void createKeySchema(String storeName, SchemaEntry schemaEntry) {
Expand Down Expand Up @@ -238,8 +228,7 @@ public List<RmdSchemaEntry> getAllReplicationMetadataSchemas(String storeName) {
return HelixUtils.getChildren(
replicationMetadataSchemaAccessor,
getReplicationMetadataSchemaParentPath(storeName).toString(),
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
refreshAttemptsForZkReconnect);
}

public void addReplicationMetadataSchema(String storeName, RmdSchemaEntry rmdSchemaEntry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public SubscriptionBasedStoreRepository(
ZkClient zkClient,
HelixAdapterSerializer compositeSerializer,
String clusterName) {
super(zkClient, compositeSerializer, clusterName, 0, 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same reason

super(zkClient, compositeSerializer, clusterName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.venice.helix;

import static com.linkedin.venice.ConfigKeys.REFRESH_ATTEMPTS_FOR_ZK_RECONNECT;
import static com.linkedin.venice.zk.VeniceZkPaths.OFFLINE_PUSHES;

import com.linkedin.venice.exceptions.VeniceException;
Expand All @@ -14,12 +15,12 @@
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.LogContext;
import com.linkedin.venice.utils.PathResourceRegistry;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.zookeeper.impl.client.ZkClient;
Expand All @@ -46,8 +47,6 @@
*/
public class VeniceOfflinePushMonitorAccessor implements OfflinePushAccessor {
public static final String OFFLINE_PUSH_SUB_PATH = OFFLINE_PUSHES;
private static final int DEFAULT_ZK_REFRESH_ATTEMPTS = 3;
private static final long DEFAULT_ZK_REFRESH_INTERVAL = TimeUnit.SECONDS.toMillis(10);

private static final Logger LOGGER = LogManager.getLogger(VeniceOfflinePushMonitorAccessor.class);
private final String clusterName;
Expand All @@ -68,23 +67,20 @@ public class VeniceOfflinePushMonitorAccessor implements OfflinePushAccessor {

private final int refreshAttemptsForZkReconnect;

private final long refreshIntervalForZkReconnectInMs;

public VeniceOfflinePushMonitorAccessor(
String clusterName,
ZkClient zkClient,
HelixAdapterSerializer adapter,
LogContext logContext) {
this(clusterName, zkClient, adapter, DEFAULT_ZK_REFRESH_ATTEMPTS, DEFAULT_ZK_REFRESH_INTERVAL, logContext);
this(clusterName, zkClient, adapter, logContext, 9);
}

public VeniceOfflinePushMonitorAccessor(
String clusterName,
ZkClient zkClient,
HelixAdapterSerializer adapter,
int refreshAttemptsForZkReconnect,
long refreshIntervalForZkReconnectInMs,
Object logContext) {
Object logContext,
int refreshAttemptsForZkReconnect) {
this.clusterName = clusterName;
this.offlinePushStatusParentPath = getOfflinePushStatuesParentPath();
this.zkClient = zkClient;
Expand All @@ -95,7 +91,6 @@ public VeniceOfflinePushMonitorAccessor(
this.listenerManager = new ListenerManager<>(logContext);
this.partitionStatusZkListener = new PartitionStatusZkListener(logContext);
this.refreshAttemptsForZkReconnect = refreshAttemptsForZkReconnect;
this.refreshIntervalForZkReconnectInMs = refreshIntervalForZkReconnectInMs;
}

/**
Expand All @@ -105,16 +100,16 @@ public VeniceOfflinePushMonitorAccessor(
String clusterName,
ZkBaseDataAccessor<OfflinePushStatus> offlinePushStatusAccessor,
ZkBaseDataAccessor<PartitionStatus> partitionStatusAccessor,
LogContext logContext) {
LogContext logContext,
VeniceProperties props) {
this.clusterName = clusterName;
this.offlinePushStatusAccessor = offlinePushStatusAccessor;
this.partitionStatusAccessor = partitionStatusAccessor;
this.offlinePushStatusParentPath = getOfflinePushStatuesParentPath();
this.zkClient = null;
this.listenerManager = new ListenerManager<>(logContext);
this.partitionStatusZkListener = new PartitionStatusZkListener(logContext);
this.refreshAttemptsForZkReconnect = DEFAULT_ZK_REFRESH_ATTEMPTS;
this.refreshIntervalForZkReconnectInMs = DEFAULT_ZK_REFRESH_INTERVAL;
this.refreshAttemptsForZkReconnect = props.getInt(REFRESH_ATTEMPTS_FOR_ZK_RECONNECT, 9);
}

private void registerSerializers(HelixAdapterSerializer adapter) {
Expand All @@ -127,11 +122,8 @@ private void registerSerializers(HelixAdapterSerializer adapter) {
@Override
public List<OfflinePushStatus> loadOfflinePushStatusesAndPartitionStatuses() {
LOGGER.info("Start loading all offline pushes statuses from ZK in cluster: {}.", clusterName);
List<OfflinePushStatus> offlinePushStatuses = HelixUtils.getChildren(
offlinePushStatusAccessor,
offlinePushStatusParentPath,
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
List<OfflinePushStatus> offlinePushStatuses =
HelixUtils.getChildren(offlinePushStatusAccessor, offlinePushStatusParentPath, refreshAttemptsForZkReconnect);
Iterator<OfflinePushStatus> iterator = offlinePushStatuses.iterator();
while (iterator.hasNext()) {
OfflinePushStatus pushStatus = iterator.next();
Expand Down Expand Up @@ -414,11 +406,8 @@ protected PartitionStatus getPartitionStatus(String topic, int partitionId) {
*/
protected List<PartitionStatus> getPartitionStatuses(String topic, int partitionCount) {
LOGGER.debug("Start reading partition status from ZK for topic: {} in cluster: {}.", topic, clusterName);
List<PartitionStatus> zkResult = HelixUtils.getChildren(
partitionStatusAccessor,
getOfflinePushStatusPath(topic),
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
List<PartitionStatus> zkResult =
HelixUtils.getChildren(partitionStatusAccessor, getOfflinePushStatusPath(topic), refreshAttemptsForZkReconnect);
LOGGER.debug("Read {} partition status from ZK for topic: {} in cluster: {}.", zkResult.size(), topic, clusterName);

if (zkResult.isEmpty()) {
Expand Down
Loading
Loading