-
Notifications
You must be signed in to change notification settings - Fork 507
fix polaris.discovery.heartbeat.enabled is not effective #1565 #1621
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
zihenzzz
wants to merge
13
commits into
Tencent:2022
Choose a base branch
from
zihenzzz:feat-network-fix
base: 2022
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 6 commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
fe274e4
fix
zihenz 885b593
fix: make polaris.discovery.heartbeat.enabled property effective
zihenz f04e0bf
fix heartbeat enabled need to be placed in the first judgment position
zihenz da2c08a
docs:update CHANGELOG.
SkyeBeFreeman 16860d6
fix heartbeat control logic and property name
zihenz 9d90295
fix
zihenz d94c7a0
fix heartbeat control logic and property name
zihenz 2d60655
fix
zihenz 444804a
fix optimize heartbeat control in service registry
zihenz bf58604
fix optimize heartbeat control in service registry
zihenz 4f03ed7
Merge pull request #1 from SkyeBeFreeman/feat-network-fix
zihenzzz 2cb2705
fix use excessively large TTL
zihenz 95bbdc7
Merge branch 'feat-network-fix' of https://github.com/zihenzzz/spring…
zihenz File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,6 +55,8 @@ | |
import org.springframework.beans.BeanUtils; | ||
import org.springframework.beans.factory.DisposableBean; | ||
import org.springframework.cloud.client.serviceregistry.ServiceRegistry; | ||
import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent; | ||
import org.springframework.context.event.EventListener; | ||
import org.springframework.http.HttpHeaders; | ||
|
||
import static java.util.concurrent.TimeUnit.SECONDS; | ||
|
@@ -77,7 +79,7 @@ public class PolarisServiceRegistry implements ServiceRegistry<PolarisRegistrati | |
|
||
private final StaticMetadataManager staticMetadataManager; | ||
private final PolarisStatProperties polarisStatProperties; | ||
private final ScheduledExecutorService heartbeatExecutor; | ||
private volatile ScheduledExecutorService heartbeatExecutor; | ||
|
||
public PolarisServiceRegistry(PolarisDiscoveryProperties polarisDiscoveryProperties, | ||
PolarisSDKContextManager polarisSDKContextManager, PolarisDiscoveryHandler polarisDiscoveryHandler, | ||
|
@@ -86,21 +88,11 @@ public PolarisServiceRegistry(PolarisDiscoveryProperties polarisDiscoveryPropert | |
this.polarisSDKContextManager = polarisSDKContextManager; | ||
this.polarisDiscoveryHandler = polarisDiscoveryHandler; | ||
this.staticMetadataManager = staticMetadataManager; | ||
|
||
if (StringUtils.isNotBlank(polarisDiscoveryProperties.getHealthCheckUrl())) { | ||
this.heartbeatExecutor = Executors | ||
.newSingleThreadScheduledExecutor(new NamedThreadFactory("polaris-heartbeat")); | ||
} | ||
else { | ||
this.heartbeatExecutor = null; | ||
} | ||
|
||
this.polarisStatProperties = polarisStatProperties; | ||
} | ||
|
||
@Override | ||
public void register(PolarisRegistration registration) { | ||
|
||
if (StringUtils.isBlank(registration.getServiceId())) { | ||
LOGGER.warn("No service to register for polaris client..."); | ||
return; | ||
|
@@ -121,7 +113,6 @@ public void register(PolarisRegistration registration) { | |
instanceRegisterRequest.setRegion(staticMetadataManager.getRegion()); | ||
instanceRegisterRequest.setZone(staticMetadataManager.getZone()); | ||
instanceRegisterRequest.setCampus(staticMetadataManager.getCampus()); | ||
instanceRegisterRequest.setTtl(polarisDiscoveryProperties.getHeartbeatInterval()); | ||
instanceRegisterRequest.setMetadata(registration.getMetadata()); | ||
instanceRegisterRequest.setExtendedMetadata(registration.getExtendedMetadata()); | ||
instanceRegisterRequest.setProtocol(polarisDiscoveryProperties.getProtocol()); | ||
|
@@ -138,42 +129,37 @@ public void register(PolarisRegistration registration) { | |
try { | ||
ProviderAPI providerClient = polarisSDKContextManager.getProviderAPI(); | ||
InstanceRegisterResponse instanceRegisterResponse; | ||
if (StringUtils.isBlank(polarisDiscoveryProperties.getHealthCheckUrl())) { | ||
instanceRegisterResponse = providerClient.registerInstance(instanceRegisterRequest); | ||
} | ||
else { | ||
instanceRegisterResponse = providerClient.register(instanceRegisterRequest); | ||
InstanceHeartbeatRequest heartbeatRequest = new InstanceHeartbeatRequest(); | ||
BeanUtils.copyProperties(instanceRegisterRequest, heartbeatRequest); | ||
heartbeatRequest.setInstanceID(instanceRegisterResponse.getInstanceId()); | ||
// Start the heartbeat thread after the registration is successful. | ||
heartbeat(heartbeatRequest); | ||
|
||
// Always use registerInstance to avoid automatic heartbeat | ||
instanceRegisterRequest.setTtl(polarisDiscoveryProperties.getHeartbeatEnabled() ? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you should modify com.tencent.polaris.api.core.ProviderAPI#registerInstance but not set a big heartbeat TTL. |
||
polarisDiscoveryProperties.getHeartbeatInterval() : 2592000); | ||
instanceRegisterResponse = providerClient.registerInstance(instanceRegisterRequest); | ||
|
||
// Only start heartbeat if explicitly enabled and health check URL is provided | ||
if (polarisDiscoveryProperties.getHeartbeatEnabled() && | ||
StringUtils.isNotBlank(polarisDiscoveryProperties.getHealthCheckUrl())) { | ||
startHeartbeat(instanceRegisterRequest, instanceRegisterResponse); | ||
LOGGER.info("Registered instance with heartbeat enabled and health check URL."); | ||
} else { | ||
stopHeartbeat(); // Ensure no heartbeat is running | ||
LOGGER.info("Registered instance without heartbeat."); | ||
} | ||
|
||
registration.setInstanceId(instanceRegisterResponse.getInstanceId()); | ||
LOGGER.info("polaris registry, {} {} {} {}:{} {} {} {} {} register finished", polarisDiscoveryProperties.getNamespace(), | ||
registration.getServiceId(), registration.getInstanceId(), registration.getHost(), registration.getPort(), | ||
staticMetadataManager.getRegion(), staticMetadataManager.getZone(), staticMetadataManager.getCampus(), | ||
staticMetadataManager.getMergedStaticMetadata()); | ||
if (Objects.nonNull(polarisStatProperties) && polarisStatProperties.isEnabled()) { | ||
try { | ||
StatReporter statReporter = (StatReporter) polarisSDKContextManager.getSDKContext().getPlugins() | ||
.getPlugin(PluginTypes.STAT_REPORTER.getBaseType(), StatReporterConfig.DEFAULT_REPORTER_PROMETHEUS); | ||
if (Objects.nonNull(statReporter)) { | ||
ReporterMetaInfo reporterMetaInfo = statReporter.metaInfo(); | ||
if (reporterMetaInfo.getPort() != null) { | ||
LOGGER.info("Stat server started on port: " + reporterMetaInfo.getPort() + " (http)"); | ||
} | ||
else { | ||
LOGGER.info("Stat server is set to type of Push gateway"); | ||
} | ||
} | ||
else { | ||
LOGGER.warn("Plugin StatReporter not found"); | ||
} | ||
} | ||
catch (Exception e) { | ||
LOGGER.warn("Stat server started error, ", e); | ||
} | ||
LOGGER.info("polaris registry, {} {} {} {}:{} {} {} {} {} register finished", | ||
instanceRegisterRequest.getNamespace(), | ||
instanceRegisterRequest.getService(), | ||
instanceRegisterResponse.getInstanceId(), | ||
instanceRegisterRequest.getHost(), | ||
instanceRegisterRequest.getPort(), | ||
instanceRegisterRequest.getRegion(), | ||
instanceRegisterRequest.getZone(), | ||
instanceRegisterRequest.getCampus(), | ||
instanceRegisterRequest.getMetadata()); | ||
|
||
// Start stat server and configure service | ||
if (polarisStatProperties.isEnabled()) { | ||
startStatServer(); | ||
} | ||
|
||
ServiceConfigImpl serviceConfig = (ServiceConfigImpl) polarisSDKContextManager.getSDKContext().getConfig() | ||
|
@@ -184,8 +170,108 @@ public void register(PolarisRegistration registration) { | |
PolarisSDKContextManager.isRegistered = true; | ||
} | ||
catch (Exception e) { | ||
LOGGER.error("polaris registry, {} register failed...{},", registration.getServiceId(), registration, e); | ||
rethrowRuntimeException(e); | ||
LOGGER.error("ERR_POLARIS_REGISTER, register failed...{},", registration, e); | ||
} | ||
} | ||
|
||
private void startStatServer() { | ||
try { | ||
StatReporter statReporter = (StatReporter) polarisSDKContextManager.getSDKContext().getPlugins() | ||
.getPlugin(PluginTypes.STAT_REPORTER.getBaseType(), StatReporterConfig.DEFAULT_REPORTER_PROMETHEUS); | ||
if (Objects.nonNull(statReporter)) { | ||
ReporterMetaInfo reporterMetaInfo = statReporter.metaInfo(); | ||
if (reporterMetaInfo.getPort() != null) { | ||
LOGGER.info("Stat server started on port: " + reporterMetaInfo.getPort() + " (http)"); | ||
} | ||
else { | ||
LOGGER.info("Stat server is set to type of Push gateway"); | ||
} | ||
} | ||
else { | ||
LOGGER.warn("Plugin StatReporter not found"); | ||
} | ||
} | ||
catch (Exception e) { | ||
LOGGER.warn("Stat server started error, ", e); | ||
} | ||
} | ||
|
||
private void startHeartbeat(InstanceRegisterRequest instanceRegisterRequest, InstanceRegisterResponse instanceRegisterResponse) { | ||
// Double check heartbeat is enabled and health check URL is not blank | ||
if (!polarisDiscoveryProperties.getHeartbeatEnabled() || StringUtils.isBlank(polarisDiscoveryProperties.getHealthCheckUrl())) { | ||
stopHeartbeat(); | ||
LOGGER.info("Heartbeat is disabled or health check URL is blank, skipping heartbeat initialization."); | ||
return; | ||
} | ||
|
||
// Stop any existing heartbeat task before starting a new one | ||
stopHeartbeat(); | ||
|
||
heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("polaris-heartbeat")); | ||
LOGGER.info("Created new heartbeat executor."); | ||
|
||
InstanceHeartbeatRequest heartbeatRequest = new InstanceHeartbeatRequest(); | ||
BeanUtils.copyProperties(instanceRegisterRequest, heartbeatRequest); | ||
heartbeatRequest.setInstanceID(instanceRegisterResponse.getInstanceId()); | ||
|
||
heartbeatExecutor.scheduleWithFixedDelay(() -> { | ||
try { | ||
// Check if heartbeat should still be running | ||
if (!polarisDiscoveryProperties.getHeartbeatEnabled() || | ||
StringUtils.isBlank(polarisDiscoveryProperties.getHealthCheckUrl())) { | ||
LOGGER.info("Heartbeat conditions no longer met, stopping heartbeat task."); | ||
stopHeartbeat(); | ||
return; | ||
} | ||
|
||
// Perform health check | ||
Map<String, String> headers = new HashMap<>(1); | ||
headers.put(HttpHeaders.USER_AGENT, "polaris"); | ||
if (!OkHttpUtil.checkUrl(heartbeatRequest.getHost(), heartbeatRequest.getPort(), | ||
polarisDiscoveryProperties.getHealthCheckUrl(), headers)) { | ||
LOGGER.error("Backend service health check failed. health check endpoint = {}", | ||
polarisDiscoveryProperties.getHealthCheckUrl()); | ||
return; | ||
} | ||
|
||
// Send heartbeat | ||
polarisSDKContextManager.getProviderAPI().heartbeat(heartbeatRequest); | ||
LOGGER.debug("Polaris heartbeat is sent successfully."); | ||
} | ||
catch (PolarisException e) { | ||
LOGGER.error("Polaris heartbeat error with code [{}]", e.getCode(), e); | ||
} | ||
catch (Exception e) { | ||
LOGGER.error("Polaris heartbeat runtime error", e); | ||
} | ||
}, polarisDiscoveryProperties.getHeartbeatInterval(), polarisDiscoveryProperties.getHeartbeatInterval(), SECONDS); | ||
|
||
LOGGER.info("Heartbeat task scheduled with interval {} seconds.", polarisDiscoveryProperties.getHeartbeatInterval()); | ||
} | ||
|
||
private void stopHeartbeat() { | ||
if (heartbeatExecutor != null) { | ||
try { | ||
heartbeatExecutor.shutdownNow(); | ||
if (heartbeatExecutor.awaitTermination(5, SECONDS)) { | ||
LOGGER.info("Polaris heartbeat task stopped gracefully."); | ||
} else { | ||
LOGGER.warn("Polaris heartbeat task did not terminate in time."); | ||
} | ||
} catch (InterruptedException e) { | ||
LOGGER.warn("Interrupted while stopping heartbeat task.", e); | ||
Thread.currentThread().interrupt(); | ||
} finally { | ||
heartbeatExecutor = null; | ||
} | ||
} | ||
} | ||
|
||
@EventListener(RefreshScopeRefreshedEvent.class) | ||
public void onPropertyRefreshed() { | ||
if (!polarisDiscoveryProperties.getHeartbeatEnabled()) { | ||
LOGGER.info("Heartbeat disabled via property refresh, stopping heartbeat task."); | ||
stopHeartbeat(); | ||
} | ||
} | ||
|
||
|
@@ -198,6 +284,8 @@ public void deregister(PolarisRegistration registration) { | |
return; | ||
} | ||
|
||
stopHeartbeat(); | ||
|
||
InstanceDeregisterRequest deRegisterRequest = new InstanceDeregisterRequest(); | ||
deRegisterRequest.setInstanceID(registration.getInstanceId()); | ||
deRegisterRequest.setToken(polarisDiscoveryProperties.getToken()); | ||
|
@@ -215,21 +303,15 @@ public void deregister(PolarisRegistration registration) { | |
catch (Exception e) { | ||
LOGGER.error("ERR_POLARIS_DEREGISTER, de-register failed...{},", registration, e); | ||
} | ||
finally { | ||
if (null != heartbeatExecutor) { | ||
heartbeatExecutor.shutdown(); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
|
||
stopHeartbeat(); | ||
} | ||
|
||
@Override | ||
public void setStatus(PolarisRegistration registration, String status) { | ||
|
||
} | ||
|
||
@Override | ||
|
@@ -249,40 +331,8 @@ public Object getStatus(PolarisRegistration registration) { | |
return "DOWN"; | ||
} | ||
|
||
/** | ||
* Start the heartbeat thread. | ||
* @param heartbeatRequest heartbeat request | ||
*/ | ||
public void heartbeat(InstanceHeartbeatRequest heartbeatRequest) { | ||
heartbeatExecutor.scheduleWithFixedDelay(() -> { | ||
try { | ||
// If the health check passes, the heartbeat will be reported. | ||
// If it does not pass, the heartbeat will not be reported. | ||
Map<String, String> headers = new HashMap<>(1); | ||
headers.put(HttpHeaders.USER_AGENT, "polaris"); | ||
if (!OkHttpUtil.checkUrl(heartbeatRequest.getHost(), heartbeatRequest.getPort(), | ||
polarisDiscoveryProperties.getHealthCheckUrl(), headers)) { | ||
LOGGER.error("backend service health check failed. health check endpoint = {}", | ||
polarisDiscoveryProperties.getHealthCheckUrl()); | ||
return; | ||
} | ||
|
||
polarisSDKContextManager.getProviderAPI().heartbeat(heartbeatRequest); | ||
LOGGER.trace("Polaris heartbeat is sent"); | ||
} | ||
catch (PolarisException e) { | ||
LOGGER.error("polaris heartbeat error with code [{}]", e.getCode(), e); | ||
} | ||
catch (Exception e) { | ||
LOGGER.error("polaris heartbeat runtime error", e); | ||
} | ||
}, polarisDiscoveryProperties.getHeartbeatInterval(), polarisDiscoveryProperties.getHeartbeatInterval(), SECONDS); | ||
} | ||
|
||
@Override | ||
public void destroy() { | ||
if (heartbeatExecutor != null) { | ||
heartbeatExecutor.shutdown(); | ||
} | ||
stopHeartbeat(); | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.