Skip to content

feat:add deadline to grpc request. #593

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ private void reportClient(Extensions extensions) {
}
}
reportClientRequest.setReporterMetaInfos(reporterMetaInfos);
reportClientRequest.setTimeoutMs(extensions.getConfiguration().getGlobal().getAPI().getTimeout());

try {
ReportClientResponse reportClientResponse = serverConnector.reportClient(reportClientRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ private InstanceRegisterResponse doRegister(InstanceRegisterRequest req, Map<Str
long start = System.currentTimeMillis();
ServiceCallResult serviceCallResult = new ServiceCallResult();
CommonProviderRequest request = req.getRequest();
request.setTimeoutMs(timeout);
try {
CommonProviderResponse response = serverConnector.registerInstance(request, customHeader);
LOG.info("register {}/{} instance {} successfully", req.getNamespace(), req.getService(),
Expand Down Expand Up @@ -183,6 +184,7 @@ public void deRegister(InstanceDeregisterRequest req) {
long start = System.currentTimeMillis();
ServiceCallResult serviceCallResult = new ServiceCallResult();
CommonProviderRequest request = req.getRequest();
request.setTimeoutMs(timeout);
try {
serverConnector.deregisterInstance(request);
serviceCallResult.setRetStatus(RetStatus.RetSuccess);
Expand Down Expand Up @@ -250,6 +252,7 @@ public ReportServiceContractResponse reportServiceContract(ReportServiceContract
while (timeout > 0) {
long start = System.currentTimeMillis();
ServiceCallResult serviceCallResult = new ServiceCallResult();
req.setTimeoutMs(timeout);
try {
ReportServiceContractResponse response = serverConnector.reportServiceContract(req);
serviceCallResult.setRetStatus(RetStatus.RetSuccess);
Expand Down Expand Up @@ -286,6 +289,7 @@ public ServiceRuleResponse getServiceContract(GetServiceContractRequest req) {
CommonServiceContractRequest request = req.getRequest();
request.setNamespace(req.getNamespace());
request.setService(req.getService());
request.setTimeoutMs(timeout);
try {
ServiceRuleByProto response = serverConnector.getServiceContract(request);
serviceCallResult.setRetStatus(RetStatus.RetSuccess);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class ReportClientRequest {

private TargetServer targetServer;

private long timeoutMs;

private List<ReporterMetaInfo> reporterMetaInfos;

public List<ReporterMetaInfo> getReporterMetaInfos() {
Expand Down Expand Up @@ -89,6 +91,14 @@ public void setTargetServer(TargetServer targetServer) {
this.targetServer = targetServer;
}

public long getTimeoutMs() {
return timeoutMs;
}

public void setTimeoutMs(long timeoutMs) {
this.timeoutMs = timeoutMs;
}

@Override
public String toString() {
return "ReportClientRequest{" +
Expand All @@ -97,6 +107,8 @@ public String toString() {
", clientHost='" + clientHost + '\'' +
", version='" + version + '\'' +
", targetServer=" + targetServer +
", timeoutMs=" + timeoutMs +
", reporterMetaInfos=" + reporterMetaInfos +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ public CommonProviderResponse registerInstance(CommonProviderRequest req, Map<St
stub = GrpcUtil.attachRequestHeader(stub, GrpcUtil.nextInstanceRegisterReqId());
stub = GrpcUtil.attachRequestHeader(stub, customHeader);
stub = GrpcUtil.attachAccessToken(connectorConfig.getToken(), stub);
ResponseProto.Response registerInstanceResponse = stub.registerInstance(buildRegisterInstanceRequest(req));
ResponseProto.Response registerInstanceResponse = stub.withDeadlineAfter(req.getTimeoutMs(),
TimeUnit.MILLISECONDS).registerInstance(buildRegisterInstanceRequest(req));
GrpcUtil.checkResponse(registerInstanceResponse);
if (!registerInstanceResponse.hasInstance()) {
throw new PolarisException(ErrorCode.SERVER_USER_ERROR,
Expand Down Expand Up @@ -470,8 +471,8 @@ public void deregisterInstance(CommonProviderRequest req) throws PolarisExceptio
PolarisGRPCGrpc.PolarisGRPCBlockingStub stub = PolarisGRPCGrpc.newBlockingStub(connection.getChannel());
stub = GrpcUtil.attachRequestHeader(stub, GrpcUtil.nextInstanceDeRegisterReqId());
stub = GrpcUtil.attachAccessToken(connectorConfig.getToken(), stub);
ResponseProto.Response deregisterInstanceResponse = stub
.deregisterInstance(buildDeregisterInstanceRequest(req));
ResponseProto.Response deregisterInstanceResponse = stub.withDeadlineAfter(req.getTimeoutMs(),
TimeUnit.MILLISECONDS).deregisterInstance(buildDeregisterInstanceRequest(req));
GrpcUtil.checkResponse(deregisterInstanceResponse);
LOG.debug("received deregister response {}", deregisterInstanceResponse);
} catch (Throwable t) {
Expand Down Expand Up @@ -551,7 +552,8 @@ public ReportClientResponse reportClient(ReportClientRequest req) throws Polaris
stub = GrpcUtil.attachRequestHeader(stub, GrpcUtil.nextHeartbeatReqId());
stub = GrpcUtil.attachAccessToken(connectorConfig.getToken(), stub);
ClientProto.Client request = buildReportRequest(req);
ResponseProto.Response response = stub.reportClient(request);
ResponseProto.Response response = stub.withDeadlineAfter(req.getTimeoutMs(),
TimeUnit.MILLISECONDS).reportClient(request);
LOG.debug("reportClient req:{}, rsp:{}", req, TextFormat.shortDebugString(response));
GrpcUtil.checkResponse(response);
ReportClientResponse rsp = new ReportClientResponse();
Expand Down Expand Up @@ -625,8 +627,8 @@ public ReportServiceContractResponse reportServiceContract(ReportServiceContract
PolarisServiceContractGRPCGrpc.newBlockingStub(connection.getChannel());
stub = GrpcUtil.attachRequestHeader(stub, GrpcUtil.nextReportServiceContractReqId());
stub = GrpcUtil.attachAccessToken(connectorConfig.getToken(), stub);
ResponseProto.Response reportServiceContractResponse =
stub.reportServiceContract(buildReportServiceContractRequest(req));
ResponseProto.Response reportServiceContractResponse = stub.withDeadlineAfter(req.getTimeoutMs(),
TimeUnit.MILLISECONDS).reportServiceContract(buildReportServiceContractRequest(req));
GrpcUtil.checkResponse(reportServiceContractResponse);
return new ReportServiceContractResponse();
} catch (Throwable t) {
Expand Down Expand Up @@ -663,7 +665,8 @@ public ServiceRuleByProto getServiceContract(CommonServiceContractRequest req) t
PolarisServiceContractGRPCGrpc.newBlockingStub(connection.getChannel());
stub = GrpcUtil.attachRequestHeader(stub, GrpcUtil.nextReportServiceContractReqId());
stub = GrpcUtil.attachAccessToken(connectorConfig.getToken(), stub);
ResponseProto.Response response = stub.getServiceContract(req.toQuerySpec());
ResponseProto.Response response = stub.withDeadlineAfter(req.getTimeoutMs(),
TimeUnit.MILLISECONDS).getServiceContract(req.toQuerySpec());
GrpcUtil.checkResponse(response);
ServiceContractProto.ServiceContract remoteVal = response.getServiceContract();
return new ServiceRuleByProto(remoteVal, remoteVal.getRevision(), false, EventType.SERVICE_CONTRACT);
Expand Down