Skip to content

Commit 287fcb3

Browse files
committed
wip for standardizing refreshAttemptForZkReconnect
1 parent f780377 commit 287fcb3

File tree

31 files changed

+90
-146
lines changed

31 files changed

+90
-146
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceClusterConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
120120
this.kafkaFetchQuotaUnorderedRecordPerSecond =
121121
clusterProps.getLong(KAFKA_FETCH_QUOTA_UNORDERED_RECORDS_PER_SECOND, -1);
122122

123-
this.refreshAttemptsForZkReconnect = clusterProps.getInt(REFRESH_ATTEMPTS_FOR_ZK_RECONNECT, 3);
123+
this.refreshAttemptsForZkReconnect = clusterProps.getInt(REFRESH_ATTEMPTS_FOR_ZK_RECONNECT, 9);
124124
this.refreshIntervalForZkReconnectInMs =
125125
clusterProps.getLong(REFRESH_INTERVAL_FOR_ZK_RECONNECT_MS, TimeUnit.SECONDS.toMillis(10));
126126
this.kafkaReadCycleDelayMs = clusterProps.getLong(KAFKA_READ_CYCLE_DELAY_MS, 1000);

clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,6 @@ public class HelixParticipationService extends AbstractVeniceService
6969
implements StatusMessageHandler<KillOfflinePushMessage> {
7070
private static final Logger LOGGER = LogManager.getLogger(HelixParticipationService.class);
7171

72-
private static final int MAX_RETRY = 30;
73-
private static final int RETRY_INTERVAL_SEC = 1;
74-
7572
private final Instance instance;
7673
private final String clusterName;
7774
private final String participantName;
@@ -305,7 +302,7 @@ private void checkBeforeJoinInCluster() {
305302
HelixAdmin admin = new ZKHelixAdmin(zkAddress);
306303
try {
307304
// Check whether the cluster is ready or not at first to prevent zk no node exception.
308-
HelixUtils.checkClusterSetup(admin, clusterName, MAX_RETRY);
305+
HelixUtils.checkClusterSetup(admin, clusterName, 9);
309306
List<String> instances = admin.getInstancesInCluster(clusterName);
310307
if (instances.contains(instance.getNodeId())) {
311308
LOGGER.info("{} is not a new node to cluster: {}, skip the cleaning up.", instance.getNodeId(), clusterName);
@@ -365,9 +362,8 @@ private void asyncStart() {
365362
clusterName,
366363
zkClient,
367364
new HelixAdapterSerializer(),
368-
veniceConfigLoader.getVeniceClusterConfig().getRefreshAttemptsForZkReconnect(),
369-
veniceConfigLoader.getVeniceClusterConfig().getRefreshIntervalForZkReconnectInMs(),
370-
veniceServerConfig.getRegionName());
365+
veniceServerConfig.getRegionName(),
366+
veniceConfigLoader.getCombinedProperties());
371367

372368
/**
373369
* The accessor can only get created successfully after helix manager is created.

clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/VeniceMetadataRepositoryBuilder.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,8 @@ private void initServerStoreAndSchemaRepository() {
133133
HelixReadOnlyZKSharedSystemStoreRepository readOnlyZKSharedSystemStoreRepository =
134134
new HelixReadOnlyZKSharedSystemStoreRepository(zkClient, adapter, systemSchemaClusterName);
135135

136-
HelixReadOnlyStoreRepository readOnlyStoreRepository = new HelixReadOnlyStoreRepository(
137-
zkClient,
138-
adapter,
139-
clusterName,
140-
clusterConfig.getRefreshAttemptsForZkReconnect(),
141-
clusterConfig.getRefreshIntervalForZkReconnectInMs());
136+
HelixReadOnlyStoreRepository readOnlyStoreRepository =
137+
new HelixReadOnlyStoreRepository(zkClient, adapter, clusterName);
142138

143139
storeRepo = new HelixReadOnlyStoreRepositoryAdapter(
144140
readOnlyZKSharedSystemStoreRepository,
@@ -155,17 +151,15 @@ private void initServerStoreAndSchemaRepository() {
155151
zkClient,
156152
adapter,
157153
systemSchemaClusterName,
158-
clusterConfig.getRefreshAttemptsForZkReconnect(),
159-
clusterConfig.getRefreshIntervalForZkReconnectInMs());
154+
clusterConfig.getRefreshAttemptsForZkReconnect());
160155
schemaRepo = new HelixReadOnlySchemaRepositoryAdapter(
161156
readOnlyZKSharedSchemaRepository,
162157
new HelixReadOnlySchemaRepository(
163158
readOnlyStoreRepository,
164159
zkClient,
165160
adapter,
166161
clusterName,
167-
clusterConfig.getRefreshAttemptsForZkReconnect(),
168-
clusterConfig.getRefreshIntervalForZkReconnectInMs()));
162+
clusterConfig.getRefreshAttemptsForZkReconnect()));
169163
schemaRepo.refresh();
170164

171165
liveClusterConfigRepo = new HelixReadOnlyLiveClusterConfigRepository(zkClient, adapter, clusterName);

internal/venice-common/src/main/java/com/linkedin/venice/helix/CachedResourceZkStateListener.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,19 @@
2020
public class CachedResourceZkStateListener implements IZkStateListener {
2121
private final Logger logger;
2222
public static final int DEFAULT_RETRY_LOAD_ATTEMPTS = 1;
23-
public static final long DEFAULT_RETRY_LOAD_INTERVAL_IN_MS = TimeUnit.SECONDS.toMillis(10);
2423
private final VeniceResource resource;
25-
private final int retryLoadAttempts;
26-
private final long retryLoadIntervalInMs;
24+
private final int refreshAttemptsForZkReconnect;
2725
private volatile boolean disconnected = false;
2826

2927
public CachedResourceZkStateListener(VeniceResource resource) {
3028
// By default, we only retry once after connection is reconnected.
31-
this(resource, DEFAULT_RETRY_LOAD_ATTEMPTS, DEFAULT_RETRY_LOAD_INTERVAL_IN_MS);
29+
this(resource, DEFAULT_RETRY_LOAD_ATTEMPTS);
3230
}
3331

34-
public CachedResourceZkStateListener(VeniceResource resource, int retryLoadAttempts, long retryLoadIntervalInMs) {
32+
public CachedResourceZkStateListener(VeniceResource resource, int refreshAttemptsForZkReconnect) {
3533
this.resource = resource;
3634
this.logger = LogManager.getLogger(this.getClass().getSimpleName() + " [" + getResourceName() + "]");
37-
this.retryLoadAttempts = retryLoadAttempts;
38-
this.retryLoadIntervalInMs = retryLoadIntervalInMs;
35+
this.refreshAttemptsForZkReconnect = refreshAttemptsForZkReconnect;
3936
}
4037

4138
/**
@@ -62,20 +59,22 @@ public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception
6259
// retryLoadAttempts with retryLoadIntervalInMs between each two loading.
6360
// Sleep a random time(no more than retryLoadIntervalInMs) to avoid thunderstorm issue that all nodes are
6461
// trying to refresh resource at the same time if there is a network issue in that DC.
62+
// TODO: refactor to use exponential backoff like implemented in HelixUtils
63+
long retryLoadIntervalInMs = TimeUnit.SECONDS.toMillis(2);
6564
Utils.sleep((long) (Math.random() * retryLoadIntervalInMs));
6665
int attempt = 1;
67-
while (attempt <= retryLoadAttempts) {
66+
while (attempt <= refreshAttemptsForZkReconnect) {
6867
logger.info(
6968
"Attempt #{} of {}: Refresh resource after connection is reconnected.",
7069
attempt,
71-
retryLoadAttempts);
70+
refreshAttemptsForZkReconnect);
7271
try {
7372
resource.refresh();
74-
logger.info("Attempt #{} of {}: Refresh completed.", attempt, retryLoadAttempts);
73+
logger.info("Attempt #{} of {}: Refresh completed.", attempt, refreshAttemptsForZkReconnect);
7574
return;
7675
} catch (Exception e) {
7776
logger.error("Can not refresh resource correctly after client is reconnected", e);
78-
if (attempt < retryLoadAttempts) {
77+
if (attempt < refreshAttemptsForZkReconnect) {
7978
logger.info("Will retry after {} ms", retryLoadIntervalInMs);
8079
Utils.sleep(retryLoadIntervalInMs);
8180
}

internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlySchemaRepository.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -94,35 +94,26 @@ public HelixReadOnlySchemaRepository(
9494
ZkClient zkClient,
9595
HelixAdapterSerializer adapter,
9696
String clusterName,
97-
int refreshAttemptsForZkReconnect,
98-
long refreshIntervalForZkReconnectInMs) {
97+
int refreshAttemptsForZkReconnect) {
9998
this.storeRepository = storeRepository;
10099
this.zkClient = zkClient;
101-
this.accessor = new HelixSchemaAccessor(
102-
zkClient,
103-
adapter,
104-
clusterName,
105-
refreshAttemptsForZkReconnect,
106-
refreshIntervalForZkReconnectInMs);
100+
this.accessor = new HelixSchemaAccessor(zkClient, adapter, clusterName, refreshAttemptsForZkReconnect);
107101

108102
storeRepository.registerStoreDataChangedListener(this);
109-
this.zkStateListener =
110-
new CachedResourceZkStateListener(this, refreshAttemptsForZkReconnect, refreshIntervalForZkReconnectInMs);
103+
this.zkStateListener = new CachedResourceZkStateListener(this, refreshAttemptsForZkReconnect);
111104
}
112105

113106
/** test-only */
114107
HelixReadOnlySchemaRepository(
115108
ReadOnlyStoreRepository storeRepository,
116109
ZkClient zkClient,
117110
HelixSchemaAccessor accessor,
118-
int refreshAttemptsForZkReconnect,
119-
long refreshIntervalForZkReconnectInMs) {
111+
int refreshAttemptsForZkReconnect) {
120112
this.storeRepository = storeRepository;
121113
this.zkClient = zkClient;
122114
this.accessor = accessor;
123115
storeRepository.registerStoreDataChangedListener(this);
124-
this.zkStateListener =
125-
new CachedResourceZkStateListener(this, refreshAttemptsForZkReconnect, refreshIntervalForZkReconnectInMs);
116+
this.zkStateListener = new CachedResourceZkStateListener(this, refreshAttemptsForZkReconnect);
126117
}
127118

128119
/**

internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlyStoreRepository.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@ public class HelixReadOnlyStoreRepository extends CachedReadOnlyStoreRepository
2222
public HelixReadOnlyStoreRepository(
2323
ZkClient zkClient,
2424
HelixAdapterSerializer compositeSerializer,
25-
String clusterName,
26-
int refreshAttemptsForZkReconnect,
27-
long refreshIntervalForZkReconnectInMs) {
25+
String clusterName) {
2826
/**
2927
* HelixReadOnlyStoreRepository is used in router, server, fast-client, da-vinci and system store.
3028
* Its centralized locking should NOT be shared with other classes. Create a new instance.
@@ -118,15 +116,12 @@ protected void onRepositoryChanged(Collection<String> newZkStoreNames) {
118116

119117
private final CachedResourceZkStateListener zkStateListener = new CachedResourceZkStateListener(this);
120118

121-
private final IZkChildListener zkStoreRepositoryListener = new IZkChildListener() {
122-
@Override
123-
public void handleChildChange(String path, List<String> children) {
124-
if (!path.equals(clusterStoreRepositoryPath)) {
125-
LOGGER.warn("Notification path mismatch, path={}, expected={}.", path, clusterStoreRepositoryPath);
126-
return;
127-
}
128-
onRepositoryChanged(children);
119+
private final IZkChildListener zkStoreRepositoryListener = (path, children) -> {
120+
if (!path.equals(clusterStoreRepositoryPath)) {
121+
LOGGER.warn("Notification path mismatch, path={}, expected={}.", path, clusterStoreRepositoryPath);
122+
return;
129123
}
124+
onRepositoryChanged(children);
130125
};
131126

132127
private final IZkDataListener zkStoreListener = new IZkDataListener() {

internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlyZKSharedSchemaRepository.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,7 @@ public HelixReadOnlyZKSharedSchemaRepository(
1616
ZkClient zkClient,
1717
HelixAdapterSerializer adapter,
1818
String systemStoreClusterName,
19-
int refreshAttemptsForZkReconnect,
20-
long refreshIntervalForZkReconnectInMs) {
21-
super(
22-
storeRepository,
23-
zkClient,
24-
adapter,
25-
systemStoreClusterName,
26-
refreshAttemptsForZkReconnect,
27-
refreshIntervalForZkReconnectInMs);
19+
int refreshAttemptsForZkReconnect) {
20+
super(storeRepository, zkClient, adapter, systemStoreClusterName, refreshAttemptsForZkReconnect);
2821
}
2922
}

internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlyZKSharedSystemStoreRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public HelixReadOnlyZKSharedSystemStoreRepository(
2828
ZkClient zkClient,
2929
HelixAdapterSerializer compositeSerializer,
3030
String systemStoreClusterName) {
31-
super(zkClient, compositeSerializer, systemStoreClusterName, 0, 0);
31+
super(zkClient, compositeSerializer, systemStoreClusterName);
3232
// Initialize the necessary zk shared system stores
3333
for (VeniceSystemStoreType type: VeniceSystemStoreType.values()) {
3434
if (type.isNewMedataRepositoryAdopted()) {

internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixSchemaAccessor.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import com.linkedin.venice.utils.HelixUtils;
88
import com.linkedin.venice.utils.PathResourceRegistry;
99
import java.util.List;
10-
import java.util.concurrent.TimeUnit;
1110
import org.apache.helix.AccessOption;
1211
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
1312
import org.apache.helix.zookeeper.impl.client.ZkClient;
@@ -19,9 +18,6 @@
1918
public class HelixSchemaAccessor {
2019
private static final Logger logger = LogManager.getLogger(HelixSchemaAccessor.class);
2120

22-
private static final int DEFAULT_ZK_REFRESH_ATTEMPTS = 3;
23-
private static final long DEFAULT_ZK_REFRESH_INTERVAL = TimeUnit.SECONDS.toMillis(10);
24-
2521
// Key schema path name
2622
private static final String KEY_SCHEMA_PATH = "key-schema";
2723
// Value schema path name
@@ -45,22 +41,20 @@ public class HelixSchemaAccessor {
4541
private final String clusterName;
4642

4743
private final int refreshAttemptsForZkReconnect;
48-
private final long refreshIntervalForZkReconnectInMs;
4944

5045
public HelixSchemaAccessor(ZkClient zkClient, HelixAdapterSerializer helixAdapterSerializer, String clusterName) {
51-
this(zkClient, helixAdapterSerializer, clusterName, DEFAULT_ZK_REFRESH_ATTEMPTS, DEFAULT_ZK_REFRESH_INTERVAL);
46+
// TODO: refactor hard-coded refreshAttemptsForZkReconnect
47+
this(zkClient, helixAdapterSerializer, clusterName, 9);
5248
}
5349

5450
public HelixSchemaAccessor(
5551
ZkClient zkClient,
5652
HelixAdapterSerializer helixAdapterSerializer,
5753
String clusterName,
58-
int refreshAttemptsForZkReconnect,
59-
long refreshIntervalForZkReconnectInMs) {
54+
int refreshAttemptsForZkReconnect) {
6055
this.clusterName = clusterName;
6156

6257
this.refreshAttemptsForZkReconnect = refreshAttemptsForZkReconnect;
63-
this.refreshIntervalForZkReconnectInMs = refreshIntervalForZkReconnectInMs;
6458

6559
registerSerializerForSchema(zkClient, helixAdapterSerializer);
6660
schemaAccessor = new ZkBaseDataAccessor<>(zkClient);

internal/venice-common/src/main/java/com/linkedin/venice/helix/SubscriptionBasedStoreRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public SubscriptionBasedStoreRepository(
2323
ZkClient zkClient,
2424
HelixAdapterSerializer compositeSerializer,
2525
String clusterName) {
26-
super(zkClient, compositeSerializer, clusterName, 0, 0);
26+
super(zkClient, compositeSerializer, clusterName);
2727
}
2828

2929
@Override

internal/venice-common/src/main/java/com/linkedin/venice/helix/VeniceOfflinePushMonitorAccessor.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.linkedin.venice.helix;
22

3+
import static com.linkedin.venice.ConfigKeys.REFRESH_ATTEMPTS_FOR_ZK_RECONNECT;
34
import static com.linkedin.venice.zk.VeniceZkPaths.OFFLINE_PUSHES;
45

56
import com.linkedin.venice.exceptions.VeniceException;
@@ -14,12 +15,12 @@
1415
import com.linkedin.venice.utils.HelixUtils;
1516
import com.linkedin.venice.utils.LogContext;
1617
import com.linkedin.venice.utils.PathResourceRegistry;
18+
import com.linkedin.venice.utils.VeniceProperties;
1719
import java.util.ArrayList;
1820
import java.util.Collections;
1921
import java.util.Iterator;
2022
import java.util.List;
2123
import java.util.Optional;
22-
import java.util.concurrent.TimeUnit;
2324
import org.apache.helix.AccessOption;
2425
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
2526
import org.apache.helix.zookeeper.impl.client.ZkClient;
@@ -46,8 +47,6 @@
4647
*/
4748
public class VeniceOfflinePushMonitorAccessor implements OfflinePushAccessor {
4849
public static final String OFFLINE_PUSH_SUB_PATH = OFFLINE_PUSHES;
49-
private static final int DEFAULT_ZK_REFRESH_ATTEMPTS = 3;
50-
private static final long DEFAULT_ZK_REFRESH_INTERVAL = TimeUnit.SECONDS.toMillis(10);
5150

5251
private static final Logger LOGGER = LogManager.getLogger(VeniceOfflinePushMonitorAccessor.class);
5352
private final String clusterName;
@@ -68,23 +67,20 @@ public class VeniceOfflinePushMonitorAccessor implements OfflinePushAccessor {
6867

6968
private final int refreshAttemptsForZkReconnect;
7069

71-
private final long refreshIntervalForZkReconnectInMs;
72-
7370
public VeniceOfflinePushMonitorAccessor(
7471
String clusterName,
7572
ZkClient zkClient,
7673
HelixAdapterSerializer adapter,
7774
LogContext logContext) {
78-
this(clusterName, zkClient, adapter, DEFAULT_ZK_REFRESH_ATTEMPTS, DEFAULT_ZK_REFRESH_INTERVAL, logContext);
75+
this(clusterName, zkClient, adapter, logContext, VeniceProperties.empty());
7976
}
8077

8178
public VeniceOfflinePushMonitorAccessor(
8279
String clusterName,
8380
ZkClient zkClient,
8481
HelixAdapterSerializer adapter,
85-
int refreshAttemptsForZkReconnect,
86-
long refreshIntervalForZkReconnectInMs,
87-
Object logContext) {
82+
Object logContext,
83+
VeniceProperties props) {
8884
this.clusterName = clusterName;
8985
this.offlinePushStatusParentPath = getOfflinePushStatuesParentPath();
9086
this.zkClient = zkClient;
@@ -94,8 +90,7 @@ public VeniceOfflinePushMonitorAccessor(
9490
this.partitionStatusAccessor = new ZkBaseDataAccessor<>(zkClient);
9591
this.listenerManager = new ListenerManager<>(logContext);
9692
this.partitionStatusZkListener = new PartitionStatusZkListener(logContext);
97-
this.refreshAttemptsForZkReconnect = refreshAttemptsForZkReconnect;
98-
this.refreshIntervalForZkReconnectInMs = refreshIntervalForZkReconnectInMs;
93+
this.refreshAttemptsForZkReconnect = props.getInt(REFRESH_ATTEMPTS_FOR_ZK_RECONNECT, 9);
9994
}
10095

10196
/**
@@ -105,16 +100,16 @@ public VeniceOfflinePushMonitorAccessor(
105100
String clusterName,
106101
ZkBaseDataAccessor<OfflinePushStatus> offlinePushStatusAccessor,
107102
ZkBaseDataAccessor<PartitionStatus> partitionStatusAccessor,
108-
LogContext logContext) {
103+
LogContext logContext,
104+
VeniceProperties props) {
109105
this.clusterName = clusterName;
110106
this.offlinePushStatusAccessor = offlinePushStatusAccessor;
111107
this.partitionStatusAccessor = partitionStatusAccessor;
112108
this.offlinePushStatusParentPath = getOfflinePushStatuesParentPath();
113109
this.zkClient = null;
114110
this.listenerManager = new ListenerManager<>(logContext);
115111
this.partitionStatusZkListener = new PartitionStatusZkListener(logContext);
116-
this.refreshAttemptsForZkReconnect = DEFAULT_ZK_REFRESH_ATTEMPTS;
117-
this.refreshIntervalForZkReconnectInMs = DEFAULT_ZK_REFRESH_INTERVAL;
112+
this.refreshAttemptsForZkReconnect = props.getInt(REFRESH_ATTEMPTS_FOR_ZK_RECONNECT, 9);
118113
}
119114

120115
private void registerSerializers(HelixAdapterSerializer adapter) {

internal/venice-common/src/main/java/com/linkedin/venice/helix/ZkRoutersClusterManager.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,7 @@ public ZkRoutersClusterManager(
6262
adapter.registerSerializer(getRouterRootPath(), new RouterClusterConfigJSONSerializer());
6363
zkClient.setZkSerializer(adapter);
6464
dataAccessor = new ZkBaseDataAccessor<>(zkClient);
65-
zkStateListener =
66-
new CachedResourceZkStateListener(this, refreshAttemptsForZkReconnect, refreshIntervalForZkReconnectInMs);
65+
zkStateListener = new CachedResourceZkStateListener(this, refreshAttemptsForZkReconnect);
6766
isConnected.set(zkClient.getConnection().getZookeeperState().isAlive());
6867
this.zkClient.subscribeStateChanges(this);
6968
}

internal/venice-common/src/main/java/com/linkedin/venice/utils/HelixUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ public static <T> void compareAndUpdate(
240240
*/
241241
private static void handleFailedHelixOperation(String path, String helixOperation, int attempt, int retryCount) {
242242
if (attempt < retryCount) {
243+
// Is empty if the caller doesn't operate on a specific ZK path (like connectHelixManager and checkClusterSetup)
243244
if (!path.isEmpty()) {
244245
path = " with path " + path;
245246
}

0 commit comments

Comments
 (0)