Skip to content

Commit 4fe60ac

Browse files
authored
Fixes (#101)
1 parent cc8bef0 commit 4fe60ac

File tree

67 files changed

+898
-371
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+898
-371
lines changed

.circleci/config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ integration_test_steps: &integration_test_steps
173173
steps:
174174
- *attach_workspace
175175
- *redis_dep
176-
- run: ./gradlew codeCoverageReport -DincludeTags=integration -DexcludeTags=redisCluster,producerOnly
176+
- run: ./gradlew codeCoverageReport -DincludeTags=integration -DexcludeTags=redisCluster,producerOnly,local
177177
- *persist_to_workspace
178178
- *copy_logs
179179
- *copy_test_results

CHANGELOG.md

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
# [Rqueue] New and Notable Changes
22

3-
### [2.8.1] - 19-Jul-2021
4-
5-
Option to add rqueue web url prefix, the prefix is configured from application.properties file using
6-
`rqueue.web.url.prefix=my-application`, now rqueue dashboard would be served
7-
at `my-application/rquque` instead of `/rqueue`, the configuration has higher priority than the
3+
### [2.9.0] - TBD
4+
### Fixes
5+
* Option to add rqueue web url prefix, the prefix is configured from application.properties file using
6+
`rqueue.web.url.prefix=/my-application/`, now rqueue dashboard would be served
7+
at `/my-application/rquque` instead of `/rqueue`, the configuration has higher priority than the
88
HTTP request header `x-forwarded-prefix`.
9+
10+
* Custom message converter is not working
11+
* RedisCommandExecutionException : command arguments must be strings or integers
912

1013
### [2.8.0] - 08-Jun-2021
1114

@@ -251,4 +254,4 @@ Fixes:
251254

252255
[2.8.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core/2.8.0-RELEASE
253256

254-
[2.8.1]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core/2.8.1-RELEASE
257+
[2.9.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core/2.9.0-RELEASE

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ ext {
7070

7171
subprojects {
7272
group = 'com.github.sonus21'
73-
version = '2.8.1-RELEASE'
73+
version = '2.9.0-RELEASE'
7474

7575
dependencies {
7676
// https://mvnrepository.com/artifact/org.springframework/spring-messaging

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.github.sonus21.rqueue.common.RqueueLockManager;
2222
import com.github.sonus21.rqueue.common.RqueueRedisTemplate;
2323
import com.github.sonus21.rqueue.common.impl.RqueueLockManagerImpl;
24+
import com.github.sonus21.rqueue.converter.MessageConverterProvider;
2425
import com.github.sonus21.rqueue.core.DelayedMessageScheduler;
2526
import com.github.sonus21.rqueue.core.ProcessingMessageScheduler;
2627
import com.github.sonus21.rqueue.core.RqueueBeanProvider;
@@ -66,7 +67,33 @@ public abstract class RqueueListenerBaseConfig {
6667
public static final int MAX_DB_VERSION = 2;
6768
private static final String TEMPLATE_DIR = "templates/rqueue/";
6869
private static final String TEMPLATE_SUFFIX = ".html";
69-
protected @Value("${rqueue.reactive.enabled:false}") boolean reactiveEnabled;
70+
71+
@Value("${rqueue.reactive.enabled:false}")
72+
protected boolean reactiveEnabled;
73+
74+
@Value(
75+
"${rqueue.message.converter.provider.class:com.github.sonus21.rqueue.converter.DefaultMessageConverterProvider}")
76+
private String messageConverterProviderClass;
77+
78+
protected MessageConverterProvider getMessageConverterProvider() {
79+
try {
80+
Class<?> c =
81+
Thread.currentThread().getContextClassLoader().loadClass(messageConverterProviderClass);
82+
Object messageProvider = c.newInstance();
83+
if (messageProvider instanceof MessageConverterProvider) {
84+
return (MessageConverterProvider) messageProvider;
85+
}
86+
throw new IllegalStateException(
87+
"configured message converter is not of type MessageConverterProvider, type: '"
88+
+ messageConverterProviderClass
89+
+ "'",
90+
new Exception());
91+
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
92+
throw new IllegalStateException(
93+
"MessageConverterProvider class '" + messageConverterProviderClass + "' loading failed ",
94+
e);
95+
}
96+
}
7097

7198
@Autowired(required = false)
7299
protected final SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory =

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueWebConfig.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ public class RqueueWebConfig {
4040

4141
/**
4242
* Whether queue stats should be collected or not. When this flag is disabled, metric data won't
43-
* be available in the dashboard. This consumes heavy CPU resources as well due to statistics
44-
* aggregation and computations.
43+
* be available in the dashboard.
4544
*/
4645
@Value("${rqueue.web.collect.listener.stats:true}")
4746
private boolean collectListenerStats;
@@ -53,12 +52,19 @@ public class RqueueWebConfig {
5352
@Value("${rqueue.web.statistic.history.day:90}")
5453
private int historyDay;
5554

55+
// number of jobs that should be aggregated at once
5656
@Value("${rqueue.web.collect.statistic.aggregate.event.count:500}")
5757
private int aggregateEventCount;
5858

59+
// controls how long to wait before doing aggregation
5960
@Value("${rqueue.web.collect.statistic.aggregate.event.wait.time:60}")
60-
private int aggregateEventWaitTime;
61+
private int aggregateEventWaitTimeInSecond;
6162

63+
// controls how long, application should wait for the running threads to complete the execution
6264
@Value("${rqueue.web.collect.statistic.aggregate.shutdown.wait.time:500}")
6365
private int aggregateShutdownWaitTime;
66+
67+
// lock duration for aggregate job, acquired per queue
68+
@Value("${rqueue.web.collect.statistic.aggregate.event.lock.duration:500}")
69+
private int aggregateEventLockDurationInMs;
6470
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java

Lines changed: 37 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121

2222
import com.github.sonus21.rqueue.annotation.MessageListener;
2323
import com.github.sonus21.rqueue.annotation.RqueueListener;
24-
import com.github.sonus21.rqueue.core.DefaultRqueueMessageConverter;
24+
import com.github.sonus21.rqueue.converter.DefaultMessageConverterProvider;
25+
import com.github.sonus21.rqueue.converter.MessageConverterProvider;
2526
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
2627
import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl;
2728
import com.github.sonus21.rqueue.core.middleware.Middleware;
@@ -31,15 +32,13 @@
3132
import com.github.sonus21.rqueue.models.enums.PriorityMode;
3233
import com.github.sonus21.rqueue.utils.Constants;
3334
import com.github.sonus21.rqueue.utils.backoff.TaskExecutionBackOff;
34-
import java.util.Collections;
3535
import java.util.LinkedList;
3636
import java.util.List;
3737
import org.springframework.core.task.AsyncTaskExecutor;
3838
import org.springframework.core.task.TaskExecutor;
3939
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
4040
import org.springframework.data.redis.connection.RedisConnectionFactory;
4141
import org.springframework.messaging.MessageHeaders;
42-
import org.springframework.messaging.converter.CompositeMessageConverter;
4342
import org.springframework.messaging.converter.MessageConverter;
4443
import org.springframework.util.CollectionUtils;
4544

@@ -52,6 +51,9 @@
5251
@SuppressWarnings("WeakerAccess")
5352
public class SimpleRqueueListenerContainerFactory {
5453

54+
// The message converter provider that will return a message converter to convert messages to/from
55+
private MessageConverterProvider messageConverterProvider;
56+
5557
// Provide task executor, this can be used to provide some additional details like some threads
5658
// name, etc otherwise a default task executor would be created
5759
private AsyncTaskExecutor taskExecutor;
@@ -63,8 +65,7 @@ public class SimpleRqueueListenerContainerFactory {
6365
private ReactiveRedisConnectionFactory reactiveRedisConnectionFactory;
6466
// Custom requeue message handler
6567
private RqueueMessageHandler rqueueMessageHandler;
66-
// The message converter to convert messages to/from
67-
private MessageConverter messageConverter = new DefaultRqueueMessageConverter();
68+
6869
// Send message poll time when no messages are available
6970
private long pollingInterval = 200L;
7071
// In case of failure how much time, we should wait for next job
@@ -90,7 +91,8 @@ public class SimpleRqueueListenerContainerFactory {
9091

9192
private final List<Middleware> middlewares = new LinkedList<>();
9293

93-
// Any message headers that should be set, only used for message serialization
94+
// Any message headers that should be set, headers are NOT STORED in db so it should not be
95+
// changed, same header is used in serialized and deserialization process.
9496
private MessageHeaders messageHeaders;
9597

9698
// Set priority mode for the pollers
@@ -158,11 +160,19 @@ public void setAutoStartup(boolean autoStartup) {
158160
/**
159161
* Return configured message handler
160162
*
163+
* @param messageConverterProvider message converter that will be used to serialize/deserialize
164+
* message
161165
* @return RqueueMessageHandler object
162166
*/
163-
public RqueueMessageHandler getRqueueMessageHandler() {
167+
public RqueueMessageHandler getRqueueMessageHandler(
168+
MessageConverterProvider messageConverterProvider) {
169+
if (this.messageConverterProvider == null) {
170+
this.messageConverterProvider = messageConverterProvider;
171+
}
172+
notNull(this.messageConverterProvider, "messageConverterProvider can not be null");
164173
if (rqueueMessageHandler == null) {
165-
rqueueMessageHandler = new RqueueMessageHandler(getMessageConverter(), inspectAllBean);
174+
rqueueMessageHandler =
175+
new RqueueMessageHandler(messageConverterProvider.getConverter(), inspectAllBean);
166176
}
167177
return rqueueMessageHandler;
168178
}
@@ -227,46 +237,27 @@ public void setMaxNumWorkers(int maxNumWorkers) {
227237
}
228238

229239
/**
230-
* @return the message converters
231-
* @deprecated use {@link #getMessageConverter()}
232-
*/
233-
@Deprecated
234-
public List<MessageConverter> getMessageConverters() {
235-
return Collections.singletonList(getMessageConverter());
236-
}
237-
238-
/**
239-
* For message (de)serialization we might need one or more message converters, configure those
240-
* message converters
240+
* Message converter must be configured before calling this method.
241241
*
242-
* @param messageConverters list of message converters
243-
* @deprecated use {@link #setMessageConverter(MessageConverter)}
242+
* @throws IllegalAccessException when messageConverterProvider is null
243+
* @return the message converter object
244244
*/
245-
@Deprecated
246-
public void setMessageConverters(List<MessageConverter> messageConverters) {
247-
notEmpty(messageConverters, "messageConverters must not be empty");
248-
if (messageConverters.size() == 1) {
249-
setMessageConverter(messageConverters.get(0));
250-
} else {
251-
setMessageConverter(new CompositeMessageConverter(messageConverters));
245+
public MessageConverter getMessageConverter() throws IllegalAccessException {
246+
if (messageConverterProvider == null) {
247+
throw new IllegalAccessException("messageConverterProvider is not configured");
252248
}
253-
}
254-
255-
/** @return the message converter */
256-
public MessageConverter getMessageConverter() {
257-
return messageConverter;
249+
return messageConverterProvider.getConverter();
258250
}
259251

260252
/**
261-
* A default message converter {@link DefaultRqueueMessageConverter} is added that can handle all
262-
* type of data serialization/deserialization and all data is serialized to and from JSON. You can
263-
* use other mechanism to serialize/deserialize class object like MessagePack or any other format.
253+
* Rqueue configures a default message converter that can convert message to/from for any object.
264254
*
265-
* @param messageConverter the message converter
255+
* @see DefaultMessageConverterProvider
256+
* @param messageConverterProvider the message converter provider
266257
*/
267-
public void setMessageConverter(MessageConverter messageConverter) {
268-
notNull(messageConverter, "message converter must not be null");
269-
this.messageConverter = messageConverter;
258+
public void setMessageConverterProvider(MessageConverterProvider messageConverterProvider) {
259+
notNull(messageConverterProvider, "messageConverterProvider must not be null");
260+
this.messageConverterProvider = messageConverterProvider;
270261
}
271262

272263
/** @return get Redis connection factor */
@@ -308,15 +299,16 @@ public void setRqueueMessageTemplate(RqueueMessageTemplate messageTemplate) {
308299
* @return an object of {@link RqueueMessageListenerContainer} object
309300
*/
310301
public RqueueMessageListenerContainer createMessageListenerContainer() {
311-
notNull(getRqueueMessageHandler(), "rqueueMessageHandler must not be null");
312302
notNull(redisConnectionFactory, "redisConnectionFactory must not be null");
303+
notNull(messageConverterProvider, "messageConverterProvider must not be null");
313304
if (rqueueMessageTemplate == null) {
314305
rqueueMessageTemplate =
315306
new RqueueMessageTemplateImpl(
316307
getRedisConnectionFactory(), getReactiveRedisConnectionFactory());
317308
}
318309
RqueueMessageListenerContainer messageListenerContainer =
319-
new RqueueMessageListenerContainer(getRqueueMessageHandler(), rqueueMessageTemplate);
310+
new RqueueMessageListenerContainer(
311+
getRqueueMessageHandler(messageConverterProvider), rqueueMessageTemplate);
320312
messageListenerContainer.setAutoStartup(autoStartup);
321313
if (taskExecutor != null) {
322314
messageListenerContainer.setTaskExecutor(taskExecutor);
@@ -352,6 +344,9 @@ public RqueueMessageListenerContainer createMessageListenerContainer() {
352344
if (!CollectionUtils.isEmpty(getMiddlewares())) {
353345
messageListenerContainer.setMiddlewares(getMiddlewares());
354346
}
347+
if (messageHeaders != null) {
348+
messageListenerContainer.setMessageHeaders(messageHeaders);
349+
}
355350
return messageListenerContainer;
356351
}
357352

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright 2021 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.github.sonus21.rqueue.converter;
18+
19+
import com.github.sonus21.rqueue.core.DefaultRqueueMessageConverter;
20+
import org.springframework.messaging.converter.MessageConverter;
21+
22+
public class DefaultMessageConverterProvider implements MessageConverterProvider {
23+
24+
@Override
25+
public MessageConverter getConverter() {
26+
return new DefaultRqueueMessageConverter();
27+
}
28+
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/GenericMessageConverter.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public GenericMessageConverter(ObjectMapper objectMapper) {
6666
*/
6767
@Override
6868
public Message<?> toMessage(Object payload, MessageHeaders headers) {
69-
log.debug("Payload: {} headers: {}", payload, headers);
69+
log.trace("Payload: {} headers: {}", payload, headers);
7070
String message = smartMessageSerDes.serialize(payload);
7171
if (message == null) {
7272
return null;
@@ -76,13 +76,13 @@ public Message<?> toMessage(Object payload, MessageHeaders headers) {
7676

7777
@Override
7878
public Object fromMessage(Message<?> message, Class<?> targetClass, Object conversionHint) {
79-
log.debug("Message: {} class: {} hint: {}", message, targetClass, conversionHint);
79+
log.trace("Message: {} class: {} hint: {}", message, targetClass, conversionHint);
8080
return fromMessage(message, targetClass);
8181
}
8282

8383
@Override
8484
public Message<?> toMessage(Object payload, MessageHeaders headers, Object conversionHint) {
85-
log.debug("Payload: {} headers: {} hint: {}", payload, headers, conversionHint);
85+
log.trace("Payload: {} headers: {} hint: {}", payload, headers, conversionHint);
8686
return toMessage(payload, headers);
8787
}
8888
/**
@@ -98,7 +98,7 @@ public Message<?> toMessage(Object payload, MessageHeaders headers, Object conve
9898
*/
9999
@Override
100100
public Object fromMessage(Message<?> message, Class<?> targetClass) {
101-
log.debug("Message: {} class: {}", message, targetClass);
101+
log.trace("Message: {} class: {}", message, targetClass);
102102
String payload;
103103
try {
104104
payload = (String) message.getPayload();
@@ -182,7 +182,7 @@ public Object deserialize(String payload) {
182182
return objectMapper.readValue(msg.msg, type);
183183
}
184184
} catch (Exception e) {
185-
log.warn("Deserialization of message {} failed", payload, e);
185+
log.debug("Deserialization of message {} failed", payload, e);
186186
}
187187
return null;
188188
}
@@ -194,7 +194,7 @@ public <T> T deserialize(byte[] payload, Class<T> clazz) {
194194
try {
195195
return objectMapper.readValue(payload, clazz);
196196
} catch (Exception e) {
197-
log.warn("Deserialization of message {} failed", new String(payload), e);
197+
log.debug("Deserialization of message {} failed", new String(payload), e);
198198
}
199199
return null;
200200
}
@@ -209,7 +209,7 @@ public String serialize(Object payload) {
209209
Msg message = new Msg(msg, name);
210210
return objectMapper.writeValueAsString(message);
211211
} catch (JsonProcessingException e) {
212-
log.warn("Serialisation failed", e);
212+
log.debug("Serialisation failed", e);
213213
return null;
214214
}
215215
}

0 commit comments

Comments
 (0)