Skip to content

[controller] Fix bug in ProtocolVersionAutoDetectionService #1795

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
May 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

public class AdminOperationProtocolVersionControllerResponse extends ControllerResponse {
private long localAdminOperationProtocolVersion = -1;
private String requestUrl = "";
private Map<String, Long> controllerUrlToVersionMap = new HashMap<>();
private String localControllerName = "";
private Map<String, Long> controllerNameToVersionMap = new HashMap<>();

public void setLocalAdminOperationProtocolVersion(long adminOperationProtocolVersion) {
this.localAdminOperationProtocolVersion = adminOperationProtocolVersion;
Expand All @@ -17,19 +17,19 @@ public long getLocalAdminOperationProtocolVersion() {
return localAdminOperationProtocolVersion;
}

public void setRequestUrl(String url) {
this.requestUrl = url;
public void setLocalControllerName(String controllerName) {
this.localControllerName = controllerName;
}

public String getRequestUrl() {
return requestUrl;
public String getLocalControllerName() {
return localControllerName;
}

public void setControllerUrlToVersionMap(Map<String, Long> urlToVersionMap) {
this.controllerUrlToVersionMap = urlToVersionMap;
public void setControllerNameToVersionMap(Map<String, Long> controllerNameToVersionMap) {
this.controllerNameToVersionMap = controllerNameToVersionMap;
}

public Map<String, Long> getControllerUrlToVersionMap() {
return controllerUrlToVersionMap;
public Map<String, Long> getControllerNameToVersionMap() {
return controllerNameToVersionMap;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.ConfigKeys.CONTROLLER_SSL_ENABLED;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;

Expand Down Expand Up @@ -36,8 +37,10 @@ public class TestAdminOperationVersionDetection {
@BeforeClass(alwaysRun = true)
public void setUp() throws Exception {
// Create multi-region multi-cluster setup
Properties parentControllerProperties = new Properties();
Properties controllerProperties = new Properties();
controllerProperties.put(CONTROLLER_SSL_ENABLED, "false");
Properties serverProperties = new Properties();
serverProperties.put(CONTROLLER_SSL_ENABLED, "false");

VeniceMultiRegionClusterCreateOptions.Builder optionsBuilder =
new VeniceMultiRegionClusterCreateOptions.Builder().numberOfRegions(NUMBER_OF_CHILD_DATACENTERS)
Expand All @@ -50,7 +53,8 @@ public void setUp() throws Exception {
.sslToStorageNodes(true)
.forkServer(false)
.serverProperties(serverProperties)
.parentControllerProperties(parentControllerProperties);
.parentControllerProperties(controllerProperties)
.childControllerProperties(controllerProperties);
multiRegionMultiClusterWrapper =
ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(optionsBuilder.build());
}
Expand All @@ -73,8 +77,8 @@ public void testGetAdminOperationVersionForParentControllers() {
assertEquals(
response.getLocalAdminOperationProtocolVersion(),
AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
Map<String, Long> urlToVersionMap = response.getControllerUrlToVersionMap();
assertEquals(urlToVersionMap.size(), 2);
Map<String, Long> controllerNameToVersionMap = response.getControllerNameToVersionMap();
assertEquals(controllerNameToVersionMap.size(), 2);
assertEquals(response.getCluster(), clusterName);
}

Expand All @@ -97,8 +101,8 @@ public void testAdminOperationVersionForChildControllers() {
assertEquals(
response.getLocalAdminOperationProtocolVersion(),
AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
Map<String, Long> urlToVersionMap = response.getControllerUrlToVersionMap();
assertEquals(urlToVersionMap.size(), 2);
Map<String, Long> controllerNameToVersionMap = response.getControllerNameToVersionMap();
assertEquals(controllerNameToVersionMap.size(), 2);
assertEquals(response.getCluster(), clusterName);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.linkedin.venice.endToEnd;

import static com.linkedin.venice.ConfigKeys.CONTROLLER_PROTOCOL_VERSION_AUTO_DETECTION_SERVICE_ENABLED;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_PROTOCOL_VERSION_AUTO_DETECTION_SLEEP_MS;
import static com.linkedin.venice.ConfigKeys.*;
import static com.linkedin.venice.controller.kafka.protocol.serializer.AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -41,6 +40,10 @@ public void setUp() {
Properties parentControllerProps = new Properties();
parentControllerProps.put(CONTROLLER_PROTOCOL_VERSION_AUTO_DETECTION_SERVICE_ENABLED, true);
parentControllerProps.put(CONTROLLER_PROTOCOL_VERSION_AUTO_DETECTION_SLEEP_MS, SERVICE_INTERVAL_MS);
parentControllerProps.put(CONTROLLER_SSL_ENABLED, "false");

Properties childControllerProps = new Properties();
childControllerProps.put(CONTROLLER_SSL_ENABLED, "false");

VeniceMultiRegionClusterCreateOptions.Builder optionsBuilder =
new VeniceMultiRegionClusterCreateOptions.Builder().numberOfRegions(NUMBER_OF_CHILD_DATACENTERS)
Expand All @@ -51,7 +54,9 @@ public void setUp() {
.numberOfRouters(1)
.replicationFactor(1)
.forkServer(false)
.parentControllerProperties(parentControllerProps);
.sslToStorageNodes(true)
.parentControllerProperties(parentControllerProps)
.childControllerProperties(childControllerProps);
multiRegionMultiClusterWrapper =
ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(optionsBuilder.build());
childDatacenters = multiRegionMultiClusterWrapper.getChildRegions();
Expand Down Expand Up @@ -100,7 +105,7 @@ public void testGetAdminOperationProtocolVersionFromControllers() {
AdminOperationProtocolVersionControllerResponse parentResponse =
parentControllerClient.getAdminOperationProtocolVersionFromControllers(clusterName);
assertEquals(parentResponse.getLocalAdminOperationProtocolVersion(), LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
assertEquals(parentResponse.getControllerUrlToVersionMap().size(), 2);
assertEquals(parentResponse.getControllerNameToVersionMap().size(), 2);
}

// child controller
Expand All @@ -109,7 +114,7 @@ public void testGetAdminOperationProtocolVersionFromControllers() {
AdminOperationProtocolVersionControllerResponse childResponse =
dc0ControllerClient.getAdminOperationProtocolVersionFromControllers(clusterName);
assertEquals(childResponse.getLocalAdminOperationProtocolVersion(), LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
assertEquals(childResponse.getControllerUrlToVersionMap().size(), 2);
assertEquals(childResponse.getControllerNameToVersionMap().size(), 2);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1045,4 +1045,6 @@ default void clearInstanceMonitor(String clusterName) {
LogContext getLogContext();

VeniceControllerClusterConfig getControllerConfig(String clusterName);

String getControllerName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public ProtocolVersionAutoDetectionService(
@Override
public boolean startInner() throws Exception {
LOGGER.info("Starting {}", getClass().getSimpleName());
executor.scheduleAtFixedRate(getRunnableTask(), 0, sleepIntervalInMs, TimeUnit.MILLISECONDS);
executor.scheduleAtFixedRate(getRunnableTask(), sleepIntervalInMs, sleepIntervalInMs, TimeUnit.MILLISECONDS);
return true;
}

Expand Down Expand Up @@ -96,8 +96,8 @@ public long getSmallestLocalAdminOperationProtocolVersionForAllConsumers(String
"Failed to get admin operation protocol version from child controller " + entry.getKey() + ": "
+ response.getError());
}
Map<String, Long> controllerUrlToVersionMap = response.getControllerUrlToVersionMap();
regionToControllerToVersionMap.put(entry.getKey(), controllerUrlToVersionMap);
Map<String, Long> controllerNameToVersionMap = response.getControllerNameToVersionMap();
regionToControllerToVersionMap.put(entry.getKey(), controllerNameToVersionMap);
}

LOGGER.info("All controller versions for cluster {}: {}", clusterName, regionToControllerToVersionMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7387,6 +7387,7 @@ public List<Instance> getControllersByHelixState(String clusterName, String heli
if (instanceNameAndState.getValue().equals(helixState)) {
// Found the Venice controller, adding to the return list
String id = instanceNameAndState.getKey();
// TODO: Update the instance constructor when Helix NodeId is changed to use secure port.
controllers.add(
new Instance(
id,
Expand Down Expand Up @@ -7791,27 +7792,34 @@ public void updateAdminOperationProtocolVersion(String clusterName, Long adminOp
/**
* Get the admin operation protocol versions from controllers (leader + standby) for specific cluster.
* @param clusterName: the cluster name
* @return map (controllerUrl: version). Example: {http://localhost:1234=1, http://localhost:1235=1}*/
* @return map (controllerName: version). Example: {localhost_1234=1, localhost_1235=1}*/
@Override
public Map<String, Long> getAdminOperationVersionFromControllers(String clusterName) {
checkControllerLeadershipFor(clusterName);

Map<String, Long> controllerUrlToAdminOperationVersionMap = new HashMap<>();
Map<String, Long> controllerNameToAdminOperationVersionMap = new HashMap<>();

// Get the version from the current controller - leader
String leaderControllerUrl = getLeaderController(clusterName).getUrl(false);
controllerUrlToAdminOperationVersionMap.put(leaderControllerUrl, getLocalAdminOperationProtocolVersion());
Instance leaderController = getLeaderController(clusterName);

controllerNameToAdminOperationVersionMap.put(getControllerName(), getLocalAdminOperationProtocolVersion());

// Create the controller client to reuse
// (this is controller client to communicate with other controllers in the same cluster, the same region)
ControllerClient localControllerClient =
ControllerClient.constructClusterControllerClient(clusterName, leaderControllerUrl, sslFactory);
ControllerClient localControllerClient = ControllerClient.constructClusterControllerClient(
clusterName,
leaderController.getUrl(getSslFactory().isPresent()),
getSslFactory());

// Get version for standby controllers
List<Instance> standbyControllers = getControllersByHelixState(clusterName, HelixState.STANDBY_STATE);

for (Instance standbyController: standbyControllers) {
String standbyControllerUrl = standbyController.getUrl(false);
// In production, leader and standby share the secure port but have different hostnames—no conflict.
// In tests, both run on 'localhost' with the same secure port, which confuses routing.
// Hence, we need to disable SSL for local integration tests.
// RCA: Helix uses an insecure port from the instance ID, while secure port comes from shared multiClusterConfig.
String standbyControllerUrl = standbyController.getUrl(getSslFactory().isPresent());

// Get the admin operation protocol version from standby controller
AdminOperationProtocolVersionControllerResponse response =
Expand All @@ -7821,11 +7829,11 @@ public Map<String, Long> getAdminOperationVersionFromControllers(String clusterN
"Failed to get admin operation protocol version from standby controller: " + standbyControllerUrl
+ ", error message: " + response.getError());
}
controllerUrlToAdminOperationVersionMap
.put(response.getRequestUrl(), response.getLocalAdminOperationProtocolVersion());
controllerNameToAdminOperationVersionMap
.put(response.getLocalControllerName(), response.getLocalAdminOperationProtocolVersion());
}

return controllerUrlToAdminOperationVersionMap;
return controllerNameToAdminOperationVersionMap;
}

/**
Expand Down Expand Up @@ -8050,7 +8058,8 @@ void addConfig(VeniceControllerClusterConfig config) {
multiClusterConfigs.addClusterConfig(config);
}

String getControllerName() {
@Override
public String getControllerName() {
return controllerName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4547,6 +4547,11 @@ public long getLocalAdminOperationProtocolVersion() {
return getVeniceHelixAdmin().getLocalAdminOperationProtocolVersion();
}

@Override
public String getControllerName() {
return getVeniceHelixAdmin().getControllerName();
}

/**
* Unsupported operation in the parent controller.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.linkedin.venice.controllerapi.PubSubTopicConfigResponse;
import com.linkedin.venice.controllerapi.StoppableNodeStatusResponse;
import com.linkedin.venice.exceptions.ErrorType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.protocols.controller.LeaderControllerGrpcRequest;
import com.linkedin.venice.protocols.controller.LeaderControllerGrpcResponse;
import com.linkedin.venice.pubsub.PubSubTopicConfiguration;
Expand All @@ -34,7 +33,6 @@
import com.linkedin.venice.pubsub.manager.TopicManager;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.Utils;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -258,22 +256,11 @@ public Route getAdminOperationVersionFromControllers(Admin admin) {
response.type(HttpConstants.JSON);
try {
String clusterName = request.queryParams(CLUSTER);
String currentUrl = getRequestURL(request);

responseObject.setCluster(clusterName);
Map<String, Long> controllerUrlToVersionMap = admin.getAdminOperationVersionFromControllers(clusterName);
responseObject.setControllerUrlToVersionMap(controllerUrlToVersionMap);
responseObject.setRequestUrl(currentUrl);

if (controllerUrlToVersionMap.containsKey(currentUrl)) {
responseObject.setLocalAdminOperationProtocolVersion(controllerUrlToVersionMap.get(currentUrl));
} else {
// Should not happen
throw new VeniceException(
"The current controller URL: " + currentUrl + " is not in the urlToVersionMap in the response "
+ controllerUrlToVersionMap);
}

Map<String, Long> controllerNameToVersionMap = admin.getAdminOperationVersionFromControllers(clusterName);
responseObject.setControllerNameToVersionMap(controllerNameToVersionMap);
responseObject.setLocalControllerName(admin.getControllerName());
responseObject.setLocalAdminOperationProtocolVersion(admin.getLocalAdminOperationProtocolVersion());
} catch (Throwable e) {
responseObject.setError(e);
AdminSparkServer.handleError(e, request, response);
Expand All @@ -292,7 +279,7 @@ public Route getLocalAdminOperationProtocolVersion(Admin admin) {
response.type(HttpConstants.JSON);
try {
responseObject.setLocalAdminOperationProtocolVersion(admin.getLocalAdminOperationProtocolVersion());
responseObject.setRequestUrl(getRequestURL(request));
responseObject.setLocalControllerName(admin.getControllerName());
} catch (Throwable e) {
responseObject.setError(e);
AdminSparkServer.handleError(e, request, response);
Expand All @@ -301,22 +288,6 @@ public Route getLocalAdminOperationProtocolVersion(Admin admin) {
};
}

/**
* Get the request base URL from the request object.
* Example:
* request.url() = https://localhost:8080/venice/cluster/clusterName/leaderController?param1=value1&param2=value2
* base URL: https://localhost:8080
* @return the base URL
*/
private String getRequestURL(Request request) {
try {
URL url = new URL(request.url());
return url.getProtocol() + "://" + url.getHost() + (url.getPort() != -1 ? ":" + url.getPort() : "");
} catch (Exception e) {
throw new VeniceException("Invalid URL: " + request.url(), e);
}
}

@FunctionalInterface
interface UpdateTopicConfigFunction {
void apply(Request request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,13 @@ private AdminOperationProtocolVersionControllerResponse getAdminOperationProtoco
for (Map.Entry<String, Long> entry: hostToVersionMap.entrySet()) {
String hostName = entry.getKey();
long version = entry.getValue();
response.getControllerUrlToVersionMap().put(hostName, version);
response.getControllerNameToVersionMap().put(hostName, version);
if (hostToClusterToLeaderStateMap.containsKey(hostName)) {
Map<String, Boolean> clusterToStateMap = hostToClusterToLeaderStateMap.get(hostName);
if (clusterToStateMap.get(clusterName) != null && clusterToStateMap.get(clusterName)) {
// Add the local version for leader controller
response.setLocalAdminOperationProtocolVersion(version);
response.setRequestUrl(hostName);
response.setLocalControllerName(hostName);
response.setCluster(clusterName);
}
}
Expand Down
Loading
Loading