Skip to content

[controller] Fix bug in ProtocolVersionAutoDetectionService #1795

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
May 19, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.ConfigKeys.CONTROLLER_SSL_ENABLED;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;

Expand Down Expand Up @@ -36,8 +37,10 @@ public class TestAdminOperationVersionDetection {
@BeforeClass(alwaysRun = true)
public void setUp() throws Exception {
// Create multi-region multi-cluster setup
Properties parentControllerProperties = new Properties();
Properties controllerProperties = new Properties();
controllerProperties.put(CONTROLLER_SSL_ENABLED, "false");
Properties serverProperties = new Properties();
serverProperties.put(CONTROLLER_SSL_ENABLED, "false");

VeniceMultiRegionClusterCreateOptions.Builder optionsBuilder =
new VeniceMultiRegionClusterCreateOptions.Builder().numberOfRegions(NUMBER_OF_CHILD_DATACENTERS)
Expand All @@ -47,10 +50,11 @@ public void setUp() throws Exception {
.numberOfServers(1)
.numberOfRouters(1)
.replicationFactor(1)
.sslToStorageNodes(false)
.sslToStorageNodes(true)
.forkServer(false)
.serverProperties(serverProperties)
.parentControllerProperties(parentControllerProperties);
.parentControllerProperties(controllerProperties)
.childControllerProperties(controllerProperties);
multiRegionMultiClusterWrapper =
ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(optionsBuilder.build());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.linkedin.venice.endToEnd;

import static com.linkedin.venice.ConfigKeys.CONTROLLER_PROTOCOL_VERSION_AUTO_DETECTION_SERVICE_ENABLED;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_PROTOCOL_VERSION_AUTO_DETECTION_SLEEP_MS;
import static com.linkedin.venice.ConfigKeys.*;
import static com.linkedin.venice.controller.kafka.protocol.serializer.AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -41,6 +40,10 @@ public void setUp() {
Properties parentControllerProps = new Properties();
parentControllerProps.put(CONTROLLER_PROTOCOL_VERSION_AUTO_DETECTION_SERVICE_ENABLED, true);
parentControllerProps.put(CONTROLLER_PROTOCOL_VERSION_AUTO_DETECTION_SLEEP_MS, SERVICE_INTERVAL_MS);
parentControllerProps.put(CONTROLLER_SSL_ENABLED, "false");

Properties childControllerProps = new Properties();
childControllerProps.put(CONTROLLER_SSL_ENABLED, "false");

VeniceMultiRegionClusterCreateOptions.Builder optionsBuilder =
new VeniceMultiRegionClusterCreateOptions.Builder().numberOfRegions(NUMBER_OF_CHILD_DATACENTERS)
Expand All @@ -51,8 +54,9 @@ public void setUp() {
.numberOfRouters(1)
.replicationFactor(1)
.forkServer(false)
.sslToStorageNodes(false)
.parentControllerProperties(parentControllerProps);
.sslToStorageNodes(true)
.parentControllerProperties(parentControllerProps)
.childControllerProperties(childControllerProps);
multiRegionMultiClusterWrapper =
ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(optionsBuilder.build());
childDatacenters = multiRegionMultiClusterWrapper.getChildRegions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1045,4 +1045,6 @@ default void clearInstanceMonitor(String clusterName) {
LogContext getLogContext();

VeniceControllerClusterConfig getControllerConfig(String clusterName);

String getControllerName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7387,6 +7387,7 @@ public List<Instance> getControllersByHelixState(String clusterName, String heli
if (instanceNameAndState.getValue().equals(helixState)) {
// Found the Venice controller, adding to the return list
String id = instanceNameAndState.getKey();
// TODO: Update the instance constructor when Helix NodeId is changed to use secure port.
controllers.add(
new Instance(
id,
Expand Down Expand Up @@ -7814,6 +7815,10 @@ public Map<String, Long> getAdminOperationVersionFromControllers(String clusterN
List<Instance> standbyControllers = getControllersByHelixState(clusterName, HelixState.STANDBY_STATE);

for (Instance standbyController: standbyControllers) {
// In production, leader and standby share the secure port but have different hostnames—no conflict.
// In tests, both run on 'localhost' with the same secure port, which confuses routing.
// Hence, we need to disable SSL for local integration tests.
// RCA: Helix uses an insecure port from the instance ID, while secure port comes from shared multiClusterConfig.
String standbyControllerUrl = standbyController.getUrl(getSslFactory().isPresent());

// Get the admin operation protocol version from standby controller
Expand Down Expand Up @@ -8053,7 +8058,8 @@ void addConfig(VeniceControllerClusterConfig config) {
multiClusterConfigs.addClusterConfig(config);
}

String getControllerName() {
@Override
public String getControllerName() {
return controllerName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4547,6 +4547,11 @@ public long getLocalAdminOperationProtocolVersion() {
return getVeniceHelixAdmin().getLocalAdminOperationProtocolVersion();
}

@Override
public String getControllerName() {
return getVeniceHelixAdmin().getControllerName();
}

/**
* Unsupported operation in the parent controller.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.linkedin.venice.controllerapi.PubSubTopicConfigResponse;
import com.linkedin.venice.controllerapi.StoppableNodeStatusResponse;
import com.linkedin.venice.exceptions.ErrorType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.protocols.controller.LeaderControllerGrpcRequest;
import com.linkedin.venice.protocols.controller.LeaderControllerGrpcResponse;
import com.linkedin.venice.pubsub.PubSubTopicConfiguration;
Expand All @@ -34,7 +33,6 @@
import com.linkedin.venice.pubsub.manager.TopicManager;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.Utils;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -261,7 +259,7 @@ public Route getAdminOperationVersionFromControllers(Admin admin) {
responseObject.setCluster(clusterName);
Map<String, Long> controllerNameToVersionMap = admin.getAdminOperationVersionFromControllers(clusterName);
responseObject.setControllerNameToVersionMap(controllerNameToVersionMap);
responseObject.setLocalControllerName(getControllerName(request));
responseObject.setLocalControllerName(admin.getControllerName());
responseObject.setLocalAdminOperationProtocolVersion(admin.getLocalAdminOperationProtocolVersion());
} catch (Throwable e) {
responseObject.setError(e);
Expand All @@ -281,7 +279,7 @@ public Route getLocalAdminOperationProtocolVersion(Admin admin) {
response.type(HttpConstants.JSON);
try {
responseObject.setLocalAdminOperationProtocolVersion(admin.getLocalAdminOperationProtocolVersion());
responseObject.setLocalControllerName(getControllerName(request));
responseObject.setLocalControllerName(admin.getControllerName());
} catch (Throwable e) {
responseObject.setError(e);
AdminSparkServer.handleError(e, request, response);
Expand All @@ -290,21 +288,6 @@ public Route getLocalAdminOperationProtocolVersion(Admin admin) {
};
}

/**
* Get controller name from the request object.
* Example:
* request.url() = https://localhost:8080/venice/cluster/clusterName/leaderController?param1=value1&param2=value2
* @return "localhost_8080"
*/
private String getControllerName(Request request) {
try {
URL url = new URL(request.url());
return url.getHost() + (url.getPort() != -1 ? "_" + url.getPort() : "");
} catch (Exception e) {
throw new VeniceException("Invalid URL: " + request.url(), e);
}
}

@FunctionalInterface
interface UpdateTopicConfigFunction {
void apply(Request request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ public void testGetLocalAdminOperationProtocolVersion() throws Exception {

doReturn(leaderController).when(mockAdmin).getLeaderController(anyString());
String leaderControllerHost = String.format("http://%s:%s", TEST_HOST, TEST_PORT);
doReturn(leaderControllerHost).when(mockAdmin).getControllerName();

Request request = mock(Request.class);
doReturn(TEST_CLUSTER).when(request).queryParams(eq(ControllerApiConstants.CLUSTER));
doReturn(1L).when(mockAdmin).getLocalAdminOperationProtocolVersion();
Expand All @@ -231,6 +233,7 @@ public void testGetAdminOperationVersionFromControllers() throws Exception {
doReturn(leaderController).when(mockAdmin).getLeaderController(anyString());
String leaderControllerHostHttps = String.format("https://%s:%s", TEST_HOST, TEST_PORT);
String leaderControllerHost = String.format("%s_%s", TEST_HOST, TEST_PORT);
doReturn(leaderControllerHost).when(mockAdmin).getControllerName();

Request request = mock(Request.class);
doReturn(TEST_CLUSTER).when(request).queryParams(eq(ControllerApiConstants.CLUSTER));
Expand Down
Loading