Skip to content

[router][dvc][server][controller][admin-tool] Add exponential backoff to other HelixUtils methods with retries #1784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Jun 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ public class HelixParticipationService extends AbstractVeniceService
implements StatusMessageHandler<KillOfflinePushMessage> {
private static final Logger LOGGER = LogManager.getLogger(HelixParticipationService.class);

private static final int MAX_RETRY = 30;
private static final int RETRY_INTERVAL_SEC = 1;

private final Instance instance;
private final String clusterName;
private final String participantName;
Expand Down Expand Up @@ -305,7 +302,10 @@ private void checkBeforeJoinInCluster() {
HelixAdmin admin = new ZKHelixAdmin(zkAddress);
try {
// First check whether the cluster is ready, to avoid a ZK "no node" exception.
HelixUtils.checkClusterSetup(admin, clusterName, MAX_RETRY, RETRY_INTERVAL_SEC);
HelixUtils.checkClusterSetup(
admin,
clusterName,
veniceConfigLoader.getVeniceClusterConfig().getRefreshAttemptsForZkReconnect());
List<String> instances = admin.getInstancesInCluster(clusterName);
if (instances.contains(instance.getNodeId())) {
LOGGER.info("{} is not a new node to cluster: {}, skip the cleaning up.", instance.getNodeId(), clusterName);
Expand Down Expand Up @@ -365,9 +365,8 @@ private void asyncStart() {
clusterName,
zkClient,
new HelixAdapterSerializer(),
veniceConfigLoader.getVeniceClusterConfig().getRefreshAttemptsForZkReconnect(),
veniceConfigLoader.getVeniceClusterConfig().getRefreshIntervalForZkReconnectInMs(),
veniceServerConfig.getRegionName());
veniceServerConfig.getRegionName(),
veniceConfigLoader.getVeniceClusterConfig().getRefreshAttemptsForZkReconnect());

/**
* The accessor can only get created successfully after helix manager is created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,8 @@ private void initServerStoreAndSchemaRepository() {
HelixReadOnlyZKSharedSystemStoreRepository readOnlyZKSharedSystemStoreRepository =
new HelixReadOnlyZKSharedSystemStoreRepository(zkClient, adapter, systemSchemaClusterName);

HelixReadOnlyStoreRepository readOnlyStoreRepository = new HelixReadOnlyStoreRepository(
zkClient,
adapter,
clusterName,
clusterConfig.getRefreshAttemptsForZkReconnect(),
clusterConfig.getRefreshIntervalForZkReconnectInMs());
HelixReadOnlyStoreRepository readOnlyStoreRepository =
new HelixReadOnlyStoreRepository(zkClient, adapter, clusterName);

storeRepo = new HelixReadOnlyStoreRepositoryAdapter(
readOnlyZKSharedSystemStoreRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,26 @@
*/
public class CachedResourceZkStateListener implements IZkStateListener {
private final Logger logger;
public static final int DEFAULT_RETRY_LOAD_ATTEMPTS = 1;
public static final long DEFAULT_RETRY_LOAD_INTERVAL_IN_MS = TimeUnit.SECONDS.toMillis(10);
public static final int DEFAULT_REFRESH_ATTEMPTS_FOR_ZK_RECONNECT = 1;
public static final long DEFAULT_REFRESH_INTERVAL_FOR_ZK_RECONNECT_IS_MS = TimeUnit.SECONDS.toMillis(10);
private final VeniceResource resource;
private final int retryLoadAttempts;
private final long retryLoadIntervalInMs;
private final int refreshAttemptsForZkReconnect;
private final long refreshIntervalForZkReconnectInMs;
private volatile boolean disconnected = false;

public CachedResourceZkStateListener(VeniceResource resource) {
// By default, we attempt the refresh only once (no retries) after the connection is re-established.
this(resource, DEFAULT_RETRY_LOAD_ATTEMPTS, DEFAULT_RETRY_LOAD_INTERVAL_IN_MS);
this(resource, DEFAULT_REFRESH_ATTEMPTS_FOR_ZK_RECONNECT, DEFAULT_REFRESH_INTERVAL_FOR_ZK_RECONNECT_IS_MS);
}

public CachedResourceZkStateListener(VeniceResource resource, int retryLoadAttempts, long retryLoadIntervalInMs) {
public CachedResourceZkStateListener(
VeniceResource resource,
int refreshAttemptsForZkReconnect,
long refreshIntervalForZkReconnectInMs) {
this.resource = resource;
this.logger = LogManager.getLogger(this.getClass().getSimpleName() + " [" + getResourceName() + "]");
this.retryLoadAttempts = retryLoadAttempts;
this.retryLoadIntervalInMs = retryLoadIntervalInMs;
this.refreshAttemptsForZkReconnect = refreshAttemptsForZkReconnect;
this.refreshIntervalForZkReconnectInMs = refreshIntervalForZkReconnectInMs;
}

/**
Expand All @@ -59,25 +62,26 @@ public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception
// there is only one refresh operation on the fly.
// We have seen ZK return partial results right after the connection is re-established.
// In order to reduce the possibility that we get not-up-to-date data, we keep loading data for
// retryLoadAttempts with retryLoadIntervalInMs between each two loading.
// Sleep a random time(no more than retryLoadIntervalInMs) to avoid thunderstorm issue that all nodes are
// refreshAttemptsForZkReconnect with refreshIntervalForZkReconnectInMs between each two loading.
// Sleep a random time (no more than refreshIntervalForZkReconnectInMs) to avoid the thundering-herd
// problem in which all nodes are
// trying to refresh resource at the same time if there is a network issue in that DC.
Utils.sleep((long) (Math.random() * retryLoadIntervalInMs));
Utils.sleep((long) (Math.random() * refreshIntervalForZkReconnectInMs));
int attempt = 1;
while (attempt <= retryLoadAttempts) {
while (attempt <= refreshAttemptsForZkReconnect) {
logger.info(
"Attempt #{} of {}: Refresh resource after connection is reconnected.",
attempt,
retryLoadAttempts);
refreshAttemptsForZkReconnect);
try {
resource.refresh();
logger.info("Attempt #{} of {}: Refresh completed.", attempt, retryLoadAttempts);
logger.info("Attempt #{} of {}: Refresh completed.", attempt, refreshAttemptsForZkReconnect);
return;
} catch (Exception e) {
logger.error("Can not refresh resource correctly after client is reconnected", e);
if (attempt < retryLoadAttempts) {
logger.info("Will retry after {} ms", retryLoadIntervalInMs);
Utils.sleep(retryLoadIntervalInMs);
if (attempt < refreshAttemptsForZkReconnect) {
logger.info("Will retry after {} ms", refreshIntervalForZkReconnectInMs);
Utils.sleep(refreshIntervalForZkReconnectInMs);
}
attempt++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,7 @@ public HelixReadOnlySchemaRepository(
long refreshIntervalForZkReconnectInMs) {
this.storeRepository = storeRepository;
this.zkClient = zkClient;
this.accessor = new HelixSchemaAccessor(
zkClient,
adapter,
clusterName,
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
this.accessor = new HelixSchemaAccessor(zkClient, adapter, clusterName, refreshAttemptsForZkReconnect);

storeRepository.registerStoreDataChangedListener(this);
this.zkStateListener =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ public class HelixReadOnlyStoreRepository extends CachedReadOnlyStoreRepository
public HelixReadOnlyStoreRepository(
ZkClient zkClient,
HelixAdapterSerializer compositeSerializer,
String clusterName,
int refreshAttemptsForZkReconnect,
long refreshIntervalForZkReconnectInMs) {
String clusterName) {
/**
* HelixReadOnlyStoreRepository is used in router, server, fast-client, da-vinci and system store.
* Its centralized locking should NOT be shared with other classes. Create a new instance.
Expand Down Expand Up @@ -118,15 +116,12 @@ protected void onRepositoryChanged(Collection<String> newZkStoreNames) {

private final CachedResourceZkStateListener zkStateListener = new CachedResourceZkStateListener(this);

private final IZkChildListener zkStoreRepositoryListener = new IZkChildListener() {
@Override
public void handleChildChange(String path, List<String> children) {
if (!path.equals(clusterStoreRepositoryPath)) {
LOGGER.warn("Notification path mismatch, path={}, expected={}.", path, clusterStoreRepositoryPath);
return;
}
onRepositoryChanged(children);
private final IZkChildListener zkStoreRepositoryListener = (path, children) -> {
if (!path.equals(clusterStoreRepositoryPath)) {
LOGGER.warn("Notification path mismatch, path={}, expected={}.", path, clusterStoreRepositoryPath);
return;
}
onRepositoryChanged(children);
};

private final IZkDataListener zkStoreListener = new IZkDataListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public HelixReadOnlyZKSharedSystemStoreRepository(
ZkClient zkClient,
HelixAdapterSerializer compositeSerializer,
String systemStoreClusterName) {
super(zkClient, compositeSerializer, systemStoreClusterName, 0, 0);
super(zkClient, compositeSerializer, systemStoreClusterName);
// Initialize the necessary zk shared system stores
for (VeniceSystemStoreType type: VeniceSystemStoreType.values()) {
if (type.isNewMedataRepositoryAdopted()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ public HelixReadWriteSchemaRepository(
ZkClient zkClient,
HelixAdapterSerializer adapter,
String clusterName,
Optional<MetaStoreWriter> metaStoreWriter) {
Optional<MetaStoreWriter> metaStoreWriter,
int refreshAttemptsForZkReconnect) {
this.storeRepository = storeRepository;
this.accessor = new HelixSchemaAccessor(zkClient, adapter, clusterName);
this.accessor = new HelixSchemaAccessor(zkClient, adapter, clusterName, refreshAttemptsForZkReconnect);
this.metaStoreWriter = metaStoreWriter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.PathResourceRegistry;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.zookeeper.impl.client.ZkClient;
Expand All @@ -19,9 +18,6 @@
public class HelixSchemaAccessor {
private static final Logger logger = LogManager.getLogger(HelixSchemaAccessor.class);

private static final int DEFAULT_ZK_REFRESH_ATTEMPTS = 3;
private static final long DEFAULT_ZK_REFRESH_INTERVAL = TimeUnit.SECONDS.toMillis(10);

// Key schema path name
private static final String KEY_SCHEMA_PATH = "key-schema";
// Value schema path name
Expand All @@ -37,6 +33,8 @@ public class HelixSchemaAccessor {
// Replication metadata schema path name. The value still uses "timestamp" for backward compatibility
private static final String REPLICATION_METADATA_SCHEMA_PATH = "timestamp-metadata-schema";

private static final int DEFAULT_ZK_REFRESH_ATTEMPTS = 9;

private final ZkBaseDataAccessor<SchemaEntry> schemaAccessor;
private final ZkBaseDataAccessor<DerivedSchemaEntry> derivedSchemaAccessor;
private final ZkBaseDataAccessor<RmdSchemaEntry> replicationMetadataSchemaAccessor;
Expand All @@ -45,22 +43,19 @@ public class HelixSchemaAccessor {
private final String clusterName;

private final int refreshAttemptsForZkReconnect;
private final long refreshIntervalForZkReconnectInMs;

public HelixSchemaAccessor(ZkClient zkClient, HelixAdapterSerializer helixAdapterSerializer, String clusterName) {
this(zkClient, helixAdapterSerializer, clusterName, DEFAULT_ZK_REFRESH_ATTEMPTS, DEFAULT_ZK_REFRESH_INTERVAL);
this(zkClient, helixAdapterSerializer, clusterName, DEFAULT_ZK_REFRESH_ATTEMPTS);
}

public HelixSchemaAccessor(
ZkClient zkClient,
HelixAdapterSerializer helixAdapterSerializer,
String clusterName,
int refreshAttemptsForZkReconnect,
long refreshIntervalForZkReconnectInMs) {
int refreshAttemptsForZkReconnect) {
this.clusterName = clusterName;

this.refreshAttemptsForZkReconnect = refreshAttemptsForZkReconnect;
this.refreshIntervalForZkReconnectInMs = refreshIntervalForZkReconnectInMs;

registerSerializerForSchema(zkClient, helixAdapterSerializer);
schemaAccessor = new ZkBaseDataAccessor<>(zkClient);
Expand Down Expand Up @@ -97,11 +92,8 @@ public SchemaEntry getValueSchema(String storeName, String id) {
}

public List<SchemaEntry> getAllValueSchemas(String storeName) {
return HelixUtils.getChildren(
schemaAccessor,
getValueSchemaParentPath(storeName).toString(),
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
return HelixUtils
.getChildren(schemaAccessor, getValueSchemaParentPath(storeName).toString(), refreshAttemptsForZkReconnect);
}

public DerivedSchemaEntry getDerivedSchema(String storeName, String derivedSchemaIdPair) {
Expand All @@ -113,8 +105,7 @@ public List<DerivedSchemaEntry> getAllDerivedSchemas(String storeName) {
return HelixUtils.getChildren(
derivedSchemaAccessor,
getDerivedSchemaParentPath(storeName).toString(),
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
refreshAttemptsForZkReconnect);
}

public void createKeySchema(String storeName, SchemaEntry schemaEntry) {
Expand Down Expand Up @@ -238,8 +229,7 @@ public List<RmdSchemaEntry> getAllReplicationMetadataSchemas(String storeName) {
return HelixUtils.getChildren(
replicationMetadataSchemaAccessor,
getReplicationMetadataSchemaParentPath(storeName).toString(),
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
refreshAttemptsForZkReconnect);
}

public void addReplicationMetadataSchema(String storeName, RmdSchemaEntry rmdSchemaEntry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public SubscriptionBasedStoreRepository(
ZkClient zkClient,
HelixAdapterSerializer compositeSerializer,
String clusterName) {
super(zkClient, compositeSerializer, clusterName, 0, 0);
super(zkClient, compositeSerializer, clusterName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.zookeeper.impl.client.ZkClient;
Expand All @@ -46,8 +45,7 @@
*/
public class VeniceOfflinePushMonitorAccessor implements OfflinePushAccessor {
public static final String OFFLINE_PUSH_SUB_PATH = OFFLINE_PUSHES;
private static final int DEFAULT_ZK_REFRESH_ATTEMPTS = 3;
private static final long DEFAULT_ZK_REFRESH_INTERVAL = TimeUnit.SECONDS.toMillis(10);
private static final int DEFAULT_ZK_REFRESH_ATTEMPTS = 9;

private static final Logger LOGGER = LogManager.getLogger(VeniceOfflinePushMonitorAccessor.class);
private final String clusterName;
Expand All @@ -68,23 +66,12 @@ public class VeniceOfflinePushMonitorAccessor implements OfflinePushAccessor {

private final int refreshAttemptsForZkReconnect;

private final long refreshIntervalForZkReconnectInMs;

public VeniceOfflinePushMonitorAccessor(
String clusterName,
ZkClient zkClient,
HelixAdapterSerializer adapter,
LogContext logContext) {
this(clusterName, zkClient, adapter, DEFAULT_ZK_REFRESH_ATTEMPTS, DEFAULT_ZK_REFRESH_INTERVAL, logContext);
}

public VeniceOfflinePushMonitorAccessor(
String clusterName,
ZkClient zkClient,
HelixAdapterSerializer adapter,
int refreshAttemptsForZkReconnect,
long refreshIntervalForZkReconnectInMs,
Object logContext) {
Object logContext,
int refreshAttemptsForZkReconnect) {
this.clusterName = clusterName;
this.offlinePushStatusParentPath = getOfflinePushStatuesParentPath();
this.zkClient = zkClient;
Expand All @@ -95,7 +82,6 @@ public VeniceOfflinePushMonitorAccessor(
this.listenerManager = new ListenerManager<>(logContext);
this.partitionStatusZkListener = new PartitionStatusZkListener(logContext);
this.refreshAttemptsForZkReconnect = refreshAttemptsForZkReconnect;
this.refreshIntervalForZkReconnectInMs = refreshIntervalForZkReconnectInMs;
}

/**
Expand All @@ -114,7 +100,6 @@ public VeniceOfflinePushMonitorAccessor(
this.listenerManager = new ListenerManager<>(logContext);
this.partitionStatusZkListener = new PartitionStatusZkListener(logContext);
this.refreshAttemptsForZkReconnect = DEFAULT_ZK_REFRESH_ATTEMPTS;
this.refreshIntervalForZkReconnectInMs = DEFAULT_ZK_REFRESH_INTERVAL;
}

private void registerSerializers(HelixAdapterSerializer adapter) {
Expand All @@ -127,11 +112,8 @@ private void registerSerializers(HelixAdapterSerializer adapter) {
@Override
public List<OfflinePushStatus> loadOfflinePushStatusesAndPartitionStatuses() {
LOGGER.info("Start loading all offline pushes statuses from ZK in cluster: {}.", clusterName);
List<OfflinePushStatus> offlinePushStatuses = HelixUtils.getChildren(
offlinePushStatusAccessor,
offlinePushStatusParentPath,
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
List<OfflinePushStatus> offlinePushStatuses =
HelixUtils.getChildren(offlinePushStatusAccessor, offlinePushStatusParentPath, refreshAttemptsForZkReconnect);
Iterator<OfflinePushStatus> iterator = offlinePushStatuses.iterator();
while (iterator.hasNext()) {
OfflinePushStatus pushStatus = iterator.next();
Expand Down Expand Up @@ -414,11 +396,8 @@ protected PartitionStatus getPartitionStatus(String topic, int partitionId) {
*/
protected List<PartitionStatus> getPartitionStatuses(String topic, int partitionCount) {
LOGGER.debug("Start reading partition status from ZK for topic: {} in cluster: {}.", topic, clusterName);
List<PartitionStatus> zkResult = HelixUtils.getChildren(
partitionStatusAccessor,
getOfflinePushStatusPath(topic),
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
List<PartitionStatus> zkResult =
HelixUtils.getChildren(partitionStatusAccessor, getOfflinePushStatusPath(topic), refreshAttemptsForZkReconnect);
LOGGER.debug("Read {} partition status from ZK for topic: {} in cluster: {}.", zkResult.size(), topic, clusterName);

if (zkResult.isEmpty()) {
Expand Down
Loading
Loading