Skip to content

Commit c599ebf

Browse files
authored
WIP: update sample code (#438)
* updated the plsql samples Signed-off-by: Mark Nelson <mark.x.nelson@oracle.com> * wip save point Signed-off-by: Mark Nelson <mark.x.nelson@oracle.com> * wip save point Signed-off-by: Mark Nelson <mark.x.nelson@oracle.com> * create teq in java Signed-off-by: Mark Nelson <mark.x.nelson@oracle.com> * node samples Signed-off-by: Mark Nelson <mark.x.nelson@oracle.com>
1 parent b582c8c commit c599ebf

19 files changed

+521
-321
lines changed

code-teq/javaTeq/javaEnqDeqTEQ.java

Lines changed: 0 additions & 61 deletions
This file was deleted.

code-teq/javaTeq/pom.xml

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
8+
<groupId>com.oracle.example</groupId>
9+
<artifactId>teq</artifactId>
10+
<version>0.0.1-SNAPSHOT</version>
11+
<name>teq</name>
12+
<description>TEQ examples</description>
13+
14+
<properties>
15+
<maven.compiler.target>17</maven.compiler.target>
16+
<maven.compiler.source>17</maven.compiler.source>
17+
</properties>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>javax.transaction</groupId>
22+
<artifactId>javax.transaction-api</artifactId>
23+
<version>1.2</version>
24+
</dependency>
25+
<dependency>
26+
<groupId>com.oracle.database.jdbc</groupId>
27+
<artifactId>ojdbc8</artifactId>
28+
<version>19.3.0.0</version>
29+
</dependency>
30+
<dependency>
31+
<groupId>com.oracle.database.messaging</groupId>
32+
<artifactId>aqapi</artifactId>
33+
<version>21.3.0.0</version>
34+
<scope>system</scope>
35+
<systemPath>/home/mark/src/aqapi.jar</systemPath>
36+
</dependency>
37+
<dependency>
38+
<groupId>javax.jms</groupId>
39+
<artifactId>javax.jms-api</artifactId>
40+
<version>2.0.1</version>
41+
</dependency>
42+
<dependency>
43+
<groupId>javax.transaction</groupId>
44+
<artifactId>jta</artifactId>
45+
<version>1.1</version>
46+
</dependency>
47+
</dependencies>
48+
49+
<build>
50+
<plugins>
51+
<plugin>
52+
<groupId>org.codehaus.mojo</groupId>
53+
<artifactId>exec-maven-plugin</artifactId>
54+
<version>3.0.0</version>
55+
<executions>
56+
<execution>
57+
<goals>
58+
<goal>exec</goal>
59+
</goals>
60+
</execution>
61+
</executions>
62+
<configuration>
63+
<executable>java</executable>
64+
<arguments>
65+
<argument>-Doracle.jdbc.fanEnabled=false</argument>
66+
<argument>-classpath</argument>
67+
<classpath/>
68+
<argument>com.oracle.example.ConsumeTEQ</argument>
69+
</arguments>
70+
</configuration>
71+
</plugin>
72+
</plugins>
73+
</build>
74+
75+
</project>
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package com.oracle.example;
2+
3+
import java.sql.SQLException;
4+
5+
import javax.jms.JMSException;
6+
import javax.jms.Session;
7+
import javax.jms.Topic;
8+
import javax.jms.TopicConnection;
9+
import javax.jms.TopicConnectionFactory;
10+
import javax.jms.TopicSession;
11+
12+
import oracle.AQ.AQException;
13+
import oracle.jms.AQjmsFactory;
14+
import oracle.jms.AQjmsSession;
15+
import oracle.jms.AQjmsTextMessage;
16+
import oracle.jms.AQjmsTopicSubscriber;
17+
import oracle.ucp.jdbc.PoolDataSource;
18+
import oracle.ucp.jdbc.PoolDataSourceFactory;
19+
20+
public class ConsumeTEQ {
21+
22+
private static String username = "pdbadmin";
23+
private static String url = "jdbc:oracle:thin:@//localhost:1521/pdb1";
24+
private static String topicName = "my_teq";
25+
26+
public static void main(String[] args) throws AQException, SQLException, JMSException {
27+
28+
// create a topic session
29+
PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
30+
ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
31+
ds.setURL(url);
32+
ds.setUser(username);
33+
ds.setPassword(System.getenv("DB_PASSWORD"));
34+
35+
// create a JMS topic connection and session
36+
TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
37+
TopicConnection conn = tcf.createTopicConnection();
38+
conn.start();
39+
TopicSession session =
40+
(AQjmsSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
41+
42+
// create a subscriber on the topic
43+
Topic topic = ((AQjmsSession) session).getTopic(username, topicName);
44+
AQjmsTopicSubscriber subscriber =
45+
(AQjmsTopicSubscriber) session.createDurableSubscriber(topic, "my_subscriber");
46+
47+
System.out.println("Waiting for messages...");
48+
49+
// wait forever for messages to arrive and print them out
50+
while (true) {
51+
52+
// the 1_000 is a one second timeout
53+
AQjmsTextMessage message = (AQjmsTextMessage) subscriber.receive(1_000);
54+
if (message != null) {
55+
if (message.getText() != null) {
56+
System.out.println(message.getText());
57+
} else {
58+
System.out.println();
59+
}
60+
}
61+
session.commit();
62+
}
63+
}
64+
65+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.oracle.example;
2+
3+
import java.sql.SQLException;
4+
5+
import javax.jms.Destination;
6+
import javax.jms.JMSException;
7+
import javax.jms.Session;
8+
import javax.jms.TopicConnection;
9+
import javax.jms.TopicConnectionFactory;
10+
import javax.jms.TopicSession;
11+
12+
import oracle.AQ.AQException;
13+
import oracle.AQ.AQQueueTableProperty;
14+
import oracle.jms.AQjmsDestination;
15+
import oracle.jms.AQjmsFactory;
16+
import oracle.jms.AQjmsSession;
17+
import oracle.ucp.jdbc.PoolDataSource;
18+
import oracle.ucp.jdbc.PoolDataSourceFactory;
19+
20+
public class CreateTEQ {
21+
22+
private static String username = "pdbadmin";
23+
private static String url = "jdbc:oracle:thin:@//localhost:1521/pdb1";
24+
25+
public static void main(String[] args) throws AQException, SQLException, JMSException {
26+
27+
// create a topic session
28+
PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
29+
ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
30+
ds.setURL(url);
31+
ds.setUser(username);
32+
ds.setPassword(System.getenv("DB_PASSWORD"));
33+
34+
TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
35+
TopicConnection conn = tcf.createTopicConnection();
36+
conn.start();
37+
TopicSession session = (AQjmsSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
38+
39+
// create properties
40+
AQQueueTableProperty props = new AQQueueTableProperty("SYS.AQ$_JMS_TEXT_MESAGE");
41+
props.setMultiConsumer(true);
42+
props.setPayloadType("SYS.AQ$_JMS_TEXT_MESSAGE");
43+
44+
// create queue table, topic and start it
45+
Destination myTeq = ((AQjmsSession) session).createJMSTransactionalEventQueue("my_jms_teq", true);
46+
((AQjmsDestination) myTeq).start(session, true, true);
47+
48+
}
49+
50+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package com.oracle.example;
2+
3+
import java.sql.SQLException;
4+
5+
import javax.jms.JMSException;
6+
import javax.jms.Session;
7+
import javax.jms.Topic;
8+
import javax.jms.TopicConnection;
9+
import javax.jms.TopicConnectionFactory;
10+
import javax.jms.TopicSession;
11+
12+
import oracle.AQ.AQException;
13+
import oracle.jms.AQjmsAgent;
14+
import oracle.jms.AQjmsFactory;
15+
import oracle.jms.AQjmsSession;
16+
import oracle.jms.AQjmsTextMessage;
17+
import oracle.jms.AQjmsTopicPublisher;
18+
import oracle.ucp.jdbc.PoolDataSource;
19+
import oracle.ucp.jdbc.PoolDataSourceFactory;
20+
21+
public class PublishTEQ {
22+
23+
private static String username = "pdbadmin";
24+
private static String url = "jdbc:oracle:thin:@//localhost:1521/pdb1";
25+
private static String topicName = "my_teq";
26+
27+
public static void main(String[] args) throws AQException, SQLException, JMSException {
28+
29+
// create a topic session
30+
PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
31+
ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
32+
ds.setURL(url);
33+
ds.setUser(username);
34+
ds.setPassword(System.getenv("DB_PASSWORD"));
35+
36+
// create a JMS topic connection and session
37+
TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
38+
TopicConnection conn = tcf.createTopicConnection();
39+
conn.start();
40+
TopicSession session =
41+
(AQjmsSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
42+
43+
// publish message
44+
Topic topic = ((AQjmsSession) session).getTopic(username, topicName);
45+
AQjmsTopicPublisher publisher = (AQjmsTopicPublisher) session.createPublisher(topic);
46+
47+
AQjmsTextMessage message = (AQjmsTextMessage) session.createTextMessage("hello from java");
48+
publisher.publish(message, new AQjmsAgent[] { new AQjmsAgent("my_subscription", null) });
49+
session.commit();
50+
51+
// clean up
52+
publisher.close();
53+
session.close();
54+
conn.close();
55+
}
56+
57+
}

code-teq/nodeTeq/dequeueTEQ.js

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
//
2+
// This sample demonstrates how to enqueue a message onto a TEQ using PL/SQL
3+
//
4+
5+
// There are various payload types supported, including user-defined object, raw, JMS and JSON.
6+
// This sample uses the RAW payload type.
7+
8+
// Execute permission on dbms_aq is required.
9+
10+
const oracledb = require('oracledb');
11+
12+
// declare an async function to do the work, since most of the APIs in the
13+
// oracledb node library are async
14+
async function run() {
15+
let connection;
16+
17+
try {
18+
// get a connection to the database
19+
oracledb.initOracleClient({});
20+
connection = await oracledb.getConnection({
21+
user: 'pdbadmin',
22+
password: process.env.DB_PASSWORD,
23+
connectString: 'localhost:1521/pdb1'
24+
})
25+
26+
// dequeue a message
27+
const rawQueue = await connection.getQueue("my_raw_teq");
28+
rawQueue.deqOptions.consumerName = "my_subscriber";
29+
rawQueue.deqOptions.wait=oracledb.AQ_DEQ_NO_WAIT;
30+
const rawDeq = await rawQueue.deqOne();
31+
await connection.commit();
32+
33+
// RAW data is returned as a buffer, so we need to convert to a string
34+
// before displaying the data
35+
const buffer = Buffer.from(rawDeq.payload)
36+
console.log(buffer.toString());
37+
38+
} catch (err) {
39+
console.error(err);
40+
} finally {
41+
if (connection) {
42+
try {
43+
await connection.close();
44+
} catch (err) {
45+
console.error(err);
46+
}
47+
}
48+
}
49+
}
50+
51+
// entry point
52+
run();

0 commit comments

Comments
 (0)