Skip to content

[dvc][server] Add exponential backoff to other HelixUtils methods with retries #1784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
this.kafkaFetchQuotaUnorderedRecordPerSecond =
clusterProps.getLong(KAFKA_FETCH_QUOTA_UNORDERED_RECORDS_PER_SECOND, -1);

this.refreshAttemptsForZkReconnect = clusterProps.getInt(REFRESH_ATTEMPTS_FOR_ZK_RECONNECT, 3);
this.refreshAttemptsForZkReconnect = clusterProps.getInt(REFRESH_ATTEMPTS_FOR_ZK_RECONNECT, 9);
this.refreshIntervalForZkReconnectInMs =
clusterProps.getLong(REFRESH_INTERVAL_FOR_ZK_RECONNECT_MS, TimeUnit.SECONDS.toMillis(10));
this.kafkaReadCycleDelayMs = clusterProps.getLong(KAFKA_READ_CYCLE_DELAY_MS, 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ public class HelixParticipationService extends AbstractVeniceService
implements StatusMessageHandler<KillOfflinePushMessage> {
private static final Logger LOGGER = LogManager.getLogger(HelixParticipationService.class);

private static final int MAX_RETRY = 30;
private static final int RETRY_INTERVAL_SEC = 1;

private final Instance instance;
private final String clusterName;
private final String participantName;
Expand Down Expand Up @@ -305,7 +302,7 @@ private void checkBeforeJoinInCluster() {
HelixAdmin admin = new ZKHelixAdmin(zkAddress);
try {
// Check whether the cluster is ready or not at first to prevent zk no node exception.
HelixUtils.checkClusterSetup(admin, clusterName, MAX_RETRY, RETRY_INTERVAL_SEC);
HelixUtils.checkClusterSetup(admin, clusterName, 9);
List<String> instances = admin.getInstancesInCluster(clusterName);
if (instances.contains(instance.getNodeId())) {
LOGGER.info("{} is not a new node to cluster: {}, skip the cleaning up.", instance.getNodeId(), clusterName);
Expand Down Expand Up @@ -365,9 +362,8 @@ private void asyncStart() {
clusterName,
zkClient,
new HelixAdapterSerializer(),
veniceConfigLoader.getVeniceClusterConfig().getRefreshAttemptsForZkReconnect(),
veniceConfigLoader.getVeniceClusterConfig().getRefreshIntervalForZkReconnectInMs(),
veniceServerConfig.getRegionName());
veniceServerConfig.getRegionName(),
veniceConfigLoader.getCombinedProperties());

/**
* The accessor can only get created successfully after helix manager is created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,8 @@ private void initServerStoreAndSchemaRepository() {
HelixReadOnlyZKSharedSystemStoreRepository readOnlyZKSharedSystemStoreRepository =
new HelixReadOnlyZKSharedSystemStoreRepository(zkClient, adapter, systemSchemaClusterName);

HelixReadOnlyStoreRepository readOnlyStoreRepository = new HelixReadOnlyStoreRepository(
zkClient,
adapter,
clusterName,
clusterConfig.getRefreshAttemptsForZkReconnect(),
clusterConfig.getRefreshIntervalForZkReconnectInMs());
HelixReadOnlyStoreRepository readOnlyStoreRepository =
new HelixReadOnlyStoreRepository(zkClient, adapter, clusterName);

storeRepo = new HelixReadOnlyStoreRepositoryAdapter(
readOnlyZKSharedSystemStoreRepository,
Expand All @@ -155,17 +151,15 @@ private void initServerStoreAndSchemaRepository() {
zkClient,
adapter,
systemSchemaClusterName,
clusterConfig.getRefreshAttemptsForZkReconnect(),
clusterConfig.getRefreshIntervalForZkReconnectInMs());
clusterConfig.getRefreshAttemptsForZkReconnect());
schemaRepo = new HelixReadOnlySchemaRepositoryAdapter(
readOnlyZKSharedSchemaRepository,
new HelixReadOnlySchemaRepository(
readOnlyStoreRepository,
zkClient,
adapter,
clusterName,
clusterConfig.getRefreshAttemptsForZkReconnect(),
clusterConfig.getRefreshIntervalForZkReconnectInMs()));
clusterConfig.getRefreshAttemptsForZkReconnect()));
schemaRepo.refresh();

liveClusterConfigRepo = new HelixReadOnlyLiveClusterConfigRepository(zkClient, adapter, clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,19 @@
public class CachedResourceZkStateListener implements IZkStateListener {
private final Logger logger;
public static final int DEFAULT_RETRY_LOAD_ATTEMPTS = 1;
public static final long DEFAULT_RETRY_LOAD_INTERVAL_IN_MS = TimeUnit.SECONDS.toMillis(10);
private final VeniceResource resource;
private final int retryLoadAttempts;
private final long retryLoadIntervalInMs;
private final int refreshAttemptsForZkReconnect;
private volatile boolean disconnected = false;

public CachedResourceZkStateListener(VeniceResource resource) {
// By default, we only retry once after connection is reconnected.
this(resource, DEFAULT_RETRY_LOAD_ATTEMPTS, DEFAULT_RETRY_LOAD_INTERVAL_IN_MS);
this(resource, DEFAULT_RETRY_LOAD_ATTEMPTS);
}

public CachedResourceZkStateListener(VeniceResource resource, int retryLoadAttempts, long retryLoadIntervalInMs) {
public CachedResourceZkStateListener(VeniceResource resource, int refreshAttemptsForZkReconnect) {
this.resource = resource;
this.logger = LogManager.getLogger(this.getClass().getSimpleName() + " [" + getResourceName() + "]");
this.retryLoadAttempts = retryLoadAttempts;
this.retryLoadIntervalInMs = retryLoadIntervalInMs;
this.refreshAttemptsForZkReconnect = refreshAttemptsForZkReconnect;
}

/**
Expand All @@ -62,20 +59,22 @@ public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception
// retryLoadAttempts with retryLoadIntervalInMs between each two loading.
// Sleep a random time(no more than retryLoadIntervalInMs) to avoid thunderstorm issue that all nodes are
// trying to refresh resource at the same time if there is a network issue in that DC.
// TODO: refactor to use exponential backoff like implemented in HelixUtils
long retryLoadIntervalInMs = TimeUnit.SECONDS.toMillis(2);
Utils.sleep((long) (Math.random() * retryLoadIntervalInMs));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should modify the sleep internval here. Please read the above comment as to why it's setup like this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted in 84a0c2d

int attempt = 1;
while (attempt <= retryLoadAttempts) {
while (attempt <= refreshAttemptsForZkReconnect) {
logger.info(
"Attempt #{} of {}: Refresh resource after connection is reconnected.",
attempt,
retryLoadAttempts);
refreshAttemptsForZkReconnect);
try {
resource.refresh();
logger.info("Attempt #{} of {}: Refresh completed.", attempt, retryLoadAttempts);
logger.info("Attempt #{} of {}: Refresh completed.", attempt, refreshAttemptsForZkReconnect);
return;
} catch (Exception e) {
logger.error("Can not refresh resource correctly after client is reconnected", e);
if (attempt < retryLoadAttempts) {
if (attempt < refreshAttemptsForZkReconnect) {
logger.info("Will retry after {} ms", retryLoadIntervalInMs);
Utils.sleep(retryLoadIntervalInMs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,35 +94,26 @@ public HelixReadOnlySchemaRepository(
ZkClient zkClient,
HelixAdapterSerializer adapter,
String clusterName,
int refreshAttemptsForZkReconnect,
long refreshIntervalForZkReconnectInMs) {
int refreshAttemptsForZkReconnect) {
this.storeRepository = storeRepository;
this.zkClient = zkClient;
this.accessor = new HelixSchemaAccessor(
zkClient,
adapter,
clusterName,
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
this.accessor = new HelixSchemaAccessor(zkClient, adapter, clusterName, refreshAttemptsForZkReconnect);

storeRepository.registerStoreDataChangedListener(this);
this.zkStateListener =
new CachedResourceZkStateListener(this, refreshAttemptsForZkReconnect, refreshIntervalForZkReconnectInMs);
this.zkStateListener = new CachedResourceZkStateListener(this, refreshAttemptsForZkReconnect);
}

/** test-only */
HelixReadOnlySchemaRepository(
ReadOnlyStoreRepository storeRepository,
ZkClient zkClient,
HelixSchemaAccessor accessor,
int refreshAttemptsForZkReconnect,
long refreshIntervalForZkReconnectInMs) {
int refreshAttemptsForZkReconnect) {
this.storeRepository = storeRepository;
this.zkClient = zkClient;
this.accessor = accessor;
storeRepository.registerStoreDataChangedListener(this);
this.zkStateListener =
new CachedResourceZkStateListener(this, refreshAttemptsForZkReconnect, refreshIntervalForZkReconnectInMs);
this.zkStateListener = new CachedResourceZkStateListener(this, refreshAttemptsForZkReconnect);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ public class HelixReadOnlyStoreRepository extends CachedReadOnlyStoreRepository
public HelixReadOnlyStoreRepository(
ZkClient zkClient,
HelixAdapterSerializer compositeSerializer,
String clusterName,
int refreshAttemptsForZkReconnect,
long refreshIntervalForZkReconnectInMs) {
String clusterName) {
/**
* HelixReadOnlyStoreRepository is used in router, server, fast-client, da-vinci and system store.
* Its centralized locking should NOT be shared with other classes. Create a new instance.
Expand Down Expand Up @@ -118,15 +116,12 @@ protected void onRepositoryChanged(Collection<String> newZkStoreNames) {

private final CachedResourceZkStateListener zkStateListener = new CachedResourceZkStateListener(this);

private final IZkChildListener zkStoreRepositoryListener = new IZkChildListener() {
@Override
public void handleChildChange(String path, List<String> children) {
if (!path.equals(clusterStoreRepositoryPath)) {
LOGGER.warn("Notification path mismatch, path={}, expected={}.", path, clusterStoreRepositoryPath);
return;
}
onRepositoryChanged(children);
private final IZkChildListener zkStoreRepositoryListener = (path, children) -> {
if (!path.equals(clusterStoreRepositoryPath)) {
LOGGER.warn("Notification path mismatch, path={}, expected={}.", path, clusterStoreRepositoryPath);
return;
}
onRepositoryChanged(children);
Comment on lines -121 to +124
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we change this to lambda function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to mention it in my comment:

In the docs, I found no mentions in the style guide regarding lambda functions, but it was suggested to me as a refactoring by Intellij, and thought I could suggest it as a change. Although, in retrospect, I should've proposed this in another PR as this outside the scope of this current PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have a style guide against lambda functions. I just wanted to confirm what prompted this change, and to verify if the functionality is the same.

};

private final IZkDataListener zkStoreListener = new IZkDataListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,7 @@ public HelixReadOnlyZKSharedSchemaRepository(
ZkClient zkClient,
HelixAdapterSerializer adapter,
String systemStoreClusterName,
int refreshAttemptsForZkReconnect,
long refreshIntervalForZkReconnectInMs) {
super(
storeRepository,
zkClient,
adapter,
systemStoreClusterName,
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
int refreshAttemptsForZkReconnect) {
super(storeRepository, zkClient, adapter, systemStoreClusterName, refreshAttemptsForZkReconnect);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public HelixReadOnlyZKSharedSystemStoreRepository(
ZkClient zkClient,
HelixAdapterSerializer compositeSerializer,
String systemStoreClusterName) {
super(zkClient, compositeSerializer, systemStoreClusterName, 0, 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What were these 0 values for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For refreshAttemptsForZkReconnect and refreshIntervalForZkReconnectInMs, but weren't used in the HelixReadOnlyStoreRepository's constructor:

public HelixReadOnlyStoreRepository(
      ZkClient zkClient,
      HelixAdapterSerializer compositeSerializer,
      String clusterName,
      int refreshAttemptsForZkReconnect,
      long refreshIntervalForZkReconnectInMs) {
    /**
     * HelixReadOnlyStoreRepository is used in router, server, fast-client, da-vinci and system store.
     * Its centralized locking should NOT be shared with other classes. Create a new instance.
     */
    super(zkClient, clusterName, compositeSerializer, new ClusterLockManager(clusterName));
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. I wonder why it was setup like this in the first place.

super(zkClient, compositeSerializer, systemStoreClusterName);
// Initialize the necessary zk shared system stores
for (VeniceSystemStoreType type: VeniceSystemStoreType.values()) {
if (type.isNewMedataRepositoryAdopted()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.PathResourceRegistry;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.zookeeper.impl.client.ZkClient;
Expand All @@ -19,9 +18,6 @@
public class HelixSchemaAccessor {
private static final Logger logger = LogManager.getLogger(HelixSchemaAccessor.class);

private static final int DEFAULT_ZK_REFRESH_ATTEMPTS = 3;
private static final long DEFAULT_ZK_REFRESH_INTERVAL = TimeUnit.SECONDS.toMillis(10);

// Key schema path name
private static final String KEY_SCHEMA_PATH = "key-schema";
// Value schema path name
Expand All @@ -45,22 +41,20 @@ public class HelixSchemaAccessor {
private final String clusterName;

private final int refreshAttemptsForZkReconnect;
private final long refreshIntervalForZkReconnectInMs;

public HelixSchemaAccessor(ZkClient zkClient, HelixAdapterSerializer helixAdapterSerializer, String clusterName) {
this(zkClient, helixAdapterSerializer, clusterName, DEFAULT_ZK_REFRESH_ATTEMPTS, DEFAULT_ZK_REFRESH_INTERVAL);
// TODO: refactor hard-coded refreshAttemptsForZkReconnect
this(zkClient, helixAdapterSerializer, clusterName, 9);
}

public HelixSchemaAccessor(
ZkClient zkClient,
HelixAdapterSerializer helixAdapterSerializer,
String clusterName,
int refreshAttemptsForZkReconnect,
long refreshIntervalForZkReconnectInMs) {
int refreshAttemptsForZkReconnect) {
this.clusterName = clusterName;

this.refreshAttemptsForZkReconnect = refreshAttemptsForZkReconnect;
this.refreshIntervalForZkReconnectInMs = refreshIntervalForZkReconnectInMs;

registerSerializerForSchema(zkClient, helixAdapterSerializer);
schemaAccessor = new ZkBaseDataAccessor<>(zkClient);
Expand Down Expand Up @@ -97,11 +91,8 @@ public SchemaEntry getValueSchema(String storeName, String id) {
}

public List<SchemaEntry> getAllValueSchemas(String storeName) {
return HelixUtils.getChildren(
schemaAccessor,
getValueSchemaParentPath(storeName).toString(),
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
return HelixUtils
.getChildren(schemaAccessor, getValueSchemaParentPath(storeName).toString(), refreshAttemptsForZkReconnect);
}

public DerivedSchemaEntry getDerivedSchema(String storeName, String derivedSchemaIdPair) {
Expand All @@ -113,8 +104,7 @@ public List<DerivedSchemaEntry> getAllDerivedSchemas(String storeName) {
return HelixUtils.getChildren(
derivedSchemaAccessor,
getDerivedSchemaParentPath(storeName).toString(),
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
refreshAttemptsForZkReconnect);
}

public void createKeySchema(String storeName, SchemaEntry schemaEntry) {
Expand Down Expand Up @@ -238,8 +228,7 @@ public List<RmdSchemaEntry> getAllReplicationMetadataSchemas(String storeName) {
return HelixUtils.getChildren(
replicationMetadataSchemaAccessor,
getReplicationMetadataSchemaParentPath(storeName).toString(),
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
refreshAttemptsForZkReconnect);
}

public void addReplicationMetadataSchema(String storeName, RmdSchemaEntry rmdSchemaEntry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public SubscriptionBasedStoreRepository(
ZkClient zkClient,
HelixAdapterSerializer compositeSerializer,
String clusterName) {
super(zkClient, compositeSerializer, clusterName, 0, 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same reason

super(zkClient, compositeSerializer, clusterName);
}

@Override
Expand Down
Loading
Loading