polaris-java-agent design #116
Configuration design
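
Spring Cloud 2021 plugin design

Agent parameter initialization

The agent needs a number of parameters for all of its later operations. They come from two sources: parameters injected by the user at startup, and Spring startup parameters.

Parameters injected by the user at startup

-Dpolaris.namespace=default -Dpolaris.server.address=host:8091

Parameters obtained from the Spring context

The interceptor reads server.port, spring.application.name and spring.cloud.client.ip-address from the Spring Environment.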
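
Intercepting Spring startup to obtain the parameters

The Spring web application parameters above are all stored in the WebApplicationContext during Spring startup, so the interception runs once the WebApplicationContext has been set up. The interception point is the refreshContext method of the SpringApplication class, which is called after the current Spring context has been created and prepared:

public ConfigurableApplicationContext run(String... args) {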
long startTime = System.nanoTime();
DefaultBootstrapContext bootstrapContext = createBootstrapContext();
ConfigurableApplicationContext context = null;
configureHeadlessProperty();
SpringApplicationRunListeners listeners = getRunListeners(args);
listeners.starting(bootstrapContext, this.mainApplicationClass);
try {
ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
ConfigurableEnvironment environment = prepareEnvironment(listeners, bootstrapContext, applicationArguments);
configureIgnoreBeanInfo(environment);
Banner printedBanner = printBanner(environment);
context = createApplicationContext();
context.setApplicationStartup(this.applicationStartup);
prepareContext(bootstrapContext, context, environment, listeners, applicationArguments, printedBanner);
// Interception point: the agent hooks refreshContext here
refreshContext(context);
afterRefresh(context, applicationArguments);
Duration timeTakenToStartup = Duration.ofNanos(System.nanoTime() - startTime);
if (this.logStartupInfo) {
new StartupInfoLogger(this.mainApplicationClass).logStarted(getApplicationLog(), timeTakenToStartup);
}
listeners.started(context, timeTakenToStartup);
callRunners(context, applicationArguments);
}
catch (Throwable ex) {
handleRunFailure(context, ex, listeners);
throw new IllegalStateException(ex);
}
try {
Duration timeTakenToReady = Duration.ofNanos(System.nanoTime() - startTime);
listeners.ready(context, timeTakenToReady);
}
catch (Throwable ex) {
handleRunFailure(context, ex, null);
throw new IllegalStateException(ex);
}
return context;
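}

Interception logic

refreshContext takes a single context argument, which is the WebApplicationContext we need. The interceptor obtains the Spring Environment from it and reads the required properties by the keys configured in application.yml.

Note that every Spring context goes through this path, so the interceptor must check whether the argument is an instance of GenericReactiveWebApplicationContext or GenericWebApplicationContext.

Once the agent parameters have been initialized, they are stored in a PolarisAgentProperties object for use throughout the agent's lifecycle.

@Override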
public void beforeInterceptor(Object target, Object[] args, PolarisAgentProperties agentProperties) {
// check if servlet applicationContext or reactive applicationContext
Object configurableContext = args[0];
if (configurableContext instanceof GenericWebApplicationContext || configurableContext instanceof GenericReactiveWebApplicationContext) {
// log
LogUtils.logTargetFound(target);
// convert to ApplicationContext; in practice an AnnotationConfigServletWebServerApplicationContext or AnnotationConfigReactiveWebServerApplicationContext
ApplicationContext applicationContext = (ApplicationContext) configurableContext;
// reserve application context for agent
SpringContextFactory.setApplicationContext(applicationContext);
// get basic info from applicationContext
port = applicationContext.getEnvironment().getProperty("server.port");
service = applicationContext.getEnvironment().getProperty("spring.application.name");
host = applicationContext.getEnvironment().getProperty("spring.cloud.client.ip-address");
Assert.notNull(port, "the server port can't be null, please check your server config");
Assert.notNull(service, "the application name can't be null, please check your spring config");
logger.info("Polaris service is set with port: {}", port);
logger.info("Polaris service is set with service: {}", service);
logger.info("Polaris service is set with host: {}", host);
// get init info from system
String host = HostUtils.getHost();
String namespace = System.getProperty("polaris.namespace");
String serverAddress = System.getProperty("polaris.server.address");
Assert.notNull(serverAddress, "the polaris server address can't be null, please check your polaris agent parameter");
if (StringUtils.isEmpty(namespace)) {
namespace = "default";
logger.warn("the input namespace is empty, use default instead");
}
// init polaris config and reserve
PolarisAgentProperties polarisAgentProperties = new PolarisAgentProperties();
polarisAgentProperties.setHost(host);
polarisAgentProperties.setPort(Integer.valueOf(port));
polarisAgentProperties.setProtocol("grpc");
polarisAgentProperties.setNamespace(namespace);
polarisAgentProperties.setService(service);
polarisAgentProperties.setServerAddress(serverAddress);
PolarisAgentPropertiesFactory.setPolarisAgentProperties(polarisAgentProperties);
// init polarisContext and api
PolarisContext polarisContext = new PolarisContext(polarisAgentProperties);
PolarisAPIFactory.init(polarisContext);
}
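}

Service registration

The Spring Cloud service registration specification abstracts a service instance as the Registration interface and the registration operation as the ServiceRegistry<Registration> interface.

Overall flow

Registration interface

Registration carries the ServiceId, Host and Port. It extends ServiceInstance, which declares these accessors:

public interface ServiceInstance {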
default String getInstanceId() {
return null;
}
String getServiceId();
String getHost();
int getPort();
boolean isSecure();
URI getUri();
Map<String, String> getMetadata();
default String getScheme() {
return null;
}
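}

ServiceRegistry interface

In the Spring Cloud specification, ServiceRegistry is the type that performs service registration. It declares the register, deregister, status and close operations:

public interface ServiceRegistry<R extends Registration> {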
/**
* Registers the registration. A registration typically has information about an
* instance, such as its hostname and port.
* @param registration registration meta data
*/
void register(R registration);
/**
* Deregisters the registration.
* @param registration registration meta data
*/
void deregister(R registration);
/**
* Closes the ServiceRegistry. This is a lifecycle method.
*/
void close();
/**
* Sets the status of the registration. The status values are determined by the
* individual implementations.
* @param registration The registration to update.
* @param status The status to set.
* @see org.springframework.cloud.client.serviceregistry.endpoint.ServiceRegistryEndpoint
*/
void setStatus(R registration, String status);
/**
* Gets the status of a particular registration.
* @param registration The registration to query.
* @param <T> The type of the status.
* @return The status of the registration.
* @see org.springframework.cloud.client.serviceregistry.endpoint.ServiceRegistryEndpoint
*/
<T> T getStatus(R registration);
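}

[Figure: Polaris service registration sequence diagram]

Initializing the SDKContext

The Configuration object is built through the Polaris API. Starting from the default configuration, it binds the IP chosen by Spring Cloud so that later service registration can use it:

private static Configuration configuration(PolarisAgentProperties polarisAgentProperties) {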
ConfigurationImpl configuration = (ConfigurationImpl) ConfigAPIFactory
.defaultConfig(ConfigProvider.DEFAULT_CONFIG);
configuration.setDefault();
configuration.getGlobal().getAPI().setBindIP(PolarisServiceConstants.host);
configuration.getGlobal().getServerConnector().setAddresses(Collections.singletonList(polarisAgentProperties.getServerAddress()));
return configuration;
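}

// Initialize the SDKContext
SDKContext.initContextByConfig(configuration(polarisAgentProperties));

Initializing plugins (the default configuration can be extended)

The plugins Polaris needs are initialized (optionally) either from the agent parameters passed in by the user, which must include the server address used to initialize the ServerConnector, or through the default SPI mechanism. They include ServerConnector, Router, HealthChecker, LoadBalancer, RateLimiter and CircuitBreaker.

Configuration.Extensions.init()

Initializing ConsumerAPI and ProviderAPI

The SDKContext is used to initialize the two Polaris APIs for service discovery and service registration:

public static void init(PolarisContext polarisContext) {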
CONSUMER_API = DiscoveryAPIFactory.createConsumerAPIByContext(polarisContext.getSdkContext());
PROVIDER_API = DiscoveryAPIFactory.createProviderAPIByContext(polarisContext.getSdkContext());
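}

Agent interception logic

Interception point

In the original logic, a WebServerInitializedEvent is published once the WebServer has started, and a listener for that event triggers the service registration flow. The agent can instead apply a post-interception at the moment the WebServer finishes starting, that is, when the ApplicationContext completes its refresh.

The interception is placed on the finishRefresh method of AbstractApplicationContext. Every Spring context goes through this path as well, so the interceptor must check whether the target is an instance of GenericReactiveWebApplicationContext or GenericWebApplicationContext:

public void refresh() throws BeansException, IllegalStateException {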
synchronized(this.startupShutdownMonitor) {
this.prepareRefresh();
ConfigurableListableBeanFactory beanFactory = this.obtainFreshBeanFactory();
this.prepareBeanFactory(beanFactory);
try {
this.postProcessBeanFactory(beanFactory);
this.invokeBeanFactoryPostProcessors(beanFactory);
this.registerBeanPostProcessors(beanFactory);
this.initMessageSource();
this.initApplicationEventMulticaster();
this.onRefresh();
this.registerListeners();
this.finishBeanFactoryInitialization(beanFactory);
this.finishRefresh();
// Post-interception point: runs once finishRefresh() has returned
} catch (BeansException var9) {
if (this.logger.isWarnEnabled()) {
this.logger.warn("Exception encountered during context initialization - cancelling refresh attempt: " + var9);
}
this.destroyBeans();
this.cancelRefresh(var9);
throw var9;
} finally {
this.resetCommonCaches();
}
}
}

Interception code

Interception check:

@Override
public void after(Object target, Object[] args, Object result, Throwable throwable) {
if (target instanceof GenericWebApplicationContext || target instanceof GenericReactiveWebApplicationContext) {
LogUtils.logTargetFound(target);
this.afterInterceptor(target, args, result, throwable, PolarisAgentPropertiesFactory.getPolarisAgentProperties());
}
}
@Override
public void afterInterceptor(Object target, Object[] args, Object result, Throwable throwable, PolarisAgentProperties polarisAgentProperties) {
AfterPolarisInterceptor polarisInterceptor = new PolarisRegistryPolarisInterceptor();
polarisInterceptor.afterInterceptor(target, args, result, throwable, polarisAgentProperties);
}

Executing the register logic:

@Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for polaris client...");
return;
}
// Register the instance
InstanceRegisterRequest instanceRegisterRequest = new InstanceRegisterRequest();
instanceRegisterRequest.setNamespace(polarisContext.getPolarisContextAgentProperties().getNamespace());
instanceRegisterRequest.setService(registration.getServiceId());
instanceRegisterRequest.setHost(registration.getHost());
instanceRegisterRequest.setPort(registration.getPort());
instanceRegisterRequest.setToken(polarisContext.getPolarisContextAgentProperties().getToken());
if (null != heartbeatExecutor) {
instanceRegisterRequest.setTtl(ttl);
}
// instanceRegisterRequest.setMetadata(metadataLocalProperties.getContent());
instanceRegisterRequest.setProtocol(polarisContext.getPolarisContextAgentProperties().getProtocol());
instanceRegisterRequest.setVersion(polarisContext.getPolarisContextAgentProperties().getVersion());
try {
ProviderAPI providerClient = PolarisAPIFactory.getProviderApi();
providerClient.register(instanceRegisterRequest);
Runtime.getRuntime().addShutdownHook(new Thread(() -> deregister(registration)));
if (null != heartbeatExecutor) {
InstanceHeartbeatRequest heartbeatRequest = new InstanceHeartbeatRequest();
BeanUtils.copyProperties(instanceRegisterRequest, heartbeatRequest);
// Start the heartbeat thread once registration has succeeded
heartbeat(heartbeatRequest);
}
} catch (Exception e) {
log.error("polaris registry, {} register failed...{},", registration.getServiceId(), registration, e);
rethrowRuntimeException(e);
}
}

Service deregistration

Deregistration uses the shutdown hook mechanism provided by Java:

Runtime.getRuntime().addShutdownHook(new Thread(() -> deregister(registration)));

@Override
public void deregister(Registration registration) {
log.info("De-registering from Polaris Server now...");
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to de-register for polaris client...");
return;
}
InstanceDeregisterRequest deRegisterRequest = new InstanceDeregisterRequest();
deRegisterRequest.setToken(polarisContext.getPolarisContextAgentProperties().getToken());
// Use the configured namespace so that deregistration targets the same namespace as registration
deRegisterRequest.setNamespace(polarisContext.getPolarisContextAgentProperties().getNamespace());
deRegisterRequest.setService(registration.getServiceId());
deRegisterRequest.setHost(registration.getHost());
deRegisterRequest.setPort(registration.getPort());
try {
ProviderAPI providerClient = PolarisAPIFactory.getProviderApi();
providerClient.deRegister(deRegisterRequest);
} catch (Exception e) {
log.error("ERR_POLARIS_DEREGISTER, de-register failed...{},", registration, e);
} finally {
if (null != heartbeatExecutor) {
heartbeatExecutor.shutdown();
}
}
log.info("De-registration finished.");
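}

Service discovery

The agent implements the DiscoveryClient interface from the Spring Cloud specification and inserts the implementation into Spring Cloud's list of DiscoveryClient instances.

Overall flow

DiscoveryClient interface

The interface consists mainly of two methods, getInstances and getServices:

public interface DiscoveryClient extends Ordered {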
/**
* Default order of the discovery client.
*/
int DEFAULT_ORDER = 0;
/**
* A human-readable description of the implementation, used in HealthIndicator.
* @return The description.
*/
String description();
/**
* Gets all ServiceInstances associated with a particular serviceId.
* @param serviceId The serviceId to query.
* @return A List of ServiceInstance.
*/
List<ServiceInstance> getInstances(String serviceId);
/**
* @return All known service IDs.
*/
List<String> getServices();
/**
* Default implementation for getting order of discovery clients.
* @return order
*/
@Override
default int getOrder() {
return DEFAULT_ORDER;
}
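}

[Figure: Polaris service discovery sequence diagram]

Interception logic

The agent intercepts the constructor of CompositeDiscoveryClient, clears the original DiscoveryClient list, and then adds PolarisDiscoveryClient so that discovery runs through Polaris:

public class CompositeDiscoveryClient implements DiscoveryClient {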
private final List<DiscoveryClient> discoveryClients;
public CompositeDiscoveryClient(List<DiscoveryClient> discoveryClients) {
AnnotationAwareOrderComparator.sort(discoveryClients);
this.discoveryClients = discoveryClients;
// Post-interception point (<init>):
// replace the list contents with PolarisDiscoveryClient
}
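}

Service discovery implementation

PolarisDiscoveryClient implements the actual calls to Polaris that fetch services and their instances:

/**
* Gets the list of instances after service routing has been applied.
*
* @param service service name
* @return list of service instances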
*/
public InstancesResponse getFilteredInstances(String service) {
String namespace = polarisAgentProperties.getNamespace();
GetInstancesRequest getInstancesRequest = new GetInstancesRequest();
getInstancesRequest.setNamespace(namespace);
getInstancesRequest.setService(service);
return consumerAPI.getInstances(getInstancesRequest);
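}

### Load balancing

Ribbon

Our investigation shows that although newer releases are phasing Ribbon out as the load balancer, Ribbon is still the default load balancer in the Spring Cloud Greenwich (G) and Hoxton (H) release trains, and it is still used if it is introduced in 2020.0 or later.

Ribbon characteristics

In many places Ribbon does not follow the Spring Cloud service registration and discovery specification and defines its own interfaces instead.

The agent therefore implements these interfaces first. When Ribbon is invoked, Polaris's routing policy returns the ServerList, and the configured load-balancing rule then completes the call. The key step is to replace the original load balancer with a new one, shown below, that routes through Polaris's RouterAPI:

public PolarisRoutingLoadBalancer(IClientConfig config, IRule rule, IPing ping,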
ServerList<Server> serverList, RouterAPI routerAPI,
PolarisAgentProperties polarisAgentProperties) {
super(config, rule, ping, serverList, null, new PollingServerListUpdater());
this.routerAPI = routerAPI;
this.polarisAgentProperties = polarisAgentProperties;
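}

Interception point

Ribbon initializes the load balancer in the LoadBalancerContext constructor, so it is enough to point this.lb at the newly created PolarisRoutingLoadBalancer once the constructor has finished:

public LoadBalancerContext(ILoadBalancer lb, IClientConfig clientConfig) {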
this.lb = lb;
initWithNiwsConfig(clientConfig);
// Post-interception point: point this.lb at the new PolarisRoutingLoadBalancer
}
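
As a rough illustration of the post-constructor swap described above, the sketch below uses plain reflection on a stand-in class. `LbSwapSketch`, `ContextStandIn`, `LoadBalancer` and `swapLoadBalancer` are hypothetical names introduced for this example only; they are not Ribbon or Polaris types. A real interceptor would target `LoadBalancerContext` and install the `PolarisRoutingLoadBalancer`, and the agent may well perform the swap differently.

```java
import java.lang.reflect.Field;

final class LbSwapSketch {

    /** Hypothetical minimal contract, standing in for Ribbon's ILoadBalancer. */
    interface LoadBalancer {
        String chooseServer(String service);
    }

    /** Hypothetical stand-in for the intercepted class: it keeps the balancer in a private field named lb. */
    static class ContextStandIn {
        private LoadBalancer lb;

        ContextStandIn(LoadBalancer lb) {
            this.lb = lb;
            // a post-interception would run right after this constructor returns
        }

        String route(String service) {
            return lb.chooseServer(service);
        }
    }

    /** Looks for a field named lb on the target's class or its superclasses and points it at the replacement. */
    static void swapLoadBalancer(Object target, LoadBalancer replacement) {
        for (Class<?> type = target.getClass(); type != null; type = type.getSuperclass()) {
            try {
                Field field = type.getDeclaredField("lb");
                field.setAccessible(true);
                field.set(target, replacement);
                return;
            } catch (NoSuchFieldException ignored) {
                // not declared on this class, keep looking in the superclass
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        throw new IllegalStateException("no field named lb on " + target.getClass());
    }

    public static void main(String[] args) {
        ContextStandIn context = new ContextStandIn(service -> "origin:" + service);
        swapLoadBalancer(context, service -> "polaris:" + service);
        System.out.println(context.route("demo")); // prints "polaris:demo"
    }
}
```

Walking the superclass chain matters because the object reaching the interceptor may be a subclass of the class that declares the field.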
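
Functional architecture
java-agent works as a bytecode agent: it is injected through the Instrumentation mechanism and embedded in the user's Java process, where it intercepts the execution logic of specific frameworks and connects it to the Polaris SDK to provide service governance.
The features to support include Polaris's native service governance capabilities: service registration, deregistration and heartbeat; service discovery and load balancing; dynamic routing; circuit breaking; and rate limiting.
They also include advanced scenario-based capabilities such as graceful shutdown, call-chain tracing and full-link canary release.
Run modes
Two usage modes need to be supported:
Load at JVM startup, using the javaagent startup option: java -javaagent:polaris-java-agent.jar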
public static void premain(String agentArgument, Instrumentation instrumentation) throws Exception
Attach after the JVM has started, loading the agent through the Attach API
public static void agentmain(String agentArgument, Instrumentation instrumentation) throws Exception
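
The two entry points above can share one bootstrap path. The following is a minimal sketch under stated assumptions: `AgentEntrySketch`, `bootstrap`, `loadMode` and the `key=value,key=value` argument format are hypothetical and not taken from polaris-java-agent. For the JVM to find these methods, the agent jar's manifest has to name the class in its `Premain-Class` attribute (startup mode) and `Agent-Class` attribute (attach mode).

```java
import java.lang.instrument.Instrumentation;
import java.util.LinkedHashMap;
import java.util.Map;

final class AgentEntrySketch {

    /** Records which entry point loaded the agent; purely illustrative. */
    static volatile String loadMode;

    /** Invoked by the JVM before the application's main method when started with -javaagent. */
    public static void premain(String agentArgument, Instrumentation instrumentation) {
        bootstrap("premain", agentArgument, instrumentation);
    }

    /** Invoked by the JVM when the agent is attached to an already running process. */
    public static void agentmain(String agentArgument, Instrumentation instrumentation) {
        bootstrap("agentmain", agentArgument, instrumentation);
    }

    private static void bootstrap(String mode, String agentArgument, Instrumentation instrumentation) {
        loadMode = mode;
        Map<String, String> options = parseArguments(agentArgument);
        // A real agent would register its class transformers on the Instrumentation instance here.
        System.out.println("agent loaded via " + mode + " with options " + options);
    }

    /** Parses a hypothetical "key=value,key=value" agent argument string; malformed pairs are skipped. */
    static Map<String, String> parseArguments(String agentArgument) {
        Map<String, String> options = new LinkedHashMap<>();
        if (agentArgument == null || agentArgument.trim().isEmpty()) {
            return options;
        }
        for (String pair : agentArgument.split(",")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                options.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        return options;
    }

    public static void main(String[] args) {
        // Simulates the startup mode without a real Instrumentation instance.
        premain("namespace=default,server=host:8091", null);
    }
}
```

Keeping both entry points as thin wrappers means the interceptors are installed the same way whether the agent is loaded at startup or attached later.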
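Code architecture
polaris-agent-core: the core layer. It contains the core functionality of polaris-agent and connects to the polaris-java SDK. Through plugins, it intercepts and adapts the interfaces of the different frameworks to implement the service governance features.
The core layer is not tied to any particular bytecode-injection framework; it consists purely of interceptor implementations.
polaris-agent-adapter: the adapter layer. It provides the bootstrap capabilities for bytecode injection, implements the interceptor interfaces of the different bytecode-injection frameworks, and bridges them to the plugins in the core layer.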
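
One way to picture the split between the two layers is sketched below. It assumes a framework-neutral `Interceptor` contract in the core layer and an adapter that weaves it around a call. All names here (`LayeringSketch`, `Interceptor`, `adapt`, `Registry`, `RecordingInterceptor`) are hypothetical. A JDK dynamic proxy stands in for the bytecode-injection framework purely so that the example runs on its own; it is not how the adapter layer is implemented.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

final class LayeringSketch {

    /** Core layer: a framework-neutral interceptor contract (hypothetical). */
    interface Interceptor {
        void before(Object target, Object[] args);

        void after(Object target, Object[] args, Object result, Throwable throwable);
    }

    /** Adapter layer stand-in: weaves the interceptor around every method of the given interface. */
    @SuppressWarnings("unchecked")
    static <T> T adapt(Class<T> iface, T target, Interceptor interceptor) {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
                (proxy, method, args) -> {
                    interceptor.before(target, args);
                    Object result = null;
                    Throwable failure = null;
                    try {
                        result = method.invoke(target, args);
                        return result;
                    } catch (InvocationTargetException e) {
                        failure = e.getCause();
                        throw failure;
                    } finally {
                        // runs on both the success path and the failure path
                        interceptor.after(target, args, result, failure);
                    }
                });
    }

    /** Hypothetical framework interface being intercepted. */
    interface Registry {
        String register(String service);
    }

    /** Core-layer plugin that only records what it observed. */
    static final class RecordingInterceptor implements Interceptor {
        final List<String> events = new ArrayList<>();

        @Override
        public void before(Object target, Object[] args) {
            events.add("before " + args[0]);
        }

        @Override
        public void after(Object target, Object[] args, Object result, Throwable throwable) {
            events.add(throwable == null ? "after " + result : "failed " + throwable.getMessage());
        }
    }

    public static void main(String[] args) {
        RecordingInterceptor interceptor = new RecordingInterceptor();
        Registry woven = adapt(Registry.class, service -> "registered:" + service, interceptor);
        System.out.println(woven.register("demo")); // prints "registered:demo"
        System.out.println(interceptor.events);     // prints "[before demo, after registered:demo]"
    }
}
```

Because `RecordingInterceptor` depends only on the `Interceptor` contract, the weaving mechanism could be replaced without touching it.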