Skip to content

Commit 2ee8650

Browse files
authored
WMS-8022 : Lab 4 : Kafka to Oracle TEQ Connection (#342)
1 parent d2f6af7 commit 2ee8650

File tree

4 files changed

+183
-0
lines changed

4 files changed

+183
-0
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
-- Copyright (c) 2021 Oracle and/or its affiliates.
2+
-- Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
3+
4+
set echo on
5+
set serveroutput on size 20000
6+
set serverout on verify off
7+
8+
declare
9+
teq_topic varchar2(30) := '&1' ;
10+
teq_subscriber varchar2(30) := '&2' ;
11+
12+
dequeue_options DBMS_AQ.dequeue_options_t;
13+
message_properties DBMS_AQ.message_properties_t;
14+
message_id RAW(2000);
15+
my_message SYS.AQ$_JMS_TEXT_MESSAGE;
16+
msg_text varchar2(32767);
17+
begin
18+
DBMS_OUTPUT.ENABLE (20000);
19+
20+
if teq_topic is not null and teq_subscriber is not null
21+
then
22+
-- Dequeue Options
23+
dequeue_options.dequeue_mode := DBMS_AQ.REMOVE;
24+
dequeue_options.wait := DBMS_AQ.NO_WAIT;
25+
dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
26+
dequeue_options.wait := 1;
27+
dequeue_options.consumer_name := teq_subscriber;
28+
29+
DBMS_AQ.DEQUEUE(
30+
queue_name => teq_topic,
31+
dequeue_options => dequeue_options,
32+
message_properties => message_properties,
33+
payload => my_message,
34+
msgid => message_id);
35+
commit;
36+
my_message.get_text(msg_text);
37+
DBMS_OUTPUT.put_line('TEQ message: ' || msg_text);
38+
else
39+
DBMS_OUTPUT.put_line('ERR : at least one of the variables is null !');
40+
end if;
41+
end;
42+
/
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
#!/bin/bash
2+
# Copyright (c) 2021 Oracle and/or its affiliates.
3+
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
4+
5+
# Fail on error
6+
set -e
7+
8+
# Make sure this is run via source or .
9+
if ! (return 0 2>/dev/null); then
10+
echo "ERROR: Usage: 'source dequeue_oracle_teq.sh"
11+
exit
12+
fi
13+
14+
# Collect the DB password
15+
read -s -r -p "Please enter Oracle DB Password: " ORACLE_DB_PASSWORD
16+
echo "***********"
17+
18+
# Collect the DB USER
19+
if state_done LAB_DB_USER; then
20+
LAB_DB_USER="$(state_get LAB_DB_USER)"
21+
else
22+
echo "ERROR: Oracle DB user is missing!"
23+
exit
24+
fi
25+
26+
# Collect Oracle Database Service
27+
if state_done LAB_DB_NAME; then
28+
LAB_DB_SVC="$(state_get LAB_DB_NAME)_tp"
29+
else
30+
echo "ERROR: Oracle DB Service is missing!"
31+
exit
32+
fi
33+
34+
# Collect the Oracle TEQ Topic (Destination)
35+
if state_done LAB_TEQ_TOPIC; then
36+
TEQ_TOPIC="$(state_get LAB_TEQ_TOPIC)"
37+
else
38+
echo "ERROR: Oracle TEQ Topic is missing!"
39+
exit
40+
fi
41+
42+
# Collect the Oracle TEQ Subscriber (Destination)
43+
if state_done LAB_TEQ_TOPIC_SUBSCRIBER; then
44+
TEQ_SUBSCRIBER="$(state_get LAB_TEQ_TOPIC_SUBSCRIBER)"
45+
else
46+
echo "ERROR: Oracle TEQ Topic Subscriber is missing!"
47+
exit
48+
fi
49+
50+
# DB Connection Setup
51+
export TNS_ADMIN=$LAB_HOME/wallet
52+
53+
# DEQUEUE TEQ Message
54+
sql -S /nolog <<!
55+
connect $LAB_DB_USER/"$ORACLE_DB_PASSWORD"@$LAB_DB_SVC
56+
57+
@dequeue_msg_oracle_teq_topic.sql $TEQ_TOPIC $TEQ_SUBSCRIBER
58+
59+
exit;
60+
!
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"connector.class": "io.confluent.connect.jms.JmsSinkConnector",
3+
"tasks.max": "1",
4+
"topics": "LAB8022_TOPIC",
5+
"java.naming.factory.initial": "oracle.jms.AQjmsInitialContextFactory",
6+
"java.naming.provider.url": "jdbc:oracle:thin:@LAB_DB_SVC?TNS_ADMIN=/home/appuser/wallet",
7+
"db_url": "jdbc:oracle:thin:@LAB_DB_SVC?TNS_ADMIN=/home/appuser/wallet",
8+
"java.naming.security.principal": "LAB_DB_USER",
9+
"java.naming.security.credentials": "LAB_DB_PASSWORD",
10+
"jndi.connection.factory": "javax.jms.XAQueueConnectionFactory",
11+
"jms.destination.type": "topic",
12+
"jms.destination.name": "LAB_TEQ_TOPIC",
13+
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
14+
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
15+
"confluent.topic.bootstrap.servers":"broker:29092",
16+
"confluent.topic.replication.factor": "1"
17+
}
18+
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#!/bin/bash
2+
# Copyright (c) 2021 Oracle and/or its affiliates.
3+
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
4+
5+
# Fail on error
6+
set -e
7+
8+
CONF_FILE="${LAB_HOME}"/kafka-connect-teq/kafka2teq-connect-configuration.json
9+
10+
# Collect the DB password
11+
read -s -r -p "Please enter Oracle DB Password: " ORACLE_DB_PASSWORD
12+
#seq -f "*" -s '' -t '\n' "${#ORACLE_DB_PASSWORD}"
13+
14+
# Collect the Kafka Topic to be consumed by Connect
15+
# read -r -p "Please enter Kafka Topic: " KAFKA_TOPIC
16+
17+
# Collect the DB USER
18+
if state_get LAB_DB_USER; then
19+
LAB_DB_USER="$(state_get LAB_DB_USER)"
20+
21+
# Set the Oracle Database User used by Connect Sync
22+
sed -i 's/LAB_DB_USER/'"${LAB_DB_USER}"'/g' "$CONF_FILE"
23+
else
24+
echo "ERROR: Oracle DB user is missing!"
25+
exit
26+
fi
27+
28+
# Collect Oracle Database Service
29+
if state_get LAB_DB_NAME; then
30+
LAB_DB_SVC="$(state_get LAB_DB_NAME)_tp"
31+
32+
# Set the Oracle Database URL used by Connect Sync
33+
sed -i 's/LAB_DB_SVC/'"${LAB_DB_SVC}"'/g' "$CONF_FILE"
34+
else
35+
echo "ERROR: Oracle DB Service is missing!"
36+
exit
37+
fi
38+
39+
# Collect the Oracle TEQ Topic (Destination)
40+
if state_get LAB_TEQ_TOPIC; then
41+
TEQ_TOPIC="$(state_get LAB_TEQ_TOPIC)"
42+
43+
# Set the Oracle TEQ TOPIC produced by Connect sync
44+
sed -i 's/LAB_TEQ_TOPIC/'"${TEQ_TOPIC}"'/g' "$CONF_FILE"
45+
else
46+
echo "ERROR: Oracle TEQ Topic is missing!"
47+
exit
48+
fi
49+
50+
# Setup the Oracle DB User Password
51+
sed -i 's/LAB_DB_PASSWORD/'"${ORACLE_DB_PASSWORD}"'/g' "$CONF_FILE"
52+
53+
# Configure the Kafka Connect Sync with Oracle Database
54+
curl -S -s -o /dev/null \
55+
-i -X PUT -H "Accept:application/json" \
56+
-H "Content-Type:application/json" http://localhost:8083/connectors/JmsConnectSync_lab8022/config \
57+
-T "$CONF_FILE"
58+
59+
# Reset the Oracle DB User Password
60+
jq '."java.naming.security.credentials" |= "ORACLE_DB_PASSWORD"' \
61+
"$CONF_FILE" > "$CONF_FILE"'_tmp'
62+
63+
mv "$CONF_FILE"'_tmp' "$CONF_FILE"

0 commit comments

Comments
 (0)