From e445a34f8dddcb5fb6547dfd5df4460f8749a3ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20=C3=87elik?= Date: Sat, 5 Oct 2024 01:32:15 +0300 Subject: [PATCH 1/3] Code Cleanup - StringBuilder, Redundant Iteration, Record --- .../KafkaStreamsFunctionProcessor.java | 3 ++- .../KafkaStreamsBindableProxyFactory.java | 7 +++--- .../binding/AbstractBindableProxyFactory.java | 15 ++++++------ .../stream/binding/BoundTargetHolder.java | 23 +++---------------- .../stream/config/BindingProperties.java | 10 ++++---- .../config/BindingServiceConfiguration.java | 2 ++ 6 files changed, 24 insertions(+), 36 deletions(-) diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java index b4926cafdd..b5855040c1 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java @@ -69,6 +69,7 @@ * @author Soby Chacko * @author Byungjun You * @author Georg Friedrich + * @author Omer Celik * @since 2.2.0 */ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderProcessor implements BeanFactoryAware { @@ -475,7 +476,7 @@ private void handleKStreamArrayOutbound(ResolvableType resolvableType, String fu String next = iterator.next(); kafkaStreamsBindableProxyFactory.addOutputBinding(next, KStream.class); RootBeanDefinition rootBeanDefinition1 = new RootBeanDefinition(); - rootBeanDefinition1.setInstanceSupplier(() -> kafkaStreamsBindableProxyFactory.getOutputHolders().get(next).getBoundTarget()); + rootBeanDefinition1.setInstanceSupplier(() -> kafkaStreamsBindableProxyFactory.getOutputHolders().get(next).boundTarget()); registry.registerBeanDefinition(next, rootBeanDefinition1); Object targetBean = this.applicationContext.getBean(next); diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBindableProxyFactory.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBindableProxyFactory.java index dd52863034..f5b4f4d925 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBindableProxyFactory.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBindableProxyFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2023 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -65,6 +65,7 @@ * the actual size in the returned array. That has to wait until the function is invoked and we get a result. * * @author Soby Chacko + * @author Omer Celik * @since 3.0.0 */ public class KafkaStreamsBindableProxyFactory extends AbstractBindableProxyFactory implements InitializingBean, BeanFactoryAware { @@ -173,7 +174,7 @@ public void afterPropertiesSet() { .createOutput(outputBinding), true)); String outputBinding1 = outputBinding; RootBeanDefinition rootBeanDefinition1 = new RootBeanDefinition(); - rootBeanDefinition1.setInstanceSupplier(() -> outputHolders.get(outputBinding1).getBoundTarget()); + rootBeanDefinition1.setInstanceSupplier(() -> outputHolders.get(outputBinding1).boundTarget()); BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory; registry.registerBeanDefinition(outputBinding1, rootBeanDefinition1); } @@ -248,7 +249,7 @@ private void bindInput(ResolvableType arg0, String inputName) { } BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory; RootBeanDefinition rootBeanDefinition = new RootBeanDefinition(); - rootBeanDefinition.setInstanceSupplier(() -> inputHolders.get(inputName).getBoundTarget()); + rootBeanDefinition.setInstanceSupplier(() -> inputHolders.get(inputName).boundTarget()); registry.registerBeanDefinition(inputName, rootBeanDefinition); } diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/AbstractBindableProxyFactory.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/AbstractBindableProxyFactory.java index a64ab7a206..6f3ebc69dc 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/AbstractBindableProxyFactory.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/AbstractBindableProxyFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ * * Original authors in {@link BindableProxyFactory} * @author Soby Chacko + * @author Omer Celik * @since 3.0.0 */ public class AbstractBindableProxyFactory implements Bindable { @@ -94,9 +95,9 @@ public Collection> createAndBindInputs( .entrySet()) { String inputTargetName = boundTargetHolderEntry.getKey(); BoundTargetHolder boundTargetHolder = boundTargetHolderEntry.getValue(); - if (boundTargetHolder.isBindable()) { + if (boundTargetHolder.bindable()) { bindings.addAll(bindingService.bindConsumer( - boundTargetHolder.getBoundTarget(), inputTargetName)); + boundTargetHolder.boundTarget(), inputTargetName)); } } return bindings; @@ -111,9 +112,9 @@ public Collection> createAndBindOutputs( .entrySet()) { BoundTargetHolder boundTargetHolder = boundTargetHolderEntry.getValue(); String outputTargetName = boundTargetHolderEntry.getKey(); - if (boundTargetHolderEntry.getValue().isBindable()) { + if (boundTargetHolderEntry.getValue().bindable()) { bindings.add(bindingService.bindProducer( - boundTargetHolder.getBoundTarget(), outputTargetName)); + boundTargetHolder.boundTarget(), outputTargetName)); } } return bindings; @@ -123,7 +124,7 @@ public Collection> createAndBindOutputs( public void unbindInputs(BindingService bindingService) { for (Map.Entry boundTargetHolderEntry : this.inputHolders .entrySet()) { - if (boundTargetHolderEntry.getValue().isBindable()) { + if (boundTargetHolderEntry.getValue().bindable()) { bindingService.unbindConsumers(boundTargetHolderEntry.getKey()); } } @@ -133,7 +134,7 @@ public void unbindInputs(BindingService bindingService) { public void unbindOutputs(BindingService bindingService) { for (Map.Entry boundTargetHolderEntry : this.outputHolders .entrySet()) { - if (boundTargetHolderEntry.getValue().isBindable()) { + if (boundTargetHolderEntry.getValue().bindable()) { bindingService.unbindProducers(boundTargetHolderEntry.getKey()); } } diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BoundTargetHolder.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BoundTargetHolder.java index f3a384527f..bc3fa88382 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BoundTargetHolder.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BoundTargetHolder.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,25 +24,8 @@ * * @author Original authors in {@link BindableProxyFactory} * @author Soby Chacko + * @author Omer Celik * @since 3.0.0 */ -public final class BoundTargetHolder { - - private Object boundTarget; - - private boolean bindable; - - public BoundTargetHolder(Object boundTarget, boolean bindable) { - this.boundTarget = boundTarget; - this.bindable = bindable; - } - - public Object getBoundTarget() { - return this.boundTarget; - } - - public boolean isBindable() { - return this.bindable; - } - +public record BoundTargetHolder(Object boundTarget, boolean bindable) { } diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingProperties.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingProperties.java index 7b0e2765ca..f8850769c7 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingProperties.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -148,16 +148,16 @@ public boolean onlyOneOfProducerOrConsumerSet() { @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("destination=" + this.destination); + sb.append("destination=").append(this.destination); sb.append(COMMA); - sb.append("group=" + this.group); + sb.append("group=").append(this.group); sb.append(COMMA); if (this.contentType != null) { - sb.append("contentType=" + this.contentType); + sb.append("contentType=").append(this.contentType); sb.append(COMMA); } if (this.binder != null) { - sb.append("binder=" + this.binder); + sb.append("binder=").append(this.binder); sb.append(COMMA); } sb.deleteCharAt(sb.lastIndexOf(COMMA)); diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java index bd890bc09e..8621ebe16d 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java @@ -78,6 +78,7 @@ * @author Oleg Zhurakousky * @author Soby Chacko * @author Chris Bono + * @author Omer Celik */ @AutoConfiguration @EnableConfigurationProperties({ BindingServiceProperties.class, @@ -132,6 +133,7 @@ public static Map getBinderConfigurations( .entrySet()) { if (configurationEntry.getValue().isDefaultCandidate()) { defaultCandidatesExist = true; + break; } } if (!defaultCandidatesExist) { From b76dc715f99277a6a7cb6fe1e1305de3cd5f10d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20=C3=87elik?= Date: Sat, 5 Oct 2024 01:37:39 +0300 Subject: [PATCH 2/3] Code cleanup --- .../cloud/stream/binder/kafka/KafkaBinderMetrics.java | 3 ++- .../provisioning/RabbitExchangeQueueProvisioner.java | 8 ++++---- .../stream/binder/rabbit/RabbitMessageChannelBinder.java | 1 + .../stream/binder/rabbit/{ => utils}/StreamUtils.java | 2 +- .../cloud/stream/binding/BindableProxyFactory.java | 5 +++-- .../cloud/stream/config/BindingServiceProperties.java | 7 +++---- .../cloud/stream/function/StreamBridge.java | 2 +- .../cloud/stream/provisioning/ProvisioningException.java | 4 ++-- 8 files changed, 17 insertions(+), 15 deletions(-) rename binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/{ => utils}/StreamUtils.java (99%) diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java index 1ccb2bcf15..5b9925cef9 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java @@ -68,6 +68,7 @@ * @author Tomek Szmytka * @author Nico Heller * @author Kurt Hong + * @author Omer Celik */ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener, AutoCloseable { @@ -89,7 +90,7 @@ public class KafkaBinderMetrics private final MeterRegistry meterRegistry; - private Map> metadataConsumers; + private final Map> metadataConsumers; private int timeout = DEFAULT_TIMEOUT; diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java index 359a75d6a3..38bac7ff5d 100644 --- a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java +++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -79,6 +79,7 @@ * @author Oleg Zhurakousky * @author Michael Michailidis * @author Byungjun You + * @author Omer Celik */ // @checkstyle:off public class RabbitExchangeQueueProvisioner @@ -379,8 +380,7 @@ private Binding partitionedBinding(String destination, Exchange exchange, Queue bindingKey = destination; } bindingKey += "-" + index; - Map arguments = new HashMap<>(); - arguments.putAll(extendedProperties.getQueueBindingArguments()); + Map arguments = new HashMap<>(extendedProperties.getQueueBindingArguments()); if (exchange instanceof TopicExchange topicExchange) { Binding binding = BindingBuilder.bind(queue).to(topicExchange) .with(bindingKey); @@ -679,7 +679,7 @@ private void configureAlternate(AlternateExchange alternate, String beanNameQual Queue queue = new Queue(binding.getQueue()); String beanName = alternate.getName() + "." + binding.getQueue() + "." + beanNameQualifier; declareQueue(beanName, queue); - Binding toBind = createBinding(exchange, queue, binding.getRoutingKey(), null, beanName); + createBinding(exchange, queue, binding.getRoutingKey(), null, beanName); } } diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java index a2b3a0ee90..e753128ea0 100644 --- a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java +++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java @@ -85,6 +85,7 @@ import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties; import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties.ProducerType; import org.springframework.cloud.stream.binder.rabbit.provisioning.RabbitExchangeQueueProvisioner; +import org.springframework.cloud.stream.binder.rabbit.utils.StreamUtils; import org.springframework.cloud.stream.config.ListenerContainerCustomizer; import org.springframework.cloud.stream.config.MessageSourceCustomizer; import org.springframework.cloud.stream.provisioning.ConsumerDestination; diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/StreamUtils.java b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/utils/StreamUtils.java similarity index 99% rename from binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/StreamUtils.java rename to binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/utils/StreamUtils.java index fe0d5424b7..5b818081ff 100644 --- a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/StreamUtils.java +++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/utils/StreamUtils.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.cloud.stream.binder.rabbit; +package org.springframework.cloud.stream.binder.rabbit.utils; import java.util.Map; import java.util.function.Function; diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindableProxyFactory.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindableProxyFactory.java index d6b6958e3a..211d507329 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindableProxyFactory.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindableProxyFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,11 +43,12 @@ * @author Ilayaperumal Gopinathan * @author Oleg Zhurakousky * @author Soby Chacko + * @author Omer Celik */ public class BindableProxyFactory extends AbstractBindableProxyFactory implements MethodInterceptor, FactoryBean, InitializingBean, BeanFactoryAware { - private static Log log = LogFactory.getLog(BindableProxyFactory.class); + private static final Log log = LogFactory.getLog(BindableProxyFactory.class); private final Map targetCache = new HashMap<>(2); diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceProperties.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceProperties.java index 341d4ef7b2..4fd0b99fd9 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceProperties.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,6 +53,7 @@ * @author Oleg Zhurakousky * @author Michael Michailidis * @author Kurt Hong + * @author Omer Celik */ @ConfigurationProperties("spring.cloud.stream") @JsonInclude(Include.NON_DEFAULT) @@ -235,9 +236,7 @@ public Map asMapProperties() { for (Map.Entry entry : this.bindings.entrySet()) { properties.put(entry.getKey(), entry.getValue().toString()); } - for (Map.Entry entry : this.binders.entrySet()) { - properties.put(entry.getKey(), entry.getValue()); - } + properties.putAll(this.binders); return properties; } diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java index 7e22cdb75a..14037aad0b 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java @@ -85,6 +85,7 @@ * @author Soby Chacko * @author Byungjun You * @author Michał Rowicki + * @author Omer Celik * @since 3.0.3 * */ @@ -128,7 +129,6 @@ public final class StreamBridge implements StreamOperations, SmartInitializingSi * @param bindingServiceProperties instance of {@link BindingServiceProperties} * @param applicationContext instance of {@link ConfigurableApplicationContext} */ - @SuppressWarnings("serial") StreamBridge(FunctionCatalog functionCatalog, BindingServiceProperties bindingServiceProperties, ConfigurableApplicationContext applicationContext, @Nullable NewDestinationBindingCallback destinationBindingCallback) { this.executorService = Executors.newCachedThreadPool(); diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/provisioning/ProvisioningException.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/provisioning/ProvisioningException.java index a39174e979..b77fc658fc 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/provisioning/ProvisioningException.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/provisioning/ProvisioningException.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2017 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,8 +25,8 @@ * See {@link NestedRuntimeException} for more usage details. * * @author Soby Chacko + * @author Omer Celik */ -@SuppressWarnings("serial") public class ProvisioningException extends NestedRuntimeException { /** From b2677cb9cd9ac4c99ce8b02d7d19839e694acb2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20=C3=87elik?= Date: Sat, 5 Oct 2024 01:38:56 +0300 Subject: [PATCH 3/3] Revert "Code cleanup" This reverts commit b76dc715f99277a6a7cb6fe1e1305de3cd5f10d8. --- .../cloud/stream/binder/kafka/KafkaBinderMetrics.java | 3 +-- .../provisioning/RabbitExchangeQueueProvisioner.java | 8 ++++---- .../stream/binder/rabbit/RabbitMessageChannelBinder.java | 1 - .../stream/binder/rabbit/{utils => }/StreamUtils.java | 2 +- .../cloud/stream/binding/BindableProxyFactory.java | 5 ++--- .../cloud/stream/config/BindingServiceProperties.java | 7 ++++--- .../cloud/stream/function/StreamBridge.java | 2 +- .../cloud/stream/provisioning/ProvisioningException.java | 4 ++-- 8 files changed, 15 insertions(+), 17 deletions(-) rename binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/{utils => }/StreamUtils.java (99%) diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java index 5b9925cef9..1ccb2bcf15 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java @@ -68,7 +68,6 @@ * @author Tomek Szmytka * @author Nico Heller * @author Kurt Hong - * @author Omer Celik */ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener, AutoCloseable { @@ -90,7 +89,7 @@ public class KafkaBinderMetrics private final MeterRegistry meterRegistry; - private final Map> metadataConsumers; + private Map> metadataConsumers; private int timeout = DEFAULT_TIMEOUT; diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java index 38bac7ff5d..359a75d6a3 100644 --- a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java +++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2024 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -79,7 +79,6 @@ * @author Oleg Zhurakousky * @author Michael Michailidis * @author Byungjun You - * @author Omer Celik */ // @checkstyle:off public class RabbitExchangeQueueProvisioner @@ -380,7 +379,8 @@ private Binding partitionedBinding(String destination, Exchange exchange, Queue bindingKey = destination; } bindingKey += "-" + index; - Map arguments = new HashMap<>(extendedProperties.getQueueBindingArguments()); + Map arguments = new HashMap<>(); + arguments.putAll(extendedProperties.getQueueBindingArguments()); if (exchange instanceof TopicExchange topicExchange) { Binding binding = BindingBuilder.bind(queue).to(topicExchange) .with(bindingKey); @@ -679,7 +679,7 @@ private void configureAlternate(AlternateExchange alternate, String beanNameQual Queue queue = new Queue(binding.getQueue()); String beanName = alternate.getName() + "." + binding.getQueue() + "." + beanNameQualifier; declareQueue(beanName, queue); - createBinding(exchange, queue, binding.getRoutingKey(), null, beanName); + Binding toBind = createBinding(exchange, queue, binding.getRoutingKey(), null, beanName); } } diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java index e753128ea0..a2b3a0ee90 100644 --- a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java +++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java @@ -85,7 +85,6 @@ import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties; import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties.ProducerType; import org.springframework.cloud.stream.binder.rabbit.provisioning.RabbitExchangeQueueProvisioner; -import org.springframework.cloud.stream.binder.rabbit.utils.StreamUtils; import org.springframework.cloud.stream.config.ListenerContainerCustomizer; import org.springframework.cloud.stream.config.MessageSourceCustomizer; import org.springframework.cloud.stream.provisioning.ConsumerDestination; diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/utils/StreamUtils.java b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/StreamUtils.java similarity index 99% rename from binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/utils/StreamUtils.java rename to binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/StreamUtils.java index 5b818081ff..fe0d5424b7 100644 --- a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/utils/StreamUtils.java +++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/StreamUtils.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.cloud.stream.binder.rabbit.utils; +package org.springframework.cloud.stream.binder.rabbit; import java.util.Map; import java.util.function.Function; diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindableProxyFactory.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindableProxyFactory.java index 211d507329..d6b6958e3a 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindableProxyFactory.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindableProxyFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,12 +43,11 @@ * @author Ilayaperumal Gopinathan * @author Oleg Zhurakousky * @author Soby Chacko - * @author Omer Celik */ public class BindableProxyFactory extends AbstractBindableProxyFactory implements MethodInterceptor, FactoryBean, InitializingBean, BeanFactoryAware { - private static final Log log = LogFactory.getLog(BindableProxyFactory.class); + private static Log log = LogFactory.getLog(BindableProxyFactory.class); private final Map targetCache = new HashMap<>(2); diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceProperties.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceProperties.java index 4fd0b99fd9..341d4ef7b2 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceProperties.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,7 +53,6 @@ * @author Oleg Zhurakousky * @author Michael Michailidis * @author Kurt Hong - * @author Omer Celik */ @ConfigurationProperties("spring.cloud.stream") @JsonInclude(Include.NON_DEFAULT) @@ -236,7 +235,9 @@ public Map asMapProperties() { for (Map.Entry entry : this.bindings.entrySet()) { properties.put(entry.getKey(), entry.getValue().toString()); } - properties.putAll(this.binders); + for (Map.Entry entry : this.binders.entrySet()) { + properties.put(entry.getKey(), entry.getValue()); + } return properties; } diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java index 14037aad0b..7e22cdb75a 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java @@ -85,7 +85,6 @@ * @author Soby Chacko * @author Byungjun You * @author Michał Rowicki - * @author Omer Celik * @since 3.0.3 * */ @@ -129,6 +128,7 @@ public final class StreamBridge implements StreamOperations, SmartInitializingSi * @param bindingServiceProperties instance of {@link BindingServiceProperties} * @param applicationContext instance of {@link ConfigurableApplicationContext} */ + @SuppressWarnings("serial") StreamBridge(FunctionCatalog functionCatalog, BindingServiceProperties bindingServiceProperties, ConfigurableApplicationContext applicationContext, @Nullable NewDestinationBindingCallback destinationBindingCallback) { this.executorService = Executors.newCachedThreadPool(); diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/provisioning/ProvisioningException.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/provisioning/ProvisioningException.java index b77fc658fc..a39174e979 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/provisioning/ProvisioningException.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/provisioning/ProvisioningException.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,8 +25,8 @@ * See {@link NestedRuntimeException} for more usage details. * * @author Soby Chacko - * @author Omer Celik */ +@SuppressWarnings("serial") public class ProvisioningException extends NestedRuntimeException { /**