Skip to content

Commit 0ce7599

Browse files
TxEventQ Stream Binder Sample (#107)
Signed-off-by: Anders Swanson <anders.swanson@oracle.com>
1 parent 8d02d11 commit 0ce7599

File tree

11 files changed

+317
-5
lines changed

11 files changed

+317
-5
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Spring Cloud Stream Binder TxEventQ Sample
2+
3+
This sample application demonstrates how to use the Spring Cloud Stream Binder for Oracle TxEventQ in a simple Spring Boot Application.
4+
5+
Spring Cloud Stream exposes a [functional messaging API](https://docs.spring.io/spring-cloud-stream/reference/spring-cloud-stream/producing-and-consuming-messages.html) for producing and consuming messages. In this sample we implement three functional interfaces to produce a series of words, capitalize them, and output them.
6+
7+
### WordSupplier: Message Producer
8+
9+
The [WordSupplier](src/main/java/com/oracle/cstream/sample/WordSupplier.java) class produces a series of words to a topic. Consumers may subscribe to this topic to review messages from the supplier.
10+
11+
### toUpperCase and stdoutConsumer
12+
13+
Messages from the WordSupplier are piped through the [toUpperCase](src/main/java/com/oracle/cstream/sample/StreamConfiguration.java) functional interface to demonstrate stream processing. Finally, each message is consumed and printed to stdout by the `stdoutConsumer`.
14+
15+
### Running the tests
16+
17+
The tests require a docker runtime environment, and will instantiate a local Oracle Database.
18+
19+
To run the tests, use the following command:
20+
21+
```shell
22+
mvn test
23+
```
24+
25+
As the test runs, you should see the following output, indicating messages are being processed by the TxEventQ stream binder:
26+
27+
```
28+
Consumed: SPRING
29+
Consumed: CLOUD
30+
Consumed: STREAM
31+
Consumed: SIMPLIFIES
32+
Consumed: EVENT-DRIVEN
33+
Consumed: MICROSERVICES
34+
Consumed: WITH
35+
Consumed: POWERFUL
36+
Consumed: MESSAGING
37+
Consumed: CAPABILITIES.
38+
```
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<!-- Copyright (c) 2024, Oracle and/or its affiliates. -->
4+
<!-- Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl. -->
5+
<modelVersion>4.0.0</modelVersion>
6+
<groupId>com.oracle.database.cstream</groupId>
7+
<artifactId>spring-cloud-stream-binder-txeventq-sample</artifactId>
8+
<version>1.0.0</version>
9+
<packaging>jar</packaging>
10+
11+
<name>spring-cloud-stream-binder-txeventq-sample</name>
12+
<description>Spring Cloud Stream Binder for Oracle TxEventQ Sample Application</description>
13+
14+
<organization>
15+
<name>Oracle America, Inc.</name>
16+
<url>https://www.oracle.com</url>
17+
</organization>
18+
19+
<developers>
20+
<developer>
21+
<name>Oracle</name>
22+
<email>obaas_ww at oracle.com</email>
23+
<organization>Oracle America, Inc.</organization>
24+
<organizationUrl>https://www.oracle.com</organizationUrl>
25+
</developer>
26+
</developers>
27+
28+
<licenses>
29+
<license>
30+
<name>The Universal Permissive License (UPL), Version 1.0</name>
31+
<url>https://oss.oracle.com/licenses/upl/</url>
32+
<distribution>repo</distribution>
33+
</license>
34+
</licenses>
35+
36+
<scm>
37+
<url>https://github.com/oracle/spring-cloud-oracle</url>
38+
<connection>scm:git:https://github.com/oracle/spring-cloud-oracle.git</connection>
39+
<developerConnection>scm:git:git@github.com:oracle/spring-cloud-oracle.git</developerConnection>
40+
</scm>
41+
42+
<properties>
43+
<maven.compiler.source>17</maven.compiler.source>
44+
<maven.compiler.target>17</maven.compiler.target>
45+
<txeventq.streambinder.version>0.9.0</txeventq.streambinder.version>
46+
<testcontainers.version>1.20.1</testcontainers.version>
47+
<spring.boot.version>3.2.7</spring.boot.version>
48+
<build-helper-maven-plugin.version>3.6.0</build-helper-maven-plugin.version>
49+
<maven-javadoc-plugin.version>3.7.0</maven-javadoc-plugin.version>
50+
<maven-source-plugin.version>3.3.1</maven-source-plugin.version>
51+
</properties>
52+
53+
<dependencies>
54+
<dependency>
55+
<groupId>com.oracle.database.cstream</groupId>
56+
<artifactId>spring-cloud-stream-binder-oracle-txeventq</artifactId>
57+
<version>${txeventq.streambinder.version}</version>
58+
</dependency>
59+
60+
<!-- Test Dependencies-->
61+
<dependency>
62+
<groupId>org.springframework.boot</groupId>
63+
<artifactId>spring-boot-starter-test</artifactId>
64+
<version>${spring.boot.version}</version>
65+
<scope>test</scope>
66+
</dependency>
67+
68+
<dependency>
69+
<groupId>org.testcontainers</groupId>
70+
<artifactId>junit-jupiter</artifactId>
71+
<version>${testcontainers.version}</version>
72+
<scope>test</scope>
73+
</dependency>
74+
75+
<dependency>
76+
<groupId>org.testcontainers</groupId>
77+
<artifactId>testcontainers</artifactId>
78+
<version>${testcontainers.version}</version>
79+
<scope>test</scope>
80+
</dependency>
81+
82+
<dependency>
83+
<groupId>org.testcontainers</groupId>
84+
<artifactId>oracle-free</artifactId>
85+
<version>${testcontainers.version}</version>
86+
<scope>test</scope>
87+
</dependency>
88+
</dependencies>
89+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Copyright (c) 2024, Oracle and/or its affiliates.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
3+
package com.oracle.cstream.sample;
4+
5+
import java.util.function.Consumer;
6+
import java.util.function.Function;
7+
8+
import org.springframework.beans.factory.annotation.Value;
9+
import org.springframework.context.annotation.Bean;
10+
import org.springframework.context.annotation.Configuration;
11+
12+
@Configuration
13+
public class StreamConfiguration {
14+
private @Value("${phrase}") String phrase;
15+
16+
@Bean
17+
public Function<String, String> toUpperCase() {
18+
return String::toUpperCase;
19+
}
20+
21+
@Bean
22+
public Consumer<String> stdoutConsumer() {
23+
return s -> System.out.println("Consumed: " + s);
24+
}
25+
26+
@Bean
27+
public WordSupplier wordSupplier() {
28+
return new WordSupplier(phrase);
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
// Copyright (c) 2024, Oracle and/or its affiliates.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
3+
package com.oracle.cstream.sample;
4+
5+
import org.springframework.boot.SpringApplication;
6+
import org.springframework.boot.autoconfigure.SpringBootApplication;
7+
8+
@SpringBootApplication
9+
public class TxEventQSampleApp {
10+
public static void main(String[] args) {
11+
SpringApplication.run(TxEventQSampleApp.class, args);
12+
}
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.oracle.cstream.sample;
2+
3+
import java.util.concurrent.atomic.AtomicBoolean;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
import java.util.function.Supplier;
6+
7+
public class WordSupplier implements Supplier<String> {
8+
private final String[] words;
9+
private final AtomicInteger idx = new AtomicInteger(0);
10+
private final AtomicBoolean done = new AtomicBoolean(false);
11+
12+
public WordSupplier(String phrase) {
13+
this.words = phrase.split(" ");
14+
}
15+
16+
@Override
17+
public String get() {
18+
int i = idx.getAndAccumulate(words.length, (x, y) -> {
19+
if (x < words.length - 1) {
20+
return x + 1;
21+
}
22+
done.set(true);
23+
return 0;
24+
});
25+
return words[i];
26+
}
27+
28+
public boolean done() {
29+
return done.get();
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
phrase: "Spring Cloud Stream simplifies event-driven microservices with powerful messaging capabilities."
2+
3+
spring:
4+
cloud:
5+
stream:
6+
bindings:
7+
wordSupplier-out-0:
8+
destination: toUpperCase-in-0
9+
group: t1
10+
producer:
11+
required-groups:
12+
- t1
13+
stdoutConsumer-in-0:
14+
destination: toUpperCase-out-0
15+
group: t1
16+
function:
17+
definition: wordSupplier;toUpperCase;stdoutConsumer
18+
19+
datasource:
20+
username: ${USERNAME}
21+
password: ${PASSWORD}
22+
url: ${JDBC_URL}
23+
driver-class-name: oracle.jdbc.OracleDriver
24+
type: oracle.ucp.jdbc.PoolDataSourceImpl
25+
oracleucp:
26+
initial-pool-size: 1
27+
min-pool-size: 1
28+
max-pool-size: 30
29+
connection-pool-name: TxEventQSample
30+
connection-factory-class-name: oracle.jdbc.pool.OracleDataSource
31+
server:
32+
port: 9001
33+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright (c) 2024, Oracle and/or its affiliates.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
3+
package com.oracle.cstream.sample;
4+
5+
import java.time.Duration;
6+
7+
import org.junit.jupiter.api.BeforeAll;
8+
import org.junit.jupiter.api.Test;
9+
import org.springframework.beans.factory.annotation.Autowired;
10+
import org.springframework.boot.test.context.SpringBootTest;
11+
import org.springframework.cloud.stream.binding.BindingsLifecycleController;
12+
import org.springframework.test.context.DynamicPropertyRegistry;
13+
import org.springframework.test.context.DynamicPropertySource;
14+
import org.testcontainers.junit.jupiter.Container;
15+
import org.testcontainers.junit.jupiter.Testcontainers;
16+
import org.testcontainers.oracle.OracleContainer;
17+
import org.testcontainers.utility.MountableFile;
18+
19+
@SpringBootTest
20+
@Testcontainers
21+
public class TxEventQSampleAppTest {
22+
@Container
23+
static OracleContainer oracleContainer = new OracleContainer("gvenzl/oracle-free:23.5-slim-faststart")
24+
.withStartupTimeout(Duration.ofMinutes(2))
25+
.withUsername("testuser")
26+
.withPassword(("testpwd"));
27+
28+
@BeforeAll
29+
public static void setUp() throws Exception {
30+
oracleContainer.start();
31+
oracleContainer.copyFileToContainer(MountableFile.forClasspathResource("init.sql"), "/tmp/init.sql");
32+
oracleContainer.execInContainer("sqlplus", "sys / as sysdba", "@/tmp/init.sql");
33+
}
34+
35+
@DynamicPropertySource
36+
static void properties(DynamicPropertyRegistry registry) {
37+
registry.add("JDBC_URL", oracleContainer::getJdbcUrl);
38+
registry.add("USERNAME", oracleContainer::getUsername);
39+
registry.add("PASSWORD", oracleContainer::getPassword);
40+
}
41+
42+
@Autowired
43+
WordSupplier wordSupplier;
44+
45+
@Autowired
46+
BindingsLifecycleController lifecycleController;
47+
48+
@Test
49+
void processStream() throws InterruptedException {
50+
// Process all words from the word supplier message stream.
51+
do {
52+
Thread.sleep(100);
53+
} while (!wordSupplier.done());
54+
55+
// Shutdown all messaging beans.
56+
lifecycleController.queryStates().forEach((state) ->
57+
lifecycleController.stop((String) state.get("bindingName")));
58+
}
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
alter session set container=freepdb1;
2+
grant unlimited tablespace to testuser;
3+
grant select_catalog_role to testuser;
4+
grant execute on dbms_aq to testuser;
5+
grant execute on dbms_aqadm to testuser;
6+
grant execute on dbms_aqin to testuser;
7+
grant execute on dbms_aqjms_internal to testuser;
8+
grant execute on dbms_teqk to testuser;
9+
grant execute on DBMS_RESOURCE_MANAGER to testuser;
10+
grant select on sys.aq$_queue_shards to testuser;
11+
grant select on user_queue_partition_assignment_table to testuser;

database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/cstream/TxEventQQueueProvisioner.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
import java.sql.SQLException;
3939

40+
import oracle.jakarta.jms.AQjmsException;
4041
import org.slf4j.Logger;
4142
import org.slf4j.LoggerFactory;
4243
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
@@ -49,6 +50,7 @@
4950
public class TxEventQQueueProvisioner
5051
implements
5152
ProvisioningProvider<ExtendedConsumerProperties<JmsConsumerProperties>, ExtendedProducerProperties<JmsProducerProperties>> {
53+
private static final int CODE_TOPIC_NOT_FOUND = 243;
5254

5355
private final ConnectionFactory connectionFactory;
5456

@@ -247,7 +249,7 @@ private void checkPartitionCount(String topicName, int pCount) {
247249

248250

249251
private String formatName(String name) {
250-
// surround with double quotes
252+
// surround with double quotes
251253
// to use exact name for topic
252254
return "\"" + name + "\"";
253255
}
@@ -262,7 +264,11 @@ private Topic getTopicInstance(String topicName, Session session) {
262264
try {
263265
topic = session.createTopic(topicName);
264266
} catch (JMSException e) {
265-
logger.info("Exception: {}", e.getMessage());
267+
if (e instanceof AQjmsException aqe && aqe.getErrorNumber() == CODE_TOPIC_NOT_FOUND) {
268+
logger.debug(e.getMessage());
269+
} else {
270+
logger.info("Exception: {}", e.getMessage());
271+
}
266272
return null;
267273
}
268274
return topic;

database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/cstream/utils/SpecCompliantJmsHeaderMapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,10 @@ public void fromHeaders(MessageHeaders headers, Message jmsMessage) {
8585
compliantHeaders.put(entry.getKey(), value.toString());
8686
} else if (!SUPPORTED_PROPERTY_TYPES.contains(value.getClass())) {
8787
if (value instanceof Serializable) {
88-
logger.info("Serializing {} header object", value);
88+
logger.debug("Serializing {} header object", value);
8989
compliantHeaders.put(entry.getKey(), SerializationUtils.serialize(value));
9090
} else {
91-
logger.info("Storing String representation for header: {}", entry.getKey());
91+
logger.debug("Storing String representation for header: {}", entry.getKey());
9292
compliantHeaders.put(entry.getKey(), value.toString());
9393
}
9494
}

0 commit comments

Comments
 (0)