From 673f7e70c29392743cebd5998fc06b4d6a83bc7c Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Tue, 25 Mar 2025 21:05:59 +0800 Subject: [PATCH] feat:add fair lock when using multi-discovery. --- .../composite/CompositeServiceUpdateTask.java | 476 ++++++++++-------- 1 file changed, 262 insertions(+), 214 deletions(-) diff --git a/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java b/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java index bfec27b0b..f09c3b97a 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java +++ b/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java @@ -50,6 +50,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_CONSUL; import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_GRPC; @@ -74,6 +75,11 @@ public class CompositeServiceUpdateTask extends ServiceUpdateTask { private final Map subServiceUpdateTaskMap = new ConcurrentHashMap<>(); + /** + * 多注册多发现并发公平锁。 + */ + private final ReentrantLock fairLock = new ReentrantLock(true); + public CompositeServiceUpdateTask(ServiceEventHandler handler, DestroyableServerConnector connector) { super(handler, connector); CompositeConnector compositeConnector = (CompositeConnector) connector; @@ -143,122 +149,133 @@ protected void handle(Throwable throwable) { @Override public boolean notifyServerEvent(ServerEvent serverEvent) { - taskStatus.compareAndSet(Status.RUNNING, Status.READY); - long currentTimeStamp = System.currentTimeMillis(); - lastUpdateTime.set(currentTimeStamp); - LOG.debug("[CompositeServerConnector]task for service {} has been notified", this); - String serverEventConnectorType = serverEvent.getConnectorType(); - ServiceUpdateTask subTask = subServiceUpdateTaskMap.get(serverEventConnectorType); - if (subTask != null) { - subTask.setStatus(Status.RUNNING, Status.READY); - subTask.setLastUpdateTime(currentTimeStamp); - LOG.debug("[CompositeServerConnector]subtask {} for service {} has been notified", serverEventConnectorType, this); - } - boolean shouldTest = false; - if (null == serverEvent.getError()) { - try { - if (serverEvent.getValue() instanceof DiscoverResponse) { - DiscoverResponse discoverResponse = (DiscoverResponse) serverEvent.getValue(); - DiscoverResponse.Builder newDiscoverResponseBuilder = DiscoverResponse.newBuilder() - .mergeFrom(discoverResponse); - CompositeConnector connector = (CompositeConnector) serverConnector; + fairLock.lock(); + try { + taskStatus.compareAndSet(Status.RUNNING, Status.READY); + long currentTimeStamp = System.currentTimeMillis(); + lastUpdateTime.set(currentTimeStamp); + LOG.debug("[CompositeServerConnector]task for service {} has been notified", this); + String serverEventConnectorType = serverEvent.getConnectorType(); + ServiceUpdateTask subTask = subServiceUpdateTaskMap.get(serverEventConnectorType); + if (subTask != null) { + subTask.setStatus(Status.RUNNING, Status.READY); + subTask.setLastUpdateTime(currentTimeStamp); + LOG.debug("[CompositeServerConnector]subtask {} for service {} has been notified", serverEventConnectorType, this); + } + boolean shouldTest = false; + if (null == serverEvent.getError()) { + try { + if (serverEvent.getValue() instanceof DiscoverResponse) { + DiscoverResponse discoverResponse = (DiscoverResponse) serverEvent.getValue(); + DiscoverResponse.Builder newDiscoverResponseBuilder = DiscoverResponse.newBuilder() + .mergeFrom(discoverResponse); + CompositeConnector connector = (CompositeConnector) serverConnector; - if (EventType.INSTANCE.equals(serviceEventKey.getEventType())) { - // load current instance map split by connector type. - CompositeRevision compositeRevision = new CompositeRevision(); - Object value = getEventHandler().getValue(); - Map> instancesMap = new HashMap<>(); - if (taskType.get() == ServiceUpdateTaskConstant.Type.LONG_RUNNING && value instanceof ServiceInstancesByProto) { - ServiceInstancesByProto cacheValue = (ServiceInstancesByProto) value; - compositeRevision = CompositeRevision.of(cacheValue.getRevision()); - List oldInstancesList = cacheValue.getOriginInstancesList(); - for (Instance oldInstance : oldInstancesList) { - String serverConnectorType = oldInstance.getMetadataOrDefault(SERVER_CONNECTOR_TYPE, SERVER_CONNECTOR_GRPC); - if (!instancesMap.containsKey(serverConnectorType)) { - instancesMap.put(serverConnectorType, new ArrayList<>()); + if (EventType.INSTANCE.equals(serviceEventKey.getEventType())) { + // load current instance map split by connector type. + CompositeRevision compositeRevision = new CompositeRevision(); + Object value = getEventHandler().getValue(); + Map> instancesMap = new HashMap<>(); + if (taskType.get() == ServiceUpdateTaskConstant.Type.LONG_RUNNING && value instanceof ServiceInstancesByProto) { + ServiceInstancesByProto cacheValue = (ServiceInstancesByProto) value; + compositeRevision = CompositeRevision.of(cacheValue.getRevision()); + List oldInstancesList = cacheValue.getOriginInstancesList(); + for (Instance oldInstance : oldInstancesList) { + String serverConnectorType = oldInstance.getMetadataOrDefault(SERVER_CONNECTOR_TYPE, SERVER_CONNECTOR_GRPC); + if (!instancesMap.containsKey(serverConnectorType)) { + instancesMap.put(serverConnectorType, new ArrayList<>()); + } + instancesMap.get(serverConnectorType).add(oldInstance); } - instancesMap.get(serverConnectorType).add(oldInstance); } - } - // 按照事件来源更新对应的revision - compositeRevision.setRevision(serverEventConnectorType, discoverResponse.getService().getRevision().getValue()); - // 按照事件来源更新对应的列表 - List serverEventInstancesList = instancesMap.computeIfAbsent(serverEventConnectorType, key -> new ArrayList<>()); - serverEventInstancesList.clear(); - serverEventInstancesList.addAll(discoverResponse.getInstancesList()); - // 由于合并多个发现结果会修改版本号,所以将 polaris 的版本号保存一份 - if (StringUtils.equals(serverEvent.getConnectorType(), SERVER_CONNECTOR_GRPC)) { - serverEvent.setPolarisRevision(discoverResponse.getService().getRevision().getValue()); - } + // 按照事件来源更新对应的revision + compositeRevision.setRevision(serverEventConnectorType, discoverResponse.getService().getRevision().getValue()); + List serverEventInstancesList = instancesMap.computeIfAbsent(serverEventConnectorType, key -> new ArrayList<>()); + if (LOG.isDebugEnabled()) { + String oldInstancesMapStr = convertToString(instancesMap); + LOG.debug("old instances map value: {}", oldInstancesMapStr); + } + if (newDiscoverResponseBuilder.getCode().getValue() != ServerCodes.DATA_NO_CHANGE) { + // 按照事件来源更新对应的列表 + serverEventInstancesList.clear(); + serverEventInstancesList.addAll(discoverResponse.getInstancesList()); + } + if (LOG.isDebugEnabled()) { + String newInstancesMapStr = convertToString(instancesMap); + LOG.debug("new instances map value: {}", newInstancesMapStr); + } + // 由于合并多个发现结果会修改版本号,所以将 polaris 的版本号保存一份 + if (StringUtils.equals(serverEvent.getConnectorType(), SERVER_CONNECTOR_GRPC)) { + serverEvent.setPolarisRevision(discoverResponse.getService().getRevision().getValue()); + } - // Get instance information list except polaris. - for (DestroyableServerConnector sc : connector.getServerConnectors()) { - if (!SERVER_CONNECTOR_GRPC.equals(sc.getName()) && sc.isDiscoveryEnable()) { - ServiceInstancesResponse serviceInstancesResponse = sc.syncGetServiceInstances(this); - if (serviceInstancesResponse != null) { - compositeRevision.setRevision(sc.getName(), serviceInstancesResponse.getRevision()); - List tempServiceInstanceList = serviceInstancesResponse.getServiceInstanceList(); - if (CollectionUtils.isNotEmpty(tempServiceInstanceList)) { - // 将 NO_CHANGE 响应转为 SUCCESS 响应,用于多个发现结果的合并 - newDiscoverResponseBuilder - .setCode(UInt32Value.newBuilder().setValue(ServerCodes.EXECUTE_SUCCESS).build()); - } - List tempServerEventInstancesList = instancesMap.computeIfAbsent(sc.getName(), key -> new ArrayList<>()); - tempServerEventInstancesList.clear(); - for (DefaultInstance e : tempServiceInstanceList) { - Instance.Builder instanceBuilder = Instance.newBuilder() - .setNamespace(StringValue.of(serviceEventKey.getNamespace())) - .setService(StringValue.of(e.getService())) - .setHost(StringValue.of(e.getHost())) - .setPort(UInt32Value.of(e.getPort())) - .setHealthy(BoolValue.of(e.isHealthy())) - .setIsolate(BoolValue.of(e.isIsolated())); - // set Id - if (StringUtils.isNotBlank(e.getId())) { - instanceBuilder.setId(StringValue.of(e.getId())); - } else { - String id = - e.getService() + "-" + e.getHost().replace(".", "-") + "-" + e.getPort(); - instanceBuilder.setId(StringValue.of(id)); - LOG.info("Instance with name {} host {} port {} doesn't have id.", e.getService() - , e.getHost(), e.getPort()); - } - // set location - ModelProto.Location.Builder locationBuilder = ModelProto.Location.newBuilder(); - if (StringUtils.isNotBlank(e.getRegion())) { - locationBuilder.setRegion(StringValue.of(e.getRegion())); - } - if (StringUtils.isNotBlank(e.getZone())) { - locationBuilder.setZone(StringValue.of(e.getZone())); - } - if (StringUtils.isNotBlank(e.getCampus())) { - locationBuilder.setCampus(StringValue.of(e.getCampus())); - } - instanceBuilder.setLocation(locationBuilder.build()); - // set metadata - if (CollectionUtils.isNotEmpty(e.getMetadata())) { - instanceBuilder.putAllMetadata(e.getMetadata()); + // Get instance information list except polaris. + for (DestroyableServerConnector sc : connector.getServerConnectors()) { + if (!SERVER_CONNECTOR_GRPC.equals(sc.getName()) && sc.isDiscoveryEnable()) { + ServiceInstancesResponse serviceInstancesResponse = sc.syncGetServiceInstances(this); + if (serviceInstancesResponse != null) { + compositeRevision.setRevision(sc.getName(), serviceInstancesResponse.getRevision()); + List tempServiceInstanceList = serviceInstancesResponse.getServiceInstanceList(); + if (CollectionUtils.isNotEmpty(tempServiceInstanceList)) { + // 将 NO_CHANGE 响应转为 SUCCESS 响应,用于多个发现结果的合并 + newDiscoverResponseBuilder + .setCode(UInt32Value.newBuilder().setValue(ServerCodes.EXECUTE_SUCCESS).build()); } - // set Protocol - if (StringUtils.isNotBlank(e.getProtocol())) { - instanceBuilder.setProtocol(StringValue.of(e.getProtocol())); - } - // set Version - if (StringUtils.isNotBlank(e.getVersion())) { - instanceBuilder.setVersion(StringValue.of(e.getVersion())); + List tempServerEventInstancesList = instancesMap.computeIfAbsent(sc.getName(), key -> new ArrayList<>()); + tempServerEventInstancesList.clear(); + for (DefaultInstance e : tempServiceInstanceList) { + Instance.Builder instanceBuilder = Instance.newBuilder() + .setNamespace(StringValue.of(serviceEventKey.getNamespace())) + .setService(StringValue.of(e.getService())) + .setHost(StringValue.of(e.getHost())) + .setPort(UInt32Value.of(e.getPort())) + .setHealthy(BoolValue.of(e.isHealthy())) + .setIsolate(BoolValue.of(e.isIsolated())); + // set Id + if (StringUtils.isNotBlank(e.getId())) { + instanceBuilder.setId(StringValue.of(e.getId())); + } else { + String id = + e.getService() + "-" + e.getHost().replace(".", "-") + "-" + e.getPort(); + instanceBuilder.setId(StringValue.of(id)); + LOG.info("Instance with name {} host {} port {} doesn't have id.", e.getService() + , e.getHost(), e.getPort()); + } + // set location + ModelProto.Location.Builder locationBuilder = ModelProto.Location.newBuilder(); + if (StringUtils.isNotBlank(e.getRegion())) { + locationBuilder.setRegion(StringValue.of(e.getRegion())); + } + if (StringUtils.isNotBlank(e.getZone())) { + locationBuilder.setZone(StringValue.of(e.getZone())); + } + if (StringUtils.isNotBlank(e.getCampus())) { + locationBuilder.setCampus(StringValue.of(e.getCampus())); + } + instanceBuilder.setLocation(locationBuilder.build()); + // set metadata + if (CollectionUtils.isNotEmpty(e.getMetadata())) { + instanceBuilder.putAllMetadata(e.getMetadata()); + } + // set Protocol + if (StringUtils.isNotBlank(e.getProtocol())) { + instanceBuilder.setProtocol(StringValue.of(e.getProtocol())); + } + // set Version + if (StringUtils.isNotBlank(e.getVersion())) { + instanceBuilder.setVersion(StringValue.of(e.getVersion())); + } + tempServerEventInstancesList.add(instanceBuilder.build()); } - tempServerEventInstancesList.add(instanceBuilder.build()); } } } - } - // Merge instance information list if needed. - newDiscoverResponseBuilder.clearInstances(); - List finalInstanceList = new ArrayList<>(instancesMap.get(serverEventConnectorType)); - for (String type : ORDER_LIST) { - if (!StringUtils.equals(type, serverEventConnectorType)) { + // Merge instance information list if needed. + newDiscoverResponseBuilder.clearInstances(); + List finalInstanceList = new ArrayList<>(); + for (String type : ORDER_LIST) { List instances = instancesMap.get(type); if (CollectionUtils.isNotEmpty(instances)) { for (Instance newInstance : instances) { @@ -276,138 +293,146 @@ public boolean notifyServerEvent(ServerEvent serverEvent) { } } } - } - Service.Builder newServiceBuilder = Service.newBuilder() - .mergeFrom(newDiscoverResponseBuilder.getService()); - if (newDiscoverResponseBuilder.getService() != null) { - if (StringUtils.isBlank(newDiscoverResponseBuilder.getService().getNamespace().getValue())) { - newServiceBuilder.setNamespace(StringValue.of(serviceEventKey.getNamespace())); + if (LOG.isDebugEnabled()) { + String finalInstanceListStr = convertToString(finalInstanceList); + LOG.debug("final instance list value: {}", finalInstanceListStr); } - if (StringUtils.isBlank(newDiscoverResponseBuilder.getService().getName().getValue())) { - newServiceBuilder.setName(StringValue.of(serviceEventKey.getService())); + Service.Builder newServiceBuilder = Service.newBuilder() + .mergeFrom(newDiscoverResponseBuilder.getService()); + if (newDiscoverResponseBuilder.getService() != null) { + if (StringUtils.isBlank(newDiscoverResponseBuilder.getService().getNamespace().getValue())) { + newServiceBuilder.setNamespace(StringValue.of(serviceEventKey.getNamespace())); + } + if (StringUtils.isBlank(newDiscoverResponseBuilder.getService().getName().getValue())) { + newServiceBuilder.setName(StringValue.of(serviceEventKey.getService())); + } } - } - newServiceBuilder.setRevision(StringValue.of(compositeRevision.getCompositeRevisionString())); - newDiscoverResponseBuilder.setService(newServiceBuilder.build()); - newDiscoverResponseBuilder.addAllInstances(finalInstanceList); + newServiceBuilder.setRevision(StringValue.of(compositeRevision.getCompositeRevisionString())); + newDiscoverResponseBuilder.setService(newServiceBuilder.build()); + newDiscoverResponseBuilder.addAllInstances(finalInstanceList); - // zero instance protect. - if (!newDiscoverResponseBuilder.getInstancesList().isEmpty()) { - serverEvent.setError(null); - } else if (newDiscoverResponseBuilder.getCode().getValue() != ServerCodes.DATA_NO_CHANGE && connector.isZeroProtectionEnabled()) { - if (value instanceof ServiceInstancesByProto) { - ServiceInstancesByProto cacheValue = (ServiceInstancesByProto) value; - newDiscoverResponseBuilder.setCode(UInt32Value.of(ServerCodes.DATA_NO_CHANGE)); - newServiceBuilder = Service.newBuilder() - .mergeFrom(newDiscoverResponseBuilder.getService()); - newServiceBuilder.setRevision(StringValue.of(cacheValue.getRevision())); - newDiscoverResponseBuilder.setService(newServiceBuilder.build()); - newDiscoverResponseBuilder.clearInstances(); - newDiscoverResponseBuilder.addAllInstances(cacheValue.getOriginInstancesList()); - if (CollectionUtils.isNotEmpty(cacheValue.getOriginInstancesList())) { - shouldTest = true; - } + // zero instance protect. + if (!newDiscoverResponseBuilder.getInstancesList().isEmpty()) { serverEvent.setError(null); + } else if (newDiscoverResponseBuilder.getCode().getValue() != ServerCodes.DATA_NO_CHANGE && connector.isZeroProtectionEnabled()) { + if (value instanceof ServiceInstancesByProto) { + ServiceInstancesByProto cacheValue = (ServiceInstancesByProto) value; + newDiscoverResponseBuilder.setCode(UInt32Value.of(ServerCodes.DATA_NO_CHANGE)); + newServiceBuilder = Service.newBuilder() + .mergeFrom(newDiscoverResponseBuilder.getService()); + newServiceBuilder.setRevision(StringValue.of(cacheValue.getRevision())); + newDiscoverResponseBuilder.setService(newServiceBuilder.build()); + newDiscoverResponseBuilder.clearInstances(); + newDiscoverResponseBuilder.addAllInstances(cacheValue.getOriginInstancesList()); + if (CollectionUtils.isNotEmpty(cacheValue.getOriginInstancesList())) { + shouldTest = true; + } + serverEvent.setError(null); + } } - } - instanceListMeta.setLastRevision(newDiscoverResponseBuilder.getService().getRevision().getValue()); - } else if (EventType.SERVICE.equals(serviceEventKey.getEventType())) { - // load current instance map split by connector type. - CompositeRevision compositeRevision = new CompositeRevision(); - Object value = getEventHandler().getValue(); - Map> servicesMap = new HashMap<>(); - if (taskType.get() == ServiceUpdateTaskConstant.Type.LONG_RUNNING && value instanceof ServicesByProto) { - ServicesByProto cacheValue = (ServicesByProto) value; - compositeRevision = CompositeRevision.of(cacheValue.getRevision()); - List oldInstancesList = cacheValue.getOriginServicesList(); - for (Service oldService : oldInstancesList) { - String serverConnectorType = oldService.getMetadataOrDefault(SERVER_CONNECTOR_TYPE, SERVER_CONNECTOR_GRPC); - if (!servicesMap.containsKey(serverConnectorType)) { - servicesMap.put(serverConnectorType, new ArrayList<>()); + instanceListMeta.setLastRevision(newDiscoverResponseBuilder.getService().getRevision().getValue()); + } else if (EventType.SERVICE.equals(serviceEventKey.getEventType())) { + // load current instance map split by connector type. + CompositeRevision compositeRevision = new CompositeRevision(); + Object value = getEventHandler().getValue(); + Map> servicesMap = new HashMap<>(); + if (taskType.get() == ServiceUpdateTaskConstant.Type.LONG_RUNNING && value instanceof ServicesByProto) { + ServicesByProto cacheValue = (ServicesByProto) value; + compositeRevision = CompositeRevision.of(cacheValue.getRevision()); + List oldInstancesList = cacheValue.getOriginServicesList(); + for (Service oldService : oldInstancesList) { + String serverConnectorType = oldService.getMetadataOrDefault(SERVER_CONNECTOR_TYPE, SERVER_CONNECTOR_GRPC); + if (!servicesMap.containsKey(serverConnectorType)) { + servicesMap.put(serverConnectorType, new ArrayList<>()); + } + servicesMap.get(serverConnectorType).add(oldService); } - servicesMap.get(serverConnectorType).add(oldService); } - } - // 按照事件来源更新对应的revision - compositeRevision.setRevision(serverEventConnectorType, discoverResponse.getService().getRevision().getValue()); - // 按照事件来源更新对应的列表 - List serverEventServicesList = servicesMap.computeIfAbsent(serverEventConnectorType, key -> new ArrayList<>()); - serverEventServicesList.clear(); - serverEventServicesList.addAll(discoverResponse.getServicesList()); - // 由于合并多个发现结果会修改版本号,所以将 polaris 的版本号保存一份 - if (StringUtils.equals(serverEvent.getConnectorType(), SERVER_CONNECTOR_GRPC)) { - serverEvent.setPolarisRevision(discoverResponse.getService().getRevision().getValue()); - } + // 按照事件来源更新对应的revision + compositeRevision.setRevision(serverEventConnectorType, discoverResponse.getService().getRevision().getValue()); + // 按照事件来源更新对应的列表 + List serverEventServicesList = servicesMap.computeIfAbsent(serverEventConnectorType, key -> new ArrayList<>()); + if (discoverResponse.getCode().getValue() == ServerCodes.DATA_NO_CHANGE) { + serverEventServicesList.clear(); + serverEventServicesList.addAll(discoverResponse.getServicesList()); + } + // 由于合并多个发现结果会修改版本号,所以将 polaris 的版本号保存一份 + if (StringUtils.equals(serverEvent.getConnectorType(), SERVER_CONNECTOR_GRPC)) { + serverEvent.setPolarisRevision(discoverResponse.getService().getRevision().getValue()); + } - // Get service information list except polaris. - for (DestroyableServerConnector sc : connector.getServerConnectors()) { - if (!SERVER_CONNECTOR_GRPC.equals(sc.getName()) && sc.isDiscoveryEnable()) { - Services services = sc.syncGetServices(this); - if (services != null) { - compositeRevision.setRevision(sc.getName(), services.getRevision()); - List tempServiceList = services.getServices(); - List tempServerEventServicesList = servicesMap.computeIfAbsent(sc.getName(), key -> new ArrayList<>()); - tempServerEventServicesList.clear(); - for (ServiceInfo serviceInfo : tempServiceList) { - Service service = Service.newBuilder() - .setNamespace(StringValue.of(serviceEventKey.getNamespace())) - .setName(StringValue.of(serviceInfo.getService())) - .build(); - tempServerEventServicesList.add(service); + // Get service information list except polaris. + for (DestroyableServerConnector sc : connector.getServerConnectors()) { + if (!SERVER_CONNECTOR_GRPC.equals(sc.getName()) && sc.isDiscoveryEnable()) { + Services services = sc.syncGetServices(this); + if (services != null) { + compositeRevision.setRevision(sc.getName(), services.getRevision()); + List tempServiceList = services.getServices(); + List tempServerEventServicesList = servicesMap.computeIfAbsent(sc.getName(), key -> new ArrayList<>()); + tempServerEventServicesList.clear(); + for (ServiceInfo serviceInfo : tempServiceList) { + Service service = Service.newBuilder() + .setNamespace(StringValue.of(serviceEventKey.getNamespace())) + .setName(StringValue.of(serviceInfo.getService())) + .build(); + tempServerEventServicesList.add(service); + } } } } - } - // Merge service information list if needed. - newDiscoverResponseBuilder.clearServices(); - List finalServiceList = new ArrayList<>(servicesMap.get(serverEventConnectorType)); - for (String type : ORDER_LIST) { - if (!StringUtils.equals(type, serverEventConnectorType)) { - List services = servicesMap.get(type); - if (CollectionUtils.isNotEmpty(services)) { - for (Service newService : services) { - boolean needAdd = true; - for (Service existService : finalServiceList) { - if (StringUtils.equals(newService.getName().getValue(), existService.getName().getValue())) { - needAdd = false; - break; + // Merge service information list if needed. + newDiscoverResponseBuilder.clearServices(); + List finalServiceList = new ArrayList<>(servicesMap.get(serverEventConnectorType)); + for (String type : ORDER_LIST) { + if (!StringUtils.equals(type, serverEventConnectorType)) { + List services = servicesMap.get(type); + if (CollectionUtils.isNotEmpty(services)) { + for (Service newService : services) { + boolean needAdd = true; + for (Service existService : finalServiceList) { + if (StringUtils.equals(newService.getName().getValue(), existService.getName().getValue())) { + needAdd = false; + break; + } + } + if (needAdd) { + finalServiceList.add(newService); } - } - if (needAdd) { - finalServiceList.add(newService); } } + newDiscoverResponseBuilder.addAllServices(finalServiceList); } - newDiscoverResponseBuilder.addAllServices(finalServiceList); + } + if (!newDiscoverResponseBuilder.getServicesList().isEmpty()) { + serverEvent.setError(null); } } - if (!newDiscoverResponseBuilder.getServicesList().isEmpty()) { - serverEvent.setError(null); + DiscoverResponse response = newDiscoverResponseBuilder.build(); + if (EventType.INSTANCE.equals(serviceEventKey.getEventType()) && shouldTest) { + connector.submitTestConnectivityTask(this, response); } + serverEvent.setValue(response); } - DiscoverResponse response = newDiscoverResponseBuilder.build(); - if (EventType.INSTANCE.equals(serviceEventKey.getEventType()) && shouldTest) { - connector.submitTestConnectivityTask(this, response); - } - serverEvent.setValue(response); + } catch (PolarisException e) { + LOG.error("Merge other server response failed.", e); + serverEvent.setError(e); + } catch (Throwable throwable) { + LOG.error("Merge other server response failed.", throwable); + serverEvent.setError(new PolarisException(ErrorCode.INTERNAL_ERROR)); } - } catch (PolarisException e) { - LOG.error("Merge other server response failed.", e); - serverEvent.setError(e); - } catch (Throwable throwable) { - LOG.error("Merge other server response failed.", throwable); - serverEvent.setError(new PolarisException(ErrorCode.INTERNAL_ERROR)); } + if (null == serverEvent.getError()) { + successUpdates.addAndGet(1); + } + boolean svcDeleted = getEventHandler().onEventUpdate(serverEvent); + if (!svcDeleted && subTask != null) { + subTask.setType(ServiceUpdateTaskConstant.Type.FIRST, ServiceUpdateTaskConstant.Type.LONG_RUNNING); + } + return svcDeleted; + } finally { + fairLock.unlock(); } - if (null == serverEvent.getError()) { - successUpdates.addAndGet(1); - } - boolean svcDeleted = getEventHandler().onEventUpdate(serverEvent); - if (!svcDeleted && subTask != null) { - subTask.setType(ServiceUpdateTaskConstant.Type.FIRST, ServiceUpdateTaskConstant.Type.LONG_RUNNING); - } - return svcDeleted; } public boolean setStatus(Status last, Status current, boolean isSpread) { @@ -436,4 +461,27 @@ public boolean notifyServerEventWithRevisionChecking(ServerEvent serverEvent, St } return false; } + + private String convertToString(List list) { + StringBuilder stringBuilder = new StringBuilder("----------------------\n"); + for (Instance instance : list) { + stringBuilder.append(instance.toString()).append("\n"); + } + stringBuilder.append("----------------------\n"); + return stringBuilder.toString(); + } + + private String convertToString(Map> map) { + StringBuilder stringBuilder = new StringBuilder(); + for (Map.Entry> entry : map.entrySet()) { + List instances = entry.getValue(); + StringBuilder line = new StringBuilder("-----------" + entry.getKey() + "-----------\n"); + for (Instance instance : instances) { + line.append(instance.toString()).append("\n"); + } + stringBuilder.append(line).append("\n"); + } + stringBuilder.append("----------------------\n"); + return stringBuilder.toString(); + } }