Skip to content

Commit e11134f

Browse files
committed
add test
admin tool command to clean execution ids
1 parent e51a06b commit e11134f

File tree

14 files changed

+171
-2
lines changed

14 files changed

+171
-2
lines changed

clients/venice-admin-tool/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ dependencies {
3030
implementation('org.apache.helix:metrics-common:1.4.1:jdk8')
3131
implementation libraries.zstd
3232

33+
implementation libraries.commonsLang
34+
3335
testImplementation project(':internal:venice-common').sourceSets.test.output
3436
}
3537

clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,11 @@ public static void main(String[] args) throws Exception {
262262
storeResponse = queryStoreList(cmd);
263263
printObject(storeResponse);
264264
break;
265+
case CLEAN_EXECUTION_IDS:
266+
String cluster = getRequiredArgument(cmd, Arg.CLUSTER);
267+
response = controllerClient.cleanExecutionIds(cluster);
268+
printObject(response);
269+
break;
265270
case DESCRIBE_STORE:
266271
storeName = getRequiredArgument(cmd, Arg.STORE, Command.DESCRIBE_STORE);
267272
for (String store: storeName.split(",")) {

clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,10 @@ public enum Command {
600600
UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION(
601601
"update-admin-operation-protocol-version", "Update the admin operation protocol version",
602602
new Arg[] { URL, CLUSTER, ADMIN_OPERATION_PROTOCOL_VERSION }
603+
),
604+
CLEAN_EXECUTION_IDS(
605+
"clean-execution-ids", "Clean execution ids for the deleted store from `succeededPerStore` map.",
606+
new Arg[] { URL, CLUSTER }
603607
);
604608

605609
private final String commandName;
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.linkedin.venice.controllerapi;
2+
3+
import java.util.Map;
4+
5+
6+
public class CleanExecutionIdsResponse extends ControllerResponse {
7+
private Map<String, Long> cleanedExecutionIds;
8+
private Map<String, Long> remainingExecutionIds;
9+
10+
public void setCleanedExecutionIds(Map<String, Long> cleanedExecutionIds) {
11+
this.cleanedExecutionIds = cleanedExecutionIds;
12+
}
13+
14+
public void setRemainingExecutionIds(Map<String, Long> remainingExecutionIds) {
15+
this.remainingExecutionIds = remainingExecutionIds;
16+
}
17+
18+
public Map<String, Long> getCleanedExecutionIds() {
19+
return this.cleanedExecutionIds;
20+
}
21+
22+
public Map<String, Long> getRemainingExecutionIds() {
23+
return this.remainingExecutionIds;
24+
}
25+
}

internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,11 @@ public MultiStoreResponse queryStoreList(
891891
return request(ControllerRoute.LIST_STORES, queryParams, MultiStoreResponse.class);
892892
}
893893

894+
public CleanExecutionIdsResponse cleanExecutionIds(String clusterName) {
895+
QueryParams queryParams = newParams().add(CLUSTER, clusterName);
896+
return request(ControllerRoute.CLEAN_EXECUTION_IDS, queryParams, CleanExecutionIdsResponse.class);
897+
}
898+
894899
public MultiStoreStatusResponse listStoresStatuses() {
895900
return request(ControllerRoute.CLUSTER_HEALTH_STORES, newParams(), MultiStoreStatusResponse.class);
896901
}

internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ public enum ControllerRoute {
153153
JOB("/job", HttpMethod.GET, Arrays.asList(NAME, VERSION)),
154154
KILL_OFFLINE_PUSH_JOB("/kill_offline_push_job", HttpMethod.POST, Collections.singletonList(TOPIC)),
155155
LIST_STORES("/list_stores", HttpMethod.GET, Collections.emptyList(), INCLUDE_SYSTEM_STORES),
156+
CLEAN_EXECUTION_IDS("/clean_execution_ids", HttpMethod.GET, Collections.emptyList(), CLUSTER),
156157
LIST_CHILD_CLUSTERS("/list_child_clusters", HttpMethod.GET, Collections.emptyList()),
157158
LIST_NODES("/list_instances", HttpMethod.GET, Collections.emptyList()),
158159
CLUSTER_HEALTH_STORES("/cluster_health_stores", HttpMethod.GET, Collections.emptyList()),

internal/venice-common/src/main/java/com/linkedin/venice/helix/ZkAllowlistAccessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void removeInstanceFromAllowList(String clusterName, String helixNodeId)
6767
zkClient.delete(getAllowListInstancePath(clusterName, helixNodeId));
6868
}
6969

70-
private String getAllowListPath(String clusterName) {
70+
public static String getAllowListPath(String clusterName) {
7171
return HelixUtils.getHelixClusterZkPath(clusterName) + PREFIX_PATH;
7272
}
7373

internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/AdminToolE2ETest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,20 @@
99
import com.linkedin.venice.AdminTool;
1010
import com.linkedin.venice.common.VeniceSystemStoreType;
1111
import com.linkedin.venice.controllerapi.ControllerClient;
12+
import com.linkedin.venice.controllerapi.ControllerResponse;
1213
import com.linkedin.venice.controllerapi.MultiStoreResponse;
1314
import com.linkedin.venice.controllerapi.NewStoreResponse;
1415
import com.linkedin.venice.controllerapi.StoreResponse;
16+
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
17+
import com.linkedin.venice.exceptions.ErrorType;
1518
import com.linkedin.venice.integration.utils.ServiceFactory;
1619
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
1720
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
1821
import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions;
1922
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
2023
import com.linkedin.venice.meta.StoreInfo;
2124
import com.linkedin.venice.utils.TestUtils;
25+
import com.linkedin.venice.utils.TestWriteUtils;
2226
import com.linkedin.venice.utils.Utils;
2327
import java.util.Arrays;
2428
import java.util.HashSet;
@@ -28,6 +32,7 @@
2832
import java.util.Set;
2933
import java.util.concurrent.TimeUnit;
3034
import java.util.stream.Collectors;
35+
import org.testng.Assert;
3136
import org.testng.annotations.AfterClass;
3237
import org.testng.annotations.BeforeClass;
3338
import org.testng.annotations.Test;
@@ -299,6 +304,62 @@ public void testUpdateStoreMigrationStatus() throws Exception {
299304
}
300305
}
301306

307+
@Test(timeOut = TEST_TIMEOUT)
308+
public void testCleanExecutionIds() throws Exception {
309+
String storeName1 = Utils.getUniqueString("testCleanExecutionIds-1");
310+
String storeName2 = Utils.getUniqueString("testCleanExecutionIds-2");
311+
List<VeniceControllerWrapper> parentControllers = multiRegionMultiClusterWrapper.getParentControllers();
312+
String clusterName = clusterNames[0];
313+
String parentControllerURLs =
314+
parentControllers.stream().map(VeniceControllerWrapper::getControllerUrl).collect(Collectors.joining(","));
315+
ControllerClient parentControllerClient = new ControllerClient(clusterName, parentControllerURLs);
316+
ControllerClient childControllerClient = ControllerClient
317+
.constructClusterControllerClient(clusterName, childDatacenters.get(0).getControllerConnectString());
318+
319+
createStore(parentControllerClient, childControllerClient, storeName1);
320+
createStore(parentControllerClient, childControllerClient, storeName2);
321+
322+
String[] adminToolArgs = new String[] { "--url", childControllerClient.getLeaderControllerUrl(), "--cluster",
323+
clusterName, "--clean-execution-ids" };
324+
325+
AdminTool.main(adminToolArgs);
326+
327+
UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams();
328+
updateStoreParams.setEnableReads(false).setEnableWrites(false);
329+
TestWriteUtils.updateStore(storeName1, parentControllerClient, updateStoreParams);
330+
parentControllerClient.deleteStore(storeName1);
331+
ControllerResponse deleteStoreResponse = parentControllerClient.retryableRequest(5, c -> c.deleteStore(storeName1));
332+
Assert.assertFalse(
333+
deleteStoreResponse.isError(),
334+
"The DeleteStoreResponse returned an error: " + deleteStoreResponse.getError());
335+
336+
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
337+
StoreResponse getStoreResponse = childControllerClient.getStore(storeName1);
338+
Assert.assertEquals(getStoreResponse.getErrorType(), ErrorType.STORE_NOT_FOUND);
339+
});
340+
341+
AdminTool.main(adminToolArgs);
342+
}
343+
344+
private void createStore(
345+
ControllerClient parentControllerClient,
346+
ControllerClient childControllerClient,
347+
String storeName) {
348+
TestUtils.assertCommand(
349+
parentControllerClient
350+
.retryableRequest(5, c -> c.createNewStore(storeName, "test", "\"string\"", "\"string\"")));
351+
352+
TestUtils.assertCommand(
353+
parentControllerClient
354+
.retryableRequest(5, c -> c.emptyPush(storeName, Utils.getUniqueString("empty-push-1"), 1L)));
355+
356+
TestUtils.waitForNonDeterministicCompletion(
357+
100,
358+
TimeUnit.SECONDS,
359+
() -> childControllerClient.getStore(storeName).getStore().getCurrentVersion() > 0);
360+
361+
}
362+
302363
private void validateStoreMigrationStatusAcrossChildRegions(
303364
String storeName,
304365
String clusterName,

internal/venice-test-common/src/main/java/com/linkedin/venice/admin/InMemoryExecutionIdAccessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ public void updateLastGeneratedExecutionId(String clusterName, Long lastGenerate
5050
// not used, no op.
5151
}
5252

53+
@Override
54+
public void replaceExecutionIdMapInZk(String clusterName, Map<String, Long> executionIdMap) {
55+
executionIdMapInMem.put(clusterName, executionIdMap);
56+
}
57+
5358
@Override
5459
public Long incrementAndGetExecutionId(String clusterName) {
5560
return ++executionId;

services/venice-controller/src/main/java/com/linkedin/venice/controller/ExecutionIdAccessor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ public interface ExecutionIdAccessor {
3737
*/
3838
void updateLastGeneratedExecutionId(String clusterName, Long lastGeneratedExecutionId);
3939

40+
void replaceExecutionIdMapInZk(String clusterName, Map<String, Long> executionIdMap);
41+
4042
/**
4143
* Read the current value from ZK and try to increment the value by 1 and write it back to ZK.
4244
* @return updated execution id.

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@
141141
import com.linkedin.venice.meta.RegionPushDetails;
142142
import com.linkedin.venice.meta.RoutersClusterConfig;
143143
import com.linkedin.venice.meta.RoutingDataRepository;
144+
import com.linkedin.venice.meta.SimpleStringSerializer;
144145
import com.linkedin.venice.meta.Store;
145146
import com.linkedin.venice.meta.StoreCleaner;
146147
import com.linkedin.venice.meta.StoreConfig;
@@ -461,6 +462,7 @@ public class VeniceHelixAdmin implements Admin, StoreCleaner {
461462
private final LogContext logContext;
462463

463464
final Map<String, DeadStoreStats> deadStoreStatsMap = new VeniceConcurrentHashMap<>();
465+
private final ZkBaseDataAccessor<String> zkAccessor2;
464466

465467
public VeniceHelixAdmin(
466468
VeniceControllerMultiClusterConfig multiClusterConfigs,
@@ -781,6 +783,16 @@ public void handleDeletedInstances(Set<Instance> deletedInstances) {
781783
Lazy.of(() -> ByteBuffer.wrap(ZstdWithDictCompressor.buildDictionaryOnSyntheticAvroData()));
782784

783785
pushJobUserErrorCheckpoints = commonConfig.getPushJobUserErrorCheckpoints();
786+
787+
ZkClient zkClient2 = ZkClientFactory.newZkClient(multiClusterConfigs.getZkAddress());
788+
HelixAdapterSerializer helixAdapterSerializer2 = new HelixAdapterSerializer();
789+
// helixAdapterSerializer2.registerSerializer(
790+
// ZkAllowlistAccessor.getAllowListPath(PathResourceRegistry.WILDCARD_MATCH_ANY),
791+
// new SimpleStringSerializer());
792+
helixAdapterSerializer2.registerSerializer("/*/*/*", new SimpleStringSerializer());
793+
helixAdapterSerializer2.registerSerializer("/*/*/*/*", new SimpleStringSerializer());
794+
zkClient2.setZkSerializer(helixAdapterSerializer2);
795+
zkAccessor2 = new ZkBaseDataAccessor<>(zkClient2);
784796
}
785797

786798
private VeniceProperties getPubSubSSLPropertiesFromControllerConfig(String pubSubBootstrapServers) {

services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkExecutionIdAccessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,11 @@ private void updateExecutionIdMapToZk(String path, String storeName, Long lastSu
158158
});
159159
}
160160

161+
public void replaceExecutionIdMapInZk(String clusterName, Map<String, Long> executionIdMap) {
162+
String path = getLastSucceededExecutionIdMapPath(clusterName);
163+
HelixUtils.compareAndUpdate(zkMapAccessor, path, ZK_RETRY_COUNT, dataUpdater -> executionIdMap);
164+
}
165+
161166
private Long getExecutionIdFromZk(String path) {
162167
int retry = ZK_RETRY_COUNT;
163168
while (retry > 0) {

services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import static com.linkedin.venice.controllerapi.ControllerRoute.BACKUP_VERSION;
1212
import static com.linkedin.venice.controllerapi.ControllerRoute.CHECK_RESOURCE_CLEANUP_FOR_STORE_CREATION;
1313
import static com.linkedin.venice.controllerapi.ControllerRoute.CLEANUP_INSTANCE_CUSTOMIZED_STATES;
14+
import static com.linkedin.venice.controllerapi.ControllerRoute.CLEAN_EXECUTION_IDS;
1415
import static com.linkedin.venice.controllerapi.ControllerRoute.CLUSTER_DISCOVERY;
1516
import static com.linkedin.venice.controllerapi.ControllerRoute.CLUSTER_HEALTH_STORES;
1617
import static com.linkedin.venice.controllerapi.ControllerRoute.COMPARE_STORE;
@@ -330,6 +331,9 @@ public boolean startInner() throws Exception {
330331
httpService.get(
331332
LIST_STORES.getPath(),
332333
new VeniceParentControllerRegionStateHandler(admin, storesRoutes.getAllStores(admin)));
334+
httpService.get(
335+
CLEAN_EXECUTION_IDS.getPath(),
336+
new VeniceParentControllerRegionStateHandler(admin, storesRoutes.cleanExecutionIds(admin)));
333337
httpService.get(
334338
CLUSTER_HEALTH_STORES.getPath(),
335339
new VeniceParentControllerRegionStateHandler(admin, storesRoutes.getAllStoresStatuses(admin)));

services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoresRoutes.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,11 @@
6565
import com.linkedin.venice.acl.DynamicAccessController;
6666
import com.linkedin.venice.controller.Admin;
6767
import com.linkedin.venice.controller.AdminCommandExecutionTracker;
68+
import com.linkedin.venice.controller.VeniceHelixAdmin;
69+
import com.linkedin.venice.controller.VeniceParentHelixAdmin;
6870
import com.linkedin.venice.controller.kafka.TopicCleanupService;
6971
import com.linkedin.venice.controller.repush.RepushJobRequest;
72+
import com.linkedin.venice.controllerapi.CleanExecutionIdsResponse;
7073
import com.linkedin.venice.controllerapi.ClusterStaleDataAuditResponse;
7174
import com.linkedin.venice.controllerapi.ControllerResponse;
7275
import com.linkedin.venice.controllerapi.MultiStoreInfoResponse;
@@ -110,6 +113,7 @@
110113
import com.linkedin.venice.utils.Utils;
111114
import java.util.ArrayList;
112115
import java.util.Collections;
116+
import java.util.HashMap;
113117
import java.util.List;
114118
import java.util.Map;
115119
import java.util.Optional;
@@ -261,6 +265,40 @@ public void internalHandle(Request request, MultiStoreResponse veniceResponse) {
261265
};
262266
}
263267

268+
public Route cleanExecutionIds(Admin admin) {
269+
return new VeniceRouteHandler<CleanExecutionIdsResponse>(CleanExecutionIdsResponse.class) {
270+
@Override
271+
public void internalHandle(Request request, CleanExecutionIdsResponse veniceResponse) {
272+
String cluster = request.queryParams(CLUSTER);
273+
VeniceHelixAdmin veniceHelixAdmin;
274+
if (admin instanceof VeniceParentHelixAdmin) {
275+
veniceHelixAdmin = ((VeniceParentHelixAdmin) admin).getVeniceHelixAdmin();
276+
} else {
277+
veniceHelixAdmin = (VeniceHelixAdmin) admin;
278+
}
279+
280+
Map<String, Long> cleanedExecutionIds = new HashMap<>();
281+
Map<String, Long> remainingExecutionIds = new HashMap<>();
282+
283+
Map<String, Long> resultMap = veniceHelixAdmin.getExecutionIdAccessor().getLastSucceededExecutionIdMap(cluster);
284+
285+
for (Map.Entry<String, Long> entry: resultMap.entrySet()) {
286+
String store = entry.getKey();
287+
if (admin.getStore(cluster, store) == null) {
288+
cleanedExecutionIds.put(store, entry.getValue());
289+
} else {
290+
remainingExecutionIds.put(store, entry.getValue());
291+
}
292+
}
293+
294+
veniceHelixAdmin.getExecutionIdAccessor().replaceExecutionIdMapInZk(cluster, remainingExecutionIds);
295+
296+
veniceResponse.setCleanedExecutionIds(cleanedExecutionIds);
297+
veniceResponse.setRemainingExecutionIds(remainingExecutionIds);
298+
}
299+
};
300+
}
301+
264302
/**
265303
* No ACL check; any user can try to list store statuses.
266304
* @see Admin#getAllStoreStatuses(String)
@@ -512,7 +550,7 @@ public void internalHandle(Request request, StoreMigrationResponse veniceRespons
512550
}
513551

514552
/**
515-
* @see Admin#deleteStore(String, String, int, boolean)
553+
* @see Admin#deleteStore(String, String, boolean, int, boolean)
516554
*/
517555
public Route deleteStore(Admin admin) {
518556
return new VeniceRouteHandler<TrackableControllerResponse>(TrackableControllerResponse.class) {

0 commit comments

Comments
 (0)