Skip to content

Commit 950df12

Browse files
authored
added code for TEQwebpages (#419)
* update for node.js and python * update node and python bugs * update teardown and setup * python bug fix * Update nodeEnqDeqAQ.js * Update nodeEnqDeqTEQ.js * update oracle AQ and TEQ * added gitignore * add env paths * codeTEQ for webpage
1 parent 39a8e56 commit 950df12

File tree

11 files changed

+707
-0
lines changed

11 files changed

+707
-0
lines changed

code-teq/javaTeq/javaEnqDeqTEQ.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package com.examples.enqueueDequeueTEQ;
2+
3+
import java.sql.SQLException;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
import javax.jms.JMSException;
7+
8+
import javax.jms.TopicSession;
9+
10+
import org.springframework.beans.factory.annotation.Autowired;
11+
import org.springframework.beans.factory.annotation.Value;
12+
import org.springframework.stereotype.Service;
13+
14+
import com.examples.config.ConfigData;
15+
import com.examples.config.ConstantName;
16+
import com.examples.util.pubSubUtil;
17+
import com.fasterxml.jackson.core.JsonProcessingException;
18+
19+
@Service
20+
public class EnqueueDequeueTEQ {
21+
22+
@Autowired(required = true)
23+
private pubSubUtil pubSubUtil;
24+
25+
@Autowired(required = true)
26+
private ConfigData configData;
27+
28+
@Autowired(required = true)
29+
private ConstantName constantName;
30+
31+
public Map<Integer, String> pubSubTEQ()
32+
throws JsonProcessingException, ClassNotFoundException, SQLException, JMSException {
33+
Map<Integer, String> response = new HashMap();
34+
35+
TopicSession session = configData.topicDataSourceConnection();
36+
response.put(1, "Topic Connection created.");
37+
38+
pubSub(session, constantName.teq_pubSubSubscriber1, constantName.teq_pubSubQueue, "Sample text message");
39+
response.put(2, "Topic pubSub executed.");
40+
41+
return response;
42+
}
43+
44+
public void pubSub(TopicSession session, String subscriberName, String queueName, String message)
45+
throws JsonProcessingException, ClassNotFoundException, SQLException, JMSException {
46+
47+
Topic topic = ((AQjmsSession) session).getTopic(username, queueName);
48+
AQjmsTopicPublisher publisher = (AQjmsTopicPublisher) session.createPublisher(topic);
49+
TopicSubscriber topicSubscriber = (TopicSubscriber) ((AQjmsSession) session).createDurableSubscriber(topic,
50+
subscriberName);
51+
52+
AQjmsTextMessage publisherMessage = (AQjmsTextMessage) session.createTextMessage(message);
53+
System.out.println("------Publisher Message: " + publisherMessage.getText());
54+
publisher.publish(publisherMessage, new AQjmsAgent[] { new AQjmsAgent(subscriberName, null) });
55+
session.commit();
56+
57+
AQjmsTextMessage subscriberMessage = (AQjmsTextMessage) topicSubscriber.receive(10);
58+
System.out.println("------Subscriber Message: " + subscriberMessage.getText());
59+
session.commit();
60+
}
61+
}

code-teq/nodeTeq/nodeCleanupTEQ.js

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
const oracledb = require('oracledb');
2+
3+
async function run() {
4+
5+
let connection;
6+
7+
try {
8+
const config = { connectString: process.env.DB_ALIAS, externalAuth: true };
9+
const connection = await oracledb.getConnection(config);
10+
11+
cleanUp(connection,"NODE_TEQ_ADT" );
12+
cleanUp(connection,"NODE_TEQ_RAW" );
13+
cleanUp(connection,"NODE_TEQ_JMS" );
14+
cleanUp(connection,"NODE_TEQ_JSON" );
15+
16+
17+
} catch (err) {
18+
console.error(err);
19+
} finally {
20+
if (connection) {
21+
try {
22+
await connection.close();
23+
} catch (err) {
24+
console.error(err);
25+
}
26+
}
27+
}
28+
}
29+
run();
30+
31+
async function cleanUp(conn, queueTable, queueName) {
32+
33+
await conn.execute(`
34+
BEGIN
35+
DBMS_AQADM.STOP_QUEUE( QUEUE_NAME => '`.concat(queueName).concat(`');
36+
END;`)
37+
);
38+
39+
await conn.execute(`
40+
BEGIN
41+
DBMS_AQADM.DROP_TRANSACTIONAL_QUEUE(
42+
QUEUE_NAME => '`.concat(queueName).concat(`',
43+
FORCE => TRUE
44+
);
45+
END;`)
46+
);
47+
48+
}

code-teq/nodeTeq/nodeCreateTEQ.js

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
const oracledb = require('oracledb');
2+
3+
async function run() {
4+
5+
let connection;
6+
7+
try {
8+
const config = { connectString: process.env.DB_ALIAS, externalAuth: true };
9+
const connection = await oracledb.getConnection(config);
10+
11+
await connection.execute('CREATE OR REPLACE TYPE NODE_TEQ_MESSAGE_TYPE AS OBJECT (NAME VARCHAR2(10),ADDRESS VARCHAR2(50))');
12+
13+
createQueue(connection, "NODE_TEQ_ADT" , "NODE_TEQ_MESSAGE_TYPE" ,"SUBSCRIBER_NODE_TEQ_ADT");
14+
createQueue(connection, "NODE_TEQ_RAW" , "RAW" ,"SUBSCRIBER_NODE_TEQ_RAW");
15+
createQueue(connection, "NODE_TEQ_JMS" , "JMS" ,"SUBSCRIBER_NODE_TEQ_JMS");
16+
createQueue(connection, "NODE_TEQ_JSON", "JSON","SUBSCRIBER_NODE_TEQ_JSON");
17+
18+
19+
} catch (err) {
20+
console.error(err);
21+
} finally {
22+
if (connection) {
23+
try {
24+
await connection.close();
25+
} catch (err) {
26+
console.error(err);
27+
}
28+
}
29+
}
30+
}
31+
run();
32+
33+
async function createQueue(conn, queueName, payload, subscriberName) {
34+
await conn.execute(`
35+
BEGIN
36+
DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE(
37+
queue_name =>'`.concat(queueName).concat(`',
38+
storage_clause =>null,
39+
multiple_consumers =>true,
40+
max_retries =>10,
41+
comment =>'Node.js Samples for TEQ',
42+
queue_payload_type =>'`).concat(payload).concat(`',
43+
queue_properties =>null,
44+
replication_mode =>null
45+
);
46+
END;`)
47+
);
48+
49+
await conn.execute(`
50+
BEGIN
51+
DBMS_AQADM.START_QUEUE(
52+
queue_name=>'`.concat(queueName).concat(`',
53+
enqueue =>TRUE,
54+
dequeue=> True
55+
);
56+
END;`)
57+
);
58+
59+
await conn.execute(`
60+
DECLARE
61+
subscriber sys.aq$_agent;
62+
BEGIN
63+
DBMS_AQADM.add_subscriber(
64+
queue_name =>'`.concat(queueName).concat(`',
65+
subscriber => sys.aq$_agent('`).concat(subscriberName).concat(`', null ,0)
66+
);
67+
END;`)
68+
);
69+
70+
}

code-teq/nodeTeq/nodeEnqDeqTEQ.js

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
const oracledb = require('oracledb');
2+
3+
async function run() {
4+
5+
let connection;
6+
7+
try {
8+
const config = { connectString: process.env.DB_ALIAS, externalAuth: true };
9+
const connection = await oracledb.getConnection(config);
10+
/*ADT PAYLOAD*/
11+
console.log("1)Enqueue one message with ADT payload ");
12+
const adtQueue = await connection.getQueue("NODE_TEQ_ADT", {payloadType: "NODE_TEQ_MESSAGE_TYPE"});
13+
const message = new adtQueue.payloadTypeClass(
14+
{
15+
NAME: "scott",
16+
ADDRESS: "The Kennel"
17+
}
18+
);
19+
await adtQueue.enqOne(props={payload: message});
20+
/*await adtQueue.enqOne(message);*/
21+
await connection.commit();
22+
console.log("Enqueue Done!!!");
23+
const adtResult = await connection.execute("Select QUEUE, USER_DATA from AQ$NODE_TEQ_ADT_TABLE");
24+
console.dir(adtResult.rows);
25+
26+
adtQueue.deqOptions.consumerName="SUBSCRIBER_NODE_TEQ_ADT";
27+
adtQueue.deqOptions.wait=oracledb.AQ_DEQ_NO_WAIT;
28+
const adtDeq = await adtQueue.deqOne();
29+
console.dir(adtDeq.payload());
30+
await connection.commit();
31+
console.log("Dequeued message with ADT payload : ",adtDeq.payload.NAME)
32+
console.log("Dequeue Done!!!")
33+
console.log("-----------------------------------------------------------------")
34+
35+
/*RAW PAYLOAD*/
36+
console.log("2)Enqueue one message with RAW payload ");
37+
const rawQueue = await connection.getQueue("NODE_TEQ_RAW");
38+
await rawQueue.enqOne("This is my RAW message");
39+
await connection.commit();
40+
console.log("Enqueue Done!!!");
41+
const rawResult = await connection.execute("Select QUEUE, USER_DATA from AQ$NODE_TEQ_RAW_TABLE");
42+
console.dir(rawResult.rows);
43+
44+
rawQueue.deqOptions.consumerName="SUBSCRIBER_NODE_TEQ_RAW";
45+
rawQueue.deqOptions.wait=oracledb.AQ_DEQ_NO_WAIT;
46+
const rawDeq = await rawQueue.deqOne();
47+
await connection.commit();
48+
console.dir(rawDeq.payload);
49+
await connection.commit();
50+
console.log("Dequeued message with ADT payload : ",rawDeq.payload)
51+
console.log("Dequeue Done!!!")
52+
console.log("-----------------------------------------------------------------")
53+
54+
} catch (err) {
55+
console.error(err);
56+
} finally {
57+
if (connection) {
58+
try {
59+
await connection.close();
60+
} catch (err) {
61+
console.error(err);
62+
}
63+
}
64+
}
65+
}
66+
run();

code-teq/plsqlTeq/cleanupTEQ.sql

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
--Clean up all objects related to the obj type: */
2+
Execute DBMS_AQADM.STOP_QUEUE ( queue_name => 'objType_TEQ');
3+
Execute DBMS_AQADM.drop_transactional_event_queue(queue_name =>'objType_TEQ',force=> TRUE);
4+
5+
--Cleans up all objects related to the RAW type: */
6+
Execute DBMS_AQADM.STOP_QUEUE ( queue_name => 'rawType_TEQ');
7+
Execute DBMS_AQADM.drop_transactional_event_queue(queue_name =>'rawType_TEQ',force=> TRUE);
8+
9+
--Cleans up all objects related to the priority queue: */
10+
Execute DBMS_AQADM.STOP_QUEUE ( queue_name => 'jsonType_TEQ');
11+
Execute DBMS_AQADM.drop_transactional_event_queue(queue_name =>'jsonType_TEQ',force=> TRUE);
12+
/
13+
select name, queue_table, dequeue_enabled,enqueue_enabled, sharded, queue_category, recipients from all_queues where OWNER='DBUSER' and QUEUE_TYPE<>'EXCEPTION_QUEUE';
14+
/
15+
EXIT;

code-teq/plsqlTeq/createTEQ.sql

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
2+
CREATE type Message_type as object (subject VARCHAR2(30), text VARCHAR2(80));
3+
/
4+
-- Creating an Object type queue
5+
BEGIN
6+
DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE(
7+
queue_name =>'objType_TEQ',
8+
storage_clause =>null,
9+
multiple_consumers =>true,
10+
max_retries =>10,
11+
comment =>'ObjectType for TEQ',
12+
queue_payload_type =>'Message_type',
13+
queue_properties =>null,
14+
replication_mode =>null);
15+
DBMS_AQADM.START_QUEUE (queue_name=> 'objType_TEQ', enqueue =>TRUE, dequeue=> True);
16+
END;
17+
/
18+
19+
-- Creating a RAW type queue:
20+
BEGIN
21+
DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE(
22+
queue_name =>'rawType_TEQ',
23+
storage_clause =>null,
24+
multiple_consumers =>true,
25+
max_retries =>10,
26+
comment =>'RAW type for TEQ',
27+
queue_payload_type =>'RAW',
28+
queue_properties =>null,
29+
replication_mode =>null);
30+
DBMS_AQADM.START_QUEUE (queue_name=> 'rawType_TEQ', enqueue =>TRUE, dequeue=> True);
31+
END;
32+
/
33+
34+
--Creating JSON type queue:
35+
BEGIN
36+
DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE(
37+
queue_name =>'jsonType_TEQ',
38+
storage_clause =>null,
39+
multiple_consumers =>true,
40+
max_retries =>10,
41+
comment =>'jsonType for TEQ',
42+
queue_payload_type =>'JSON',
43+
queue_properties =>null,
44+
replication_mode =>null);
45+
DBMS_AQADM.START_QUEUE (queue_name=> 'jsonType_TEQ', enqueue =>TRUE, dequeue=> True);
46+
END;
47+
/
48+
BEGIN
49+
DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE(
50+
queue_name =>'JAVA_TEQ_PUBSUB_QUEUE',
51+
storage_clause =>null,
52+
multiple_consumers=>true,
53+
max_retries =>10,
54+
comment =>'JAVA_TEQ_PUBSUB_QUEUE',
55+
queue_payload_type=>'JMS',
56+
queue_properties =>null,
57+
replication_mode =>null);
58+
DBMS_AQADM.START_QUEUE (queue_name=> 'JAVA_TEQ_PUBSUB_QUEUE', enqueue =>TRUE, dequeue=> True);
59+
END;
60+
/
61+
DECLARE
62+
subscriber sys.aq$_agent;
63+
BEGIN
64+
dbms_aqadm.add_subscriber(queue_name => 'objType_TEQ' , subscriber => sys.aq$_agent('teqBasicObjSubscriber' , null ,0), rule => 'correlation = ''teqBasicObjSubscriber''');
65+
66+
dbms_aqadm.add_subscriber(queue_name => 'rawType_TEQ' , subscriber => sys.aq$_agent('teqBasicRawSubscriber' , null ,0), rule => 'correlation = ''teqBasicRawSubscriber''');
67+
68+
dbms_aqadm.add_subscriber(queue_name => 'jsonType_TEQ' , subscriber => sys.aq$_agent('teqBasicJsonSubscriber' , null ,0), rule => 'correlation = ''teqBasicJsonSubscriber''');
69+
70+
END;
71+
/
72+
CREATE OR REPLACE FUNCTION enqueueDequeueTEQ(subscriber varchar2, queueName varchar2, message Message_Typ) RETURN Message_Typ
73+
IS
74+
enqueue_options DBMS_AQ.enqueue_options_t;
75+
message_properties DBMS_AQ.message_properties_t;
76+
message_handle RAW(16);
77+
dequeue_options DBMS_AQ.dequeue_options_t;
78+
messageData Message_Typ;
79+
80+
BEGIN
81+
messageData := message;
82+
message_properties.correlation := subscriber;
83+
DBMS_AQ.ENQUEUE(
84+
queue_name => queueName,
85+
enqueue_options => enqueue_options,
86+
message_properties => message_properties,
87+
payload => messageData,
88+
msgid => message_handle);
89+
COMMIT;
90+
DBMS_OUTPUT.PUT_LINE ('----------ENQUEUE Message: ' || 'ORDERID: ' || messageData.ORDERID || ', OTP: ' || messageData.OTP ||', DELIVERY_STATUS: ' || messageData.DELIVERY_STATUS );
91+
92+
dequeue_options.dequeue_mode := DBMS_AQ.REMOVE;
93+
dequeue_options.wait := DBMS_AQ.NO_WAIT;
94+
dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
95+
dequeue_options.consumer_name := subscriber;
96+
DBMS_AQ.DEQUEUE(
97+
queue_name => queueName,
98+
dequeue_options => dequeue_options,
99+
message_properties => message_properties,
100+
payload => messageData,
101+
msgid => message_handle);
102+
COMMIT;
103+
DBMS_OUTPUT.PUT_LINE ('----------DEQUEUE Message: ' || 'ORDERID: ' || messageData.ORDERID || ', OTP: ' || messageData.OTP ||', DELIVERY_STATUS: ' || messageData.DELIVERY_STATUS );
104+
RETURN messageData;
105+
END;
106+
/
107+
EXIT;

0 commit comments

Comments
 (0)