Skip to content

Commit 56534c1

Browse files
committed
admin tool command to clean execution ids
1 parent e51a06b commit 56534c1

File tree

13 files changed

+1373
-2
lines changed

13 files changed

+1373
-2
lines changed

build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,8 @@ subprojects {
318318
doFirst {
319319
def versionOverrides = [
320320
// project(':internal:venice-common').file('src/main/resources/avro/StoreVersionState/v5', PathValidation.DIRECTORY)
321-
project(':internal:venice-common').file('src/main/resources/avro/KafkaMessageEnvelope/v11', PathValidation.DIRECTORY)
321+
project(':internal:venice-common').file('src/main/resources/avro/KafkaMessageEnvelope/v11', PathValidation.DIRECTORY),
322+
project(':services:venice-controller').file('src/main/resources/avro/AdminOperation/v87', PathValidation.DIRECTORY)
322323
]
323324

324325
def schemaDirs = [sourceDir]

clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,11 @@ public static void main(String[] args) throws Exception {
262262
storeResponse = queryStoreList(cmd);
263263
printObject(storeResponse);
264264
break;
265+
case CLEAN_EXECUTION_IDS:
266+
String cluster = getRequiredArgument(cmd, Arg.CLUSTER);
267+
response = controllerClient.cleanExecutionIds(cluster);
268+
printObject(response);
269+
break;
265270
case DESCRIBE_STORE:
266271
storeName = getRequiredArgument(cmd, Arg.STORE, Command.DESCRIBE_STORE);
267272
for (String store: storeName.split(",")) {

clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,10 @@ public enum Command {
600600
UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION(
601601
"update-admin-operation-protocol-version", "Update the admin operation protocol version",
602602
new Arg[] { URL, CLUSTER, ADMIN_OPERATION_PROTOCOL_VERSION }
603+
),
604+
CLEAN_EXECUTION_IDS(
605+
"clean-execution-ids", "Clean execution ids for the deleted store from `succeededPerStore` map.",
606+
new Arg[] { URL, CLUSTER }
603607
);
604608

605609
private final String commandName;
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.linkedin.venice.controllerapi;
2+
3+
import java.util.Map;
4+
5+
6+
public class CleanExecutionIdsResponse extends ControllerResponse {
7+
private Map<String, Long> cleanedExecutionIds;
8+
private Map<String, Long> remainingExecutionIds;
9+
10+
public void setCleanedExecutionIds(Map<String, Long> cleanedExecutionIds) {
11+
this.cleanedExecutionIds = cleanedExecutionIds;
12+
}
13+
14+
public void setRemainingExecutionIds(Map<String, Long> remainingExecutionIds) {
15+
this.remainingExecutionIds = remainingExecutionIds;
16+
}
17+
18+
public Map<String, Long> getCleanedExecutionIds() {
19+
return this.cleanedExecutionIds;
20+
}
21+
22+
public Map<String, Long> getRemainingExecutionIds() {
23+
return this.remainingExecutionIds;
24+
}
25+
}

internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,11 @@ public MultiStoreResponse queryStoreList(
891891
return request(ControllerRoute.LIST_STORES, queryParams, MultiStoreResponse.class);
892892
}
893893

894+
public CleanExecutionIdsResponse cleanExecutionIds(String clusterName) {
895+
QueryParams queryParams = newParams().add(CLUSTER, clusterName);
896+
return request(ControllerRoute.CLEAN_EXECUTION_IDS, queryParams, CleanExecutionIdsResponse.class);
897+
}
898+
894899
public MultiStoreStatusResponse listStoresStatuses() {
895900
return request(ControllerRoute.CLUSTER_HEALTH_STORES, newParams(), MultiStoreStatusResponse.class);
896901
}

internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ public enum ControllerRoute {
153153
JOB("/job", HttpMethod.GET, Arrays.asList(NAME, VERSION)),
154154
KILL_OFFLINE_PUSH_JOB("/kill_offline_push_job", HttpMethod.POST, Collections.singletonList(TOPIC)),
155155
LIST_STORES("/list_stores", HttpMethod.GET, Collections.emptyList(), INCLUDE_SYSTEM_STORES),
156+
CLEAN_EXECUTION_IDS("/clean_execution_ids", HttpMethod.GET, Collections.emptyList(), CLUSTER),
156157
LIST_CHILD_CLUSTERS("/list_child_clusters", HttpMethod.GET, Collections.emptyList()),
157158
LIST_NODES("/list_instances", HttpMethod.GET, Collections.emptyList()),
158159
CLUSTER_HEALTH_STORES("/cluster_health_stores", HttpMethod.GET, Collections.emptyList()),

internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/AdminToolE2ETest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,20 @@
99
import com.linkedin.venice.AdminTool;
1010
import com.linkedin.venice.common.VeniceSystemStoreType;
1111
import com.linkedin.venice.controllerapi.ControllerClient;
12+
import com.linkedin.venice.controllerapi.ControllerResponse;
1213
import com.linkedin.venice.controllerapi.MultiStoreResponse;
1314
import com.linkedin.venice.controllerapi.NewStoreResponse;
1415
import com.linkedin.venice.controllerapi.StoreResponse;
16+
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
17+
import com.linkedin.venice.exceptions.ErrorType;
1518
import com.linkedin.venice.integration.utils.ServiceFactory;
1619
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
1720
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
1821
import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions;
1922
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
2023
import com.linkedin.venice.meta.StoreInfo;
2124
import com.linkedin.venice.utils.TestUtils;
25+
import com.linkedin.venice.utils.TestWriteUtils;
2226
import com.linkedin.venice.utils.Utils;
2327
import java.util.Arrays;
2428
import java.util.HashSet;
@@ -28,6 +32,7 @@
2832
import java.util.Set;
2933
import java.util.concurrent.TimeUnit;
3034
import java.util.stream.Collectors;
35+
import org.testng.Assert;
3136
import org.testng.annotations.AfterClass;
3237
import org.testng.annotations.BeforeClass;
3338
import org.testng.annotations.Test;
@@ -299,6 +304,62 @@ public void testUpdateStoreMigrationStatus() throws Exception {
299304
}
300305
}
301306

307+
@Test(timeOut = TEST_TIMEOUT)
308+
public void testCleanExecutionIds() throws Exception {
309+
String storeName1 = Utils.getUniqueString("testCleanExecutionIds-1");
310+
String storeName2 = Utils.getUniqueString("testCleanExecutionIds-2");
311+
List<VeniceControllerWrapper> parentControllers = multiRegionMultiClusterWrapper.getParentControllers();
312+
String clusterName = clusterNames[0];
313+
String parentControllerURLs =
314+
parentControllers.stream().map(VeniceControllerWrapper::getControllerUrl).collect(Collectors.joining(","));
315+
ControllerClient parentControllerClient = new ControllerClient(clusterName, parentControllerURLs);
316+
ControllerClient childControllerClient = ControllerClient
317+
.constructClusterControllerClient(clusterName, childDatacenters.get(0).getControllerConnectString());
318+
319+
createStore(parentControllerClient, childControllerClient, storeName1);
320+
createStore(parentControllerClient, childControllerClient, storeName2);
321+
322+
String[] adminToolArgs = new String[] { "--url", childControllerClient.getLeaderControllerUrl(), "--cluster",
323+
clusterName, "--clean-execution-ids" };
324+
325+
AdminTool.main(adminToolArgs);
326+
327+
UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams();
328+
updateStoreParams.setEnableReads(false).setEnableWrites(false);
329+
TestWriteUtils.updateStore(storeName1, parentControllerClient, updateStoreParams);
330+
parentControllerClient.deleteStore(storeName1);
331+
ControllerResponse deleteStoreResponse = parentControllerClient.retryableRequest(5, c -> c.deleteStore(storeName1));
332+
Assert.assertFalse(
333+
deleteStoreResponse.isError(),
334+
"The DeleteStoreResponse returned an error: " + deleteStoreResponse.getError());
335+
336+
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
337+
StoreResponse getStoreResponse = childControllerClient.getStore(storeName1);
338+
Assert.assertEquals(getStoreResponse.getErrorType(), ErrorType.STORE_NOT_FOUND);
339+
});
340+
341+
AdminTool.main(adminToolArgs);
342+
}
343+
344+
private void createStore(
345+
ControllerClient parentControllerClient,
346+
ControllerClient childControllerClient,
347+
String storeName) {
348+
TestUtils.assertCommand(
349+
parentControllerClient
350+
.retryableRequest(5, c -> c.createNewStore(storeName, "test", "\"string\"", "\"string\"")));
351+
352+
TestUtils.assertCommand(
353+
parentControllerClient
354+
.retryableRequest(5, c -> c.emptyPush(storeName, Utils.getUniqueString("empty-push-1"), 1L)));
355+
356+
TestUtils.waitForNonDeterministicCompletion(
357+
100,
358+
TimeUnit.SECONDS,
359+
() -> childControllerClient.getStore(storeName).getStore().getCurrentVersion() > 0);
360+
361+
}
362+
302363
private void validateStoreMigrationStatusAcrossChildRegions(
303364
String storeName,
304365
String clusterName,

internal/venice-test-common/src/main/java/com/linkedin/venice/admin/InMemoryExecutionIdAccessor.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
import com.linkedin.venice.controller.ExecutionIdAccessor;
44
import java.util.HashMap;
55
import java.util.Map;
6+
import java.util.Set;
7+
import java.util.stream.Collectors;
8+
import org.apache.commons.lang3.tuple.Pair;
69

710

811
/**
@@ -50,6 +53,20 @@ public void updateLastGeneratedExecutionId(String clusterName, Long lastGenerate
5053
// not used, no op.
5154
}
5255

56+
@Override
57+
public Pair<Map<String, Long>, Map<String, Long>> cleanExecutionIdMap(String clusterName, Set<String> allStores) {
58+
Map<String, Long> executionIdsCleaned = new HashMap<>(executionIdMapInMem.get(clusterName));
59+
Map<String, Long> executionIdsToKeep = executionIdMapInMem.get(clusterName)
60+
.entrySet()
61+
.parallelStream()
62+
.filter(entry -> allStores.contains(entry.getKey()))
63+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
64+
65+
executionIdsCleaned.keySet().removeAll(executionIdsToKeep.keySet());
66+
executionIdMapInMem.put(clusterName, executionIdsToKeep);
67+
return Pair.of(executionIdsCleaned, executionIdsToKeep);
68+
}
69+
5370
@Override
5471
public Long incrementAndGetExecutionId(String clusterName) {
5572
return ++executionId;

services/venice-controller/src/main/java/com/linkedin/venice/controller/ExecutionIdAccessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.linkedin.venice.controller;
22

33
import java.util.Map;
4+
import java.util.Set;
5+
import org.apache.commons.lang3.tuple.Pair;
46

57

68
/**
@@ -37,6 +39,8 @@ public interface ExecutionIdAccessor {
3739
*/
3840
void updateLastGeneratedExecutionId(String clusterName, Long lastGeneratedExecutionId);
3941

42+
Pair<Map<String, Long>, Map<String, Long>> cleanExecutionIdMap(String clusterName, Set<String> allStores);
43+
4044
/**
4145
* Read the current value from ZK and try to increment the value by 1 and write it back to ZK.
4246
* @return updated execution id.

services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkExecutionIdAccessor.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@
1111
import com.linkedin.venice.utils.PathResourceRegistry;
1212
import java.util.HashMap;
1313
import java.util.Map;
14+
import java.util.Set;
1415
import java.util.concurrent.atomic.AtomicLong;
16+
import java.util.concurrent.atomic.AtomicReference;
17+
import java.util.stream.Collectors;
18+
import org.apache.commons.lang3.tuple.Pair;
1519
import org.apache.helix.AccessOption;
1620
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
1721
import org.apache.helix.zookeeper.impl.client.ZkClient;
@@ -158,6 +162,26 @@ private void updateExecutionIdMapToZk(String path, String storeName, Long lastSu
158162
});
159163
}
160164

165+
public Pair<Map<String, Long>, Map<String, Long>> cleanExecutionIdMap(String clusterName, Set<String> allStores) {
166+
String path = getLastSucceededExecutionIdMapPath(clusterName);
167+
AtomicReference<Map<String, Long>> executionIdsCleaned = new AtomicReference<>(new HashMap<>());
168+
AtomicReference<Map<String, Long>> executionIdsToKeep = new AtomicReference<>(new HashMap<>());
169+
170+
HelixUtils.compareAndUpdate(zkMapAccessor, path, ZK_RETRY_COUNT, executionIdMap -> {
171+
executionIdsCleaned.set(executionIdMap);
172+
Map<String, Long> filteredExecutionIds = executionIdMap.entrySet()
173+
.parallelStream()
174+
.filter(entry -> allStores.contains(entry.getKey()))
175+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
176+
executionIdsToKeep.set(filteredExecutionIds);
177+
return filteredExecutionIds;
178+
});
179+
180+
executionIdsCleaned.get().keySet().removeAll(executionIdsToKeep.get().keySet());
181+
182+
return Pair.of(executionIdsCleaned.get(), executionIdsToKeep.get());
183+
}
184+
161185
private Long getExecutionIdFromZk(String path) {
162186
int retry = ZK_RETRY_COUNT;
163187
while (retry > 0) {

services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import static com.linkedin.venice.controllerapi.ControllerRoute.BACKUP_VERSION;
1212
import static com.linkedin.venice.controllerapi.ControllerRoute.CHECK_RESOURCE_CLEANUP_FOR_STORE_CREATION;
1313
import static com.linkedin.venice.controllerapi.ControllerRoute.CLEANUP_INSTANCE_CUSTOMIZED_STATES;
14+
import static com.linkedin.venice.controllerapi.ControllerRoute.CLEAN_EXECUTION_IDS;
1415
import static com.linkedin.venice.controllerapi.ControllerRoute.CLUSTER_DISCOVERY;
1516
import static com.linkedin.venice.controllerapi.ControllerRoute.CLUSTER_HEALTH_STORES;
1617
import static com.linkedin.venice.controllerapi.ControllerRoute.COMPARE_STORE;
@@ -330,6 +331,9 @@ public boolean startInner() throws Exception {
330331
httpService.get(
331332
LIST_STORES.getPath(),
332333
new VeniceParentControllerRegionStateHandler(admin, storesRoutes.getAllStores(admin)));
334+
httpService.get(
335+
CLEAN_EXECUTION_IDS.getPath(),
336+
new VeniceParentControllerRegionStateHandler(admin, storesRoutes.cleanExecutionIds(admin)));
333337
httpService.get(
334338
CLUSTER_HEALTH_STORES.getPath(),
335339
new VeniceParentControllerRegionStateHandler(admin, storesRoutes.getAllStoresStatuses(admin)));

services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoresRoutes.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,11 @@
6565
import com.linkedin.venice.acl.DynamicAccessController;
6666
import com.linkedin.venice.controller.Admin;
6767
import com.linkedin.venice.controller.AdminCommandExecutionTracker;
68+
import com.linkedin.venice.controller.VeniceHelixAdmin;
69+
import com.linkedin.venice.controller.VeniceParentHelixAdmin;
6870
import com.linkedin.venice.controller.kafka.TopicCleanupService;
6971
import com.linkedin.venice.controller.repush.RepushJobRequest;
72+
import com.linkedin.venice.controllerapi.CleanExecutionIdsResponse;
7073
import com.linkedin.venice.controllerapi.ClusterStaleDataAuditResponse;
7174
import com.linkedin.venice.controllerapi.ControllerResponse;
7275
import com.linkedin.venice.controllerapi.MultiStoreInfoResponse;
@@ -116,6 +119,7 @@
116119
import java.util.Set;
117120
import java.util.stream.Collectors;
118121
import org.apache.avro.Schema;
122+
import org.apache.commons.lang3.tuple.Pair;
119123
import org.apache.http.HttpStatus;
120124
import org.apache.logging.log4j.LogManager;
121125
import org.apache.logging.log4j.Logger;
@@ -261,6 +265,29 @@ public void internalHandle(Request request, MultiStoreResponse veniceResponse) {
261265
};
262266
}
263267

268+
public Route cleanExecutionIds(Admin admin) {
269+
return new VeniceRouteHandler<CleanExecutionIdsResponse>(CleanExecutionIdsResponse.class) {
270+
@Override
271+
public void internalHandle(Request request, CleanExecutionIdsResponse veniceResponse) {
272+
String cluster = request.queryParams(CLUSTER);
273+
VeniceHelixAdmin veniceHelixAdmin;
274+
if (admin instanceof VeniceParentHelixAdmin) {
275+
veniceHelixAdmin = ((VeniceParentHelixAdmin) admin).getVeniceHelixAdmin();
276+
} else {
277+
veniceHelixAdmin = (VeniceHelixAdmin) admin;
278+
}
279+
280+
Set<String> allStores =
281+
veniceHelixAdmin.getAllStores(cluster).stream().map(Store::getName).collect(Collectors.toSet());
282+
Pair<Map<String, Long>, Map<String, Long>> response =
283+
veniceHelixAdmin.getExecutionIdAccessor().cleanExecutionIdMap(cluster, allStores);
284+
285+
veniceResponse.setCleanedExecutionIds(response.getLeft());
286+
veniceResponse.setRemainingExecutionIds(response.getRight());
287+
}
288+
};
289+
}
290+
264291
/**
265292
* No ACL check; any user can try to list store statuses.
266293
* @see Admin#getAllStoreStatuses(String)
@@ -512,7 +539,7 @@ public void internalHandle(Request request, StoreMigrationResponse veniceRespons
512539
}
513540

514541
/**
515-
* @see Admin#deleteStore(String, String, int, boolean)
542+
* @see Admin#deleteStore(String, String, boolean, int, boolean)
516543
*/
517544
public Route deleteStore(Admin admin) {
518545
return new VeniceRouteHandler<TrackableControllerResponse>(TrackableControllerResponse.class) {

0 commit comments

Comments
 (0)