Skip to content

[dvc][server] Add exponential backoff to other HelixUtils methods with retries #1784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ private void checkBeforeJoinInCluster() {
HelixAdmin admin = new ZKHelixAdmin(zkAddress);
try {
// Check whether the cluster is ready or not at first to prevent zk no node exception.
HelixUtils.checkClusterSetup(admin, clusterName, MAX_RETRY, RETRY_INTERVAL_SEC);
HelixUtils.checkClusterSetup(admin, clusterName, MAX_RETRY);
List<String> instances = admin.getInstancesInCluster(clusterName);
if (instances.contains(instance.getNodeId())) {
LOGGER.info("{} is not a new node to cluster: {}, skip the cleaning up.", instance.getNodeId(), clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,8 @@ public SchemaEntry getValueSchema(String storeName, String id) {
}

public List<SchemaEntry> getAllValueSchemas(String storeName) {
return HelixUtils.getChildren(
schemaAccessor,
getValueSchemaParentPath(storeName).toString(),
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
return HelixUtils
.getChildren(schemaAccessor, getValueSchemaParentPath(storeName).toString(), refreshAttemptsForZkReconnect);
}

public DerivedSchemaEntry getDerivedSchema(String storeName, String derivedSchemaIdPair) {
Expand All @@ -113,8 +110,7 @@ public List<DerivedSchemaEntry> getAllDerivedSchemas(String storeName) {
return HelixUtils.getChildren(
derivedSchemaAccessor,
getDerivedSchemaParentPath(storeName).toString(),
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
refreshAttemptsForZkReconnect);
}

public void createKeySchema(String storeName, SchemaEntry schemaEntry) {
Expand Down Expand Up @@ -238,8 +234,7 @@ public List<RmdSchemaEntry> getAllReplicationMetadataSchemas(String storeName) {
return HelixUtils.getChildren(
replicationMetadataSchemaAccessor,
getReplicationMetadataSchemaParentPath(storeName).toString(),
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
refreshAttemptsForZkReconnect);
}

public void addReplicationMetadataSchema(String storeName, RmdSchemaEntry rmdSchemaEntry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,8 @@ private void registerSerializers(HelixAdapterSerializer adapter) {
@Override
public List<OfflinePushStatus> loadOfflinePushStatusesAndPartitionStatuses() {
LOGGER.info("Start loading all offline pushes statuses from ZK in cluster: {}.", clusterName);
List<OfflinePushStatus> offlinePushStatuses = HelixUtils.getChildren(
offlinePushStatusAccessor,
offlinePushStatusParentPath,
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
List<OfflinePushStatus> offlinePushStatuses =
HelixUtils.getChildren(offlinePushStatusAccessor, offlinePushStatusParentPath, refreshAttemptsForZkReconnect);
Iterator<OfflinePushStatus> iterator = offlinePushStatuses.iterator();
while (iterator.hasNext()) {
OfflinePushStatus pushStatus = iterator.next();
Expand Down Expand Up @@ -414,11 +411,8 @@ protected PartitionStatus getPartitionStatus(String topic, int partitionId) {
*/
protected List<PartitionStatus> getPartitionStatuses(String topic, int partitionCount) {
LOGGER.debug("Start reading partition status from ZK for topic: {} in cluster: {}.", topic, clusterName);
List<PartitionStatus> zkResult = HelixUtils.getChildren(
partitionStatusAccessor,
getOfflinePushStatusPath(topic),
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);
List<PartitionStatus> zkResult =
HelixUtils.getChildren(partitionStatusAccessor, getOfflinePushStatusPath(topic), refreshAttemptsForZkReconnect);
LOGGER.debug("Read {} partition status from ZK for topic: {} in cluster: {}.", zkResult.size(), topic, clusterName);

if (zkResult.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,9 @@ public static Instance getInstanceFromHelixInstanceName(String helixInstanceName
* @param path parent path
* @return a list of objects that are under the parent path. It will return null if the parent path does not exist
*/
public static <T> List<T> getChildren(
ZkBaseDataAccessor<T> dataAccessor,
String path,
int retryCount,
long retryInterval) {
int attempt = 1;
while (attempt <= retryCount) {
attempt++;
public static <T> List<T> getChildren(ZkBaseDataAccessor<T> dataAccessor, String path, int retryCount) {
int attempt = 0;
while (attempt < retryCount) {
List<String> childrenNames = dataAccessor.getChildNames(path, AccessOption.PERSISTENT);
int expectedCount = 0;
if (childrenNames == null) {
Expand All @@ -116,17 +111,9 @@ public static <T> List<T> getChildren(
List<T> children = dataAccessor.getChildren(path, null, AccessOption.PERSISTENT);
if (children.size() != expectedCount) {
// Data is inconsistent
if (attempt < retryCount) {
LOGGER.info(
"dataAccessor.getChildNames() did not return the expected number of elements from path: {}\nExpected: {}, but got {}. Attempt:{}/{}, will sleep {} and retry.",
path,
expectedCount,
children.size(),
attempt,
retryCount,
retryInterval);
Utils.sleep(retryInterval);
}
attempt++;
LOGGER.info("Expected number of elements: {}, but got {}.", expectedCount, children.size());
handleFailedHelixOperation(path, "getChildren", attempt, retryCount);
} else {
return children;
}
Expand Down Expand Up @@ -253,9 +240,12 @@ public static <T> void compareAndUpdate(
*/
private static void handleFailedHelixOperation(String path, String helixOperation, int attempt, int retryCount) {
if (attempt < retryCount) {
if (!path.isEmpty()) {
path = " with path " + path;
}
long retryIntervalSec = (long) Math.pow(2, attempt);
LOGGER.error(
"{} failed with path {} on attempt {}/{}. Will retry in {} seconds.",
"{} failed{} on attempt {}/{}. Will retry in {} seconds.",
helixOperation,
path,
attempt,
Expand All @@ -268,34 +258,28 @@ private static void handleFailedHelixOperation(String path, String helixOperatio
}

/**
* Try to connect Helix Manger. If failed, waits for certain time and retry. If Helix Manager can not be
* Try to connect Helix Manager. If failed, waits for certain time and retry. If Helix Manager can not be
* connected after certain number of retry, throws exception. This method is most likely being used asynchronously since
* it is going to block and wait if connection fails.
* @param manager HelixManager instance
* @param maxRetries retry time
* @param sleepTimeSeconds time in second that it blocks until next retry.
* @exception VeniceException if connection keeps failing after certain number of retry
*
* @param manager HelixManager instance
* @param retryCount maximum number of connection attempts before giving up
* @throws VeniceException if the connection still fails after {@code retryCount} attempts
*/
public static void connectHelixManager(SafeHelixManager manager, int maxRetries, int sleepTimeSeconds) {
int attempt = 1;
boolean isSuccess = false;
while (!isSuccess) {
public static void connectHelixManager(SafeHelixManager manager, int retryCount) {
int attempt = 0;
while (attempt < retryCount) {
try {
manager.connect();
isSuccess = true;
// Connection established.
break;
} catch (Exception e) {
if (attempt <= maxRetries) {
LOGGER.warn(
"Failed to connect {} on attempt {}/{}. Will retry in {} seconds.",
manager.toString(),
attempt,
maxRetries,
sleepTimeSeconds);
attempt++;
Utils.sleep(TimeUnit.SECONDS.toMillis(sleepTimeSeconds));
attempt++;
if (attempt < retryCount) {
handleFailedHelixOperation("", "connectHelixManager", attempt, retryCount);
} else {
throw new VeniceException(
"Error connecting to Helix Manager for Cluster '" + manager.getClusterName() + "' after " + maxRetries
"Error connecting to Helix Manager for Cluster '" + manager.getClusterName() + "' after " + retryCount
+ " attempts.",
e);
}
Expand All @@ -308,20 +292,16 @@ public static void connectHelixManager(SafeHelixManager manager, int maxRetries,
* controller.
* Otherwise, the following operations issued by participant/spectator would fail.
*/
public static void checkClusterSetup(HelixAdmin admin, String cluster, int maxRetry, int retryIntervalSec) {
int attempt = 1;
while (true) {
public static void checkClusterSetup(HelixAdmin admin, String cluster, int retryCount) {
int attempt = 0;
while (attempt < retryCount) {
if (admin.getClusters().contains(cluster)) {
// Cluster is ready.
break;
} else {
if (attempt <= maxRetry) {
LOGGER.warn(
"Cluster has not been initialized by controller. Attempt: {}. Will retry in {} seconds.",
attempt,
retryIntervalSec);
attempt++;
Utils.sleep(TimeUnit.SECONDS.toMillis(retryIntervalSec));
attempt++;
if (attempt < retryCount) {
handleFailedHelixOperation("", "checkClusterSetup", attempt, retryCount);
} else {
throw new VeniceException("Cluster has not been initialized by controller after attempted: " + attempt);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.linkedin.venice.utils;

import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -9,12 +11,14 @@
import static org.testng.Assert.assertTrue;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.meta.Instance;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixException;
import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
Expand All @@ -28,6 +32,9 @@
public class TestHelixUtils {
private static final String TEST_PATH = "/test/path";
private static final String TEST_DATA = "testData";
private static final List<String> TEST_CHILD_NAMES = Arrays.asList("child1", "child2");
private static final List<String> TEST_CHILD_VALUES = Arrays.asList("value1", "value2");
private static final List<String> TEST_FEWER_CHILD_VALUES = Collections.singletonList("value1");
private static final int TEST_RETRY_COUNT = 3;
private ZkBaseDataAccessor<String> mockDataAccessor;
private DataUpdater<String> dataUpdater;
Expand Down Expand Up @@ -102,6 +109,49 @@ public void setUp() {
dataUpdater = mock(DataUpdater.class);
}

/**
 * Happy path for {@code HelixUtils.getChildren}: the accessor is stubbed with two child names and two
 * child values, so the first read is already consistent and the values are returned as-is.
 */
@Test
public void testGetChildren() {
  doReturn(TEST_CHILD_VALUES).when(mockDataAccessor).getChildren(TEST_PATH, null, AccessOption.PERSISTENT);
  doReturn(TEST_CHILD_NAMES).when(mockDataAccessor).getChildNames(TEST_PATH, AccessOption.PERSISTENT);

  List<String> children = HelixUtils.getChildren(mockDataAccessor, TEST_PATH, TEST_RETRY_COUNT);

  assertEquals(children, TEST_CHILD_VALUES);

  // A single call to each accessor method shows that no retry was needed.
  verify(mockDataAccessor, times(1)).getChildNames(TEST_PATH, AccessOption.PERSISTENT);
  verify(mockDataAccessor, times(1)).getChildren(TEST_PATH, null, AccessOption.PERSISTENT);
}

/**
 * Failure path for {@code HelixUtils.getChildren}: every read reports two child names but only one
 * child value, so the result never becomes consistent. The test expects a {@link VeniceException}
 * once {@code TEST_RETRY_COUNT} attempts have been made, with one name read and one value read per
 * attempt.
 */
@Test
public void testGetChildrenFailsAfterRetries() {
  // Stub the name listing with the child *names* fixture (two entries), matching the sibling tests.
  doReturn(TEST_CHILD_NAMES).when(mockDataAccessor).getChildNames(TEST_PATH, AccessOption.PERSISTENT);
  // Only one value comes back, so the size check in getChildren fails on every attempt.
  doReturn(TEST_FEWER_CHILD_VALUES).when(mockDataAccessor).getChildren(TEST_PATH, null, AccessOption.PERSISTENT);

  assertThrows(VeniceException.class, () -> {
    HelixUtils.getChildren(mockDataAccessor, TEST_PATH, TEST_RETRY_COUNT);
  });

  verify(mockDataAccessor, times(TEST_RETRY_COUNT)).getChildNames(TEST_PATH, AccessOption.PERSISTENT);
  verify(mockDataAccessor, times(TEST_RETRY_COUNT)).getChildren(TEST_PATH, null, AccessOption.PERSISTENT);
}

/**
 * Recovery path for {@code HelixUtils.getChildren}: the first two value reads come back one entry
 * short, the third matches the two stubbed child names, and the call returns the full value list.
 */
@Test
public void testGetChildrenSucceedsAfterRetries() {
  // Two inconsistent reads followed by a consistent one.
  doReturn(TEST_FEWER_CHILD_VALUES)
      .doReturn(TEST_FEWER_CHILD_VALUES)
      .doReturn(TEST_CHILD_VALUES)
      .when(mockDataAccessor)
      .getChildren(TEST_PATH, null, AccessOption.PERSISTENT);
  doReturn(TEST_CHILD_NAMES).when(mockDataAccessor).getChildNames(TEST_PATH, AccessOption.PERSISTENT);

  List<String> children = HelixUtils.getChildren(mockDataAccessor, TEST_PATH, TEST_RETRY_COUNT);

  assertEquals(children, TEST_CHILD_VALUES);

  // One name read and one value read per attempt: two failures plus the final success.
  verify(mockDataAccessor, times(3)).getChildNames(TEST_PATH, AccessOption.PERSISTENT);
  verify(mockDataAccessor, times(3)).getChildren(TEST_PATH, null, AccessOption.PERSISTENT);
}

@Test
public void testCreate() {
doReturn(true).when(mockDataAccessor).create(TEST_PATH, TEST_DATA, AccessOption.PERSISTENT);
Expand Down Expand Up @@ -250,4 +300,92 @@ public void testCompareAndUpdateSucceedsAfterRetries() {

verify(mockDataAccessor, times(TEST_RETRY_COUNT)).update(TEST_PATH, dataUpdater, AccessOption.PERSISTENT);
}

/**
 * Happy path for {@code HelixUtils.connectHelixManager}: {@code connect()} succeeds immediately, so it
 * must be invoked exactly once.
 */
@Test
public void testConnectHelixManager() throws Exception {
  SafeHelixManager helixManager = mock(SafeHelixManager.class);
  doReturn("testCluster").when(helixManager).getClusterName();
  doNothing().when(helixManager).connect();

  HelixUtils.connectHelixManager(helixManager, TEST_RETRY_COUNT);

  verify(helixManager, times(1)).connect();
}

/**
 * Failure path for {@code HelixUtils.connectHelixManager}: {@code connect()} throws on every call, so a
 * {@link VeniceException} is expected after {@code TEST_RETRY_COUNT} connection attempts.
 */
@Test
public void testConnectHelixManagerFailsAfterRetries() throws Exception {
  SafeHelixManager helixManager = mock(SafeHelixManager.class);
  // The cluster name is read when the final failure message is built.
  doReturn("testCluster").when(helixManager).getClusterName();
  doThrow(new Exception("Connection failed")).when(helixManager).connect();

  Assert.assertThrows(
      VeniceException.class,
      () -> HelixUtils.connectHelixManager(helixManager, TEST_RETRY_COUNT));

  verify(helixManager, times(TEST_RETRY_COUNT)).connect();
}

/**
 * Recovery path for {@code HelixUtils.connectHelixManager}: {@code connect()} throws twice and then
 * succeeds, so the call returns normally after {@code TEST_RETRY_COUNT} connection attempts.
 */
@Test
public void testConnectHelixManagerSucceedsAfterRetries() throws Exception {
  SafeHelixManager helixManager = mock(SafeHelixManager.class);
  doReturn("testCluster").when(helixManager).getClusterName();

  // Two consecutive failures, each with its own exception instance, followed by a clean connect.
  Exception firstFailure = new Exception("Connection failed");
  Exception secondFailure = new Exception("Connection failed");
  doThrow(firstFailure).doThrow(secondFailure).doNothing().when(helixManager).connect();

  HelixUtils.connectHelixManager(helixManager, TEST_RETRY_COUNT);

  verify(helixManager, times(TEST_RETRY_COUNT)).connect();
}

/**
 * Happy path for {@code HelixUtils.checkClusterSetup}: the admin already lists the cluster, so the
 * cluster list is fetched exactly once.
 */
@Test
public void testCheckClusterSetup() {
  String clusterName = "testCluster";
  HelixAdmin helixAdmin = mock(HelixAdmin.class);
  doReturn(Collections.singletonList(clusterName)).when(helixAdmin).getClusters();

  HelixUtils.checkClusterSetup(helixAdmin, clusterName, TEST_RETRY_COUNT);

  verify(helixAdmin, times(1)).getClusters();
}

/**
 * Failure path for {@code HelixUtils.checkClusterSetup}: the admin never lists the cluster, so a
 * {@link VeniceException} is expected after {@code TEST_RETRY_COUNT} lookups.
 */
@Test
public void testCheckClusterSetupFailsAfterRetries() {
  String clusterName = "testCluster";
  HelixAdmin helixAdmin = mock(HelixAdmin.class);
  // An empty cluster list on every call means the cluster is never found.
  doReturn(Collections.emptyList()).when(helixAdmin).getClusters();

  Assert.assertThrows(
      VeniceException.class,
      () -> HelixUtils.checkClusterSetup(helixAdmin, clusterName, TEST_RETRY_COUNT));

  verify(helixAdmin, times(TEST_RETRY_COUNT)).getClusters();
}

/**
 * Recovery path for {@code HelixUtils.checkClusterSetup}: the first two lookups return no clusters and
 * the third lists the cluster, so the call returns normally after {@code TEST_RETRY_COUNT} lookups.
 */
@Test
public void testCheckClusterSetupSucceedsAfterRetries() {
  String clusterName = "testCluster";
  HelixAdmin helixAdmin = mock(HelixAdmin.class);

  // Cluster is missing twice, then present.
  doReturn(Collections.emptyList())
      .doReturn(Collections.emptyList())
      .doReturn(Collections.singletonList(clusterName))
      .when(helixAdmin)
      .getClusters();

  HelixUtils.checkClusterSetup(helixAdmin, clusterName, TEST_RETRY_COUNT);

  verify(helixAdmin, times(TEST_RETRY_COUNT)).getClusters();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ private CompletableFuture startServices(boolean async) {
// TODO: Remove this check once test constructor is removed or otherwise fixed.
LOGGER.info("Not connecting to Helix because the HelixManager is null (the test constructor was used)");
} else {
HelixUtils.connectHelixManager(manager, 30, 1);
HelixUtils.connectHelixManager(manager, 30);
LOGGER.info("{} finished connectHelixManager()", this);
}
} catch (VeniceException ve) {
Expand Down
Loading