Skip to content

Commit 0f70844

Browse files
Transactional stream binder (#188)
* Stream Binder transactions Signed-off-by: Anders Swanson <anders.swanson@oracle.com>
1 parent 2ae791e commit 0f70844

File tree

33 files changed

+476
-113
lines changed

33 files changed

+476
-113
lines changed

database/spring-cloud-stream-binder-oracle-txeventq/spring-cloud-stream-binder-txeventq-sample/src/test/java/com/oracle/database/spring/cloud/stream/binder/sample/TxEventQSampleAppTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
@Testcontainers
2121
public class TxEventQSampleAppTest {
2222
@Container
23-
static OracleContainer oracleContainer = new OracleContainer("gvenzl/oracle-free:23.6-slim-faststart")
23+
static OracleContainer oracleContainer = new OracleContainer("gvenzl/oracle-free:23.7-slim-faststart")
2424
.withStartupTimeout(Duration.ofMinutes(2))
2525
.withUsername("testuser")
2626
.withPassword("testpwd");

database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/JMSMessageChannelBinder.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,13 @@
3030
import com.oracle.database.spring.cloud.stream.binder.config.JmsProducerProperties;
3131
import com.oracle.database.spring.cloud.stream.binder.provisioning.JmsConsumerDestination;
3232
import com.oracle.database.spring.cloud.stream.binder.provisioning.JmsProducerDestination;
33-
34-
3533
import com.oracle.database.spring.cloud.stream.binder.utils.DestinationNameResolver;
3634
import com.oracle.database.spring.cloud.stream.binder.utils.JmsMessageDrivenChannelAdapterFactory;
3735
import com.oracle.database.spring.cloud.stream.binder.utils.JmsSendingMessageHandlerFactory;
3836
import jakarta.jms.Connection;
3937
import jakarta.jms.ConnectionFactory;
4038
import jakarta.jms.Session;
4139
import jakarta.jms.Topic;
42-
4340
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
4441
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
4542
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;

database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/TxEventQQueueProvisioner.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/*
22
** TxEventQ Support for Spring Cloud Stream
3-
** Copyright (c) 2023, 2024 Oracle and/or its affiliates.
3+
** Copyright (c) 2023, 2025 Oracle and/or its affiliates.
44
**
55
** This file has been modified by Oracle Corporation.
66
*/
@@ -24,6 +24,8 @@
2424

2525
package com.oracle.database.spring.cloud.stream.binder;
2626

27+
import java.sql.SQLException;
28+
2729
import com.oracle.database.spring.cloud.stream.binder.config.JmsConsumerProperties;
2830
import com.oracle.database.spring.cloud.stream.binder.config.JmsProducerProperties;
2931
import com.oracle.database.spring.cloud.stream.binder.plsql.OracleDBUtils;
@@ -34,9 +36,6 @@
3436
import jakarta.jms.JMSException;
3537
import jakarta.jms.Session;
3638
import jakarta.jms.Topic;
37-
38-
import java.sql.SQLException;
39-
4039
import oracle.jakarta.jms.AQjmsException;
4140
import org.slf4j.Logger;
4241
import org.slf4j.LoggerFactory;
@@ -137,7 +136,7 @@ private Topic provisionProducerTopic(String topicName,
137136
ExtendedProducerProperties<JmsProducerProperties> properties) {
138137
Connection aQConnection = null;
139138
Session session = null;
140-
Topic topic = null;
139+
Topic topic;
141140
try {
142141
aQConnection = connectionFactory.createConnection();
143142
session = aQConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
@@ -177,7 +176,7 @@ private Topic provisionConsumerTopic(String topicName,
177176
ExtendedConsumerProperties<JmsConsumerProperties> properties) {
178177
Connection aQConnection = null;
179178
Session session = null;
180-
Topic topic = null;
179+
Topic topic;
181180
try {
182181
aQConnection = connectionFactory.createConnection();
183182
session = aQConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE);

database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/config/JmsBinderGlobalConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
@Configuration
4747
public class JmsBinderGlobalConfiguration {
4848

49-
private final ConnectionFactory connectionFactory;
49+
private ConnectionFactory connectionFactory;
5050

5151
public JmsBinderGlobalConfiguration(ConnectionFactory connectionFactory) {
5252
this.connectionFactory = connectionFactory;

database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/config/JmsExtendedBindingProperties.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
import java.util.Map;
2828

2929
import org.springframework.boot.context.properties.ConfigurationProperties;
30-
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
3130
import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties;
31+
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
3232

3333
@ConfigurationProperties("spring.cloud.stream.txeventq")
3434
public class JmsExtendedBindingProperties

database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/config/TxEventQJmsConfiguration.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,15 @@
2424

2525
package com.oracle.database.spring.cloud.stream.binder.config;
2626

27+
import java.sql.SQLException;
28+
2729
import com.oracle.database.spring.cloud.stream.binder.TxEventQQueueProvisioner;
2830
import com.oracle.database.spring.cloud.stream.binder.plsql.OracleDBUtils;
29-
3031
import jakarta.jms.ConnectionFactory;
3132
import jakarta.jms.JMSException;
3233
import oracle.jakarta.jms.AQjmsConnectionFactory;
3334
import oracle.jakarta.jms.AQjmsFactory;
3435
import oracle.ucp.jdbc.PoolDataSource;
35-
36-
import java.sql.SQLException;
37-
3836
import org.slf4j.Logger;
3937
import org.slf4j.LoggerFactory;
4038
import org.springframework.boot.autoconfigure.AutoConfigureAfter;

database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/plsql/OracleDBUtils.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,14 @@
2929
import java.sql.SQLException;
3030
import java.sql.Types;
3131

32+
import oracle.ucp.jdbc.PoolDataSource;
3233
import org.slf4j.Logger;
3334
import org.slf4j.LoggerFactory;
3435

35-
import oracle.ucp.jdbc.PoolDataSource;
36-
3736
public class OracleDBUtils {
3837

3938
private PoolDataSource pds = null;
40-
private final int dbversion;
39+
private int dbversion;
4140
private final Logger logger = LoggerFactory.getLogger(getClass());
4241

4342
private static final String CREATE_KB2_TEQ =

database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/provisioning/JmsProducerDestination.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class JmsProducerDestination implements ProducerDestination {
3434

3535
private final Topic topic;
3636
private final int partitionCount;
37-
private final int dbversion;
37+
private int dbversion;
3838

3939
public JmsProducerDestination(Topic topic, int pCount, int dbversion) {
4040
this.topic = topic;

database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/serialize/CustomSerializationMessageConverter.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/*
22
** TxEventQ Support for Spring Cloud Stream
3-
** Copyright (c) 2023, 2024 Oracle and/or its affiliates.
3+
** Copyright (c) 2023, 2025 Oracle and/or its affiliates.
44
**
55
** This file has been modified by Oracle Corporation.
66
*/
@@ -24,13 +24,12 @@
2424

2525
package com.oracle.database.spring.cloud.stream.binder.serialize;
2626

27+
import jakarta.jms.JMSException;
28+
import jakarta.jms.Message;
2729
import org.slf4j.Logger;
2830
import org.slf4j.LoggerFactory;
2931
import org.springframework.jms.support.converter.SimpleMessageConverter;
3032

31-
import jakarta.jms.JMSException;
32-
import jakarta.jms.Message;
33-
3433
public class CustomSerializationMessageConverter extends SimpleMessageConverter {
3534
public String deserializer = null;
3635

@@ -66,8 +65,8 @@ public Object fromMessage(Message jmsMessage) throws JMSException {
6665
}
6766

6867
if (!isInstanceOfDeserializer) {
69-
logger.debug("The configured deserializer class is not an instance of 'com.oracle.database.spring.cloud.stream.binder.serialize.DeSerializer'");
70-
throw new IllegalArgumentException("The configured serializer class is not an instance of 'com.oracle.database.spring.cloud.stream.binder.serialize.DeSerializer'");
68+
logger.debug("The configured deserializer class is not an instance of 'com.oracle.cstream.serialize.DeSerializer'");
69+
throw new IllegalArgumentException("The configured serializer class is not an instance of 'com.oracle.cstream.serialize.DeSerializer'");
7170
}
7271

7372
Deserializer<?> s = null;
@@ -79,7 +78,7 @@ public Object fromMessage(Message jmsMessage) throws JMSException {
7978
throw new IllegalArgumentException("Serializer object could not be initiated.");
8079
}
8180

82-
result = s.deserialize((byte[]) result);
81+
result = (Object) (s.deserialize((byte[]) result));
8382

8483
return result;
8584
}

database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/serialize/Deserializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,5 @@
2525
package com.oracle.database.spring.cloud.stream.binder.serialize;
2626

2727
public interface Deserializer<T> {
28-
T deserialize(byte[] bytes);
28+
public T deserialize(byte[] bytes);
2929
}

0 commit comments

Comments
 (0)