Skip to content

Commit 4ef20cb

Browse files
dotnet and go services: switch to polling for msgs (#32)
* dotnet inventorylocation query * dotnet inventorylocation query * inventory-go work * inventory-go work * inventory-go aq dequeue added * inventory-go working with pl/sql * dotnet and go services: switch to polling for msgs
1 parent a611d21 commit 4ef20cb

File tree

3 files changed

+107
-95
lines changed

3 files changed

+107
-95
lines changed

grabdish/inventory-dotnet/Startup.cs

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -91,19 +91,15 @@ public String ListenForMessages()
9191
";";
9292
Console
9393
.WriteLine("tnsAdmin:" +
94-
tnsAdmin +
95-
" connString:" +
96-
connString);
97-
using (
94+
tnsAdmin);
95+
using (
9896
OracleConnection connection = new OracleConnection(connString)
9997
)
10098
{
101-
try
102-
{
103-
connection.Open();
104-
while (true)
99+
connection.Open();
100+
while (true) {
101+
try
105102
{
106-
Console.WriteLine("connection:" + connection);
107103
//dequeue from order queues (out param)
108104
OracleCommand orderReceiveMessageCommand = new OracleCommand();
109105
orderReceiveMessageCommand.Connection = connection;
@@ -116,10 +112,23 @@ public String ListenForMessages()
116112
p_orderInfoParam.Direction = ParameterDirection.Output;
117113
orderReceiveMessageCommand.Parameters.Add (p_orderInfoParam);
118114
orderReceiveMessageCommand.ExecuteNonQuery();
119-
Order order =
115+
// Console.WriteLine("orderReceiveMessageCommand.Parameters[p_orderInfo].Value:" + orderReceiveMessageCommand.Parameters["p_orderInfo"].Value);
116+
if (orderReceiveMessageCommand.Parameters["p_orderInfo"] is null || orderReceiveMessageCommand.Parameters["p_orderInfo"].Value is null) {
117+
Console.WriteLine("message was null");
118+
System.Threading.Thread.Sleep(1000);
119+
continue;
120+
}
121+
Order order;
122+
try {
123+
order =
120124
JsonConvert
121125
.DeserializeObject<Order>("" +
122126
orderReceiveMessageCommand.Parameters["p_orderInfo"].Value);
127+
} catch (System.NullReferenceException ex) {
128+
Console.WriteLine("message was null" + ex);
129+
System.Threading.Thread.Sleep(1000);
130+
continue;
131+
}
123132
System
124133
.Console
125134
.WriteLine("order.itemid inventorychecked sendmessage for {0}",
@@ -200,15 +209,11 @@ public String ListenForMessages()
200209
);
201210
inventorySendMessageCommand.ExecuteNonQuery();
202211
}
212+
catch (NullReferenceException ex) {
213+
if(ex != null) System.Threading.Thread.Sleep(1000);
214+
}
203215
}
204-
catch (NullReferenceException ex)
205-
{
206-
System.Console.WriteLine("Exception: {0}", ex.ToString());
207-
return "fail:" + ex;
208-
}
209-
connection.Close();
210216
}
211-
return "complete";
212217
}
213218
}
214219
}

grabdish/inventory-dotnet/dequeueenqueue.sql

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,18 @@ IS
1212
pragma exception_init(no_messages, -25228);
1313

1414
BEGIN
15-
16-
dequeue_options.wait := -1;
15+
dequeue_options.wait := dbms_aq.NO_WAIT;
16+
-- dequeue_options.wait := dbms_aq.FOREVER;
1717
-- dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;
18+
-- dequeue_options.dequeue_mode := dbms_aq.LOCKED;
1819

1920
DBMS_AQ.DEQUEUE(
2021
queue_name => 'ORDERQUEUE',
2122
dequeue_options => dequeue_options,
2223
message_properties => message_properties,
2324
payload => message,
2425
msgid => message_handle);
25-
COMMIT;
26+
-- COMMIT;
2627

2728
-- p_action := message.get_string_property('action');
2829
-- p_orderid := message.get_int_property('orderid');
@@ -47,7 +48,7 @@ CREATE OR REPLACE PROCEDURE checkInventoryReturnLocation(p_inventoryId IN VARCHA
4748
IS
4849

4950
BEGIN
50-
update inventory set inventorycount = inventorycount - 1 where inventoryid = p_inventoryId and inventorycount > 0 returning inventorylocation into p_inventorylocation;
51+
update INVENTORYUSER.INVENTORY set inventorycount = inventorycount - 1 where inventoryid = p_inventoryId and inventorycount > 0 returning inventorylocation into p_inventorylocation;
5152
dbms_output.put_line('p_inventorylocation');
5253
dbms_output.put_line(p_inventorylocation);
5354
END;

grabdish/inventory-go/inventory.go

Lines changed: 80 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"log"
99
"os"
10+
"time"
1011

1112
"github.com/godror/godror"
1213
)
@@ -48,85 +49,90 @@ func main() {
4849
}
4950
fmt.Printf("Listening for messages... start time: %s\n", thedate)
5051
ctx := context.Background()
51-
for {
52-
listenForMessages(ctx, db)
53-
}
52+
listenForMessages(ctx, db)
5453
}
5554

5655
func listenForMessages(ctx context.Context, db *sql.DB) {
5756

58-
tx, err := db.BeginTx(ctx, nil)
59-
fmt.Println("__________________________________________")
60-
//receive order...
61-
var orderJSON string
62-
dequeueOrderMessageSproc := `BEGIN dequeueOrderMessage(:1); END;`
63-
if _, err := db.ExecContext(ctx, dequeueOrderMessageSproc, sql.Out{Dest: &orderJSON}); err != nil {
64-
log.Printf("Error running %q: %+v", dequeueOrderMessageSproc, err)
65-
return
66-
}
67-
fmt.Println("orderJSON:" + orderJSON)
68-
type Order struct {
69-
Orderid string
70-
Itemid string
71-
Deliverylocation string
72-
Status string
73-
Inventorylocation string
74-
SuggestiveSale string
75-
}
76-
var order Order
77-
jsonerr := json.Unmarshal([]byte(orderJSON), &order)
78-
if jsonerr != nil {
79-
fmt.Printf("Order Unmarshal fmt.Sprint(data) err = %s", jsonerr)
80-
}
81-
fmt.Printf("order.orderid: %s", order.Orderid)
82-
fmt.Println("__________________________________________")
83-
//check inventory...
84-
var inventorylocation string
85-
sqlString := "update INVENTORY set INVENTORYCOUNT = INVENTORYCOUNT - 1 where INVENTORYID = :inventoryid and INVENTORYCOUNT > 0 returning inventorylocation into :inventorylocation"
86-
_, errFromInventoryCheck := db.Exec(sqlString, sql.Named("inventoryid", order.Itemid), sql.Named("inventorylocation", sql.Out{Dest: &inventorylocation}))
87-
if err != nil {
88-
fmt.Println("errFromInventoryCheck: %s", errFromInventoryCheck)
89-
}
90-
// numRows, err := res.RowsAffected()
91-
// if err != nil {
92-
// fmt.Println(errFromInventoryCheck)
93-
// }
94-
// fmt.Println("numRows:" + string(numRows))
95-
if inventorylocation == "" {
96-
inventorylocation = "inventorydoesnotexist"
97-
}
98-
fmt.Println("inventorylocation:" + inventorylocation)
99-
fmt.Println("__________________________________________")
100-
//create inventory reply message...
101-
type Inventory struct {
102-
Orderid string `json:"orderid"`
103-
Itemid string `json:"itemid"`
104-
Inventorylocation string `json:"inventorylocation"`
105-
SuggestiveSale string `json:"suggestiveSale"`
106-
}
107-
inventory := &Inventory{
108-
Orderid: order.Orderid,
109-
Itemid: order.Itemid,
110-
Inventorylocation: inventorylocation,
111-
SuggestiveSale: "beer",
112-
}
113-
inventoryJsonData, err := json.Marshal(inventory)
114-
if err != nil {
115-
fmt.Println(err)
116-
}
117-
inventoryJsonString := string(inventoryJsonData)
118-
fmt.Println("inventoryJsonData:" + inventoryJsonString) // :inventoryid
119-
messageSendSproc := `BEGIN enqueueInventoryMessage(:1); END;`
120-
if _, err := db.ExecContext(ctx, messageSendSproc, inventoryJsonString); err != nil {
121-
log.Printf("Error running %q: %+v", messageSendSproc, err)
122-
return
123-
}
124-
fmt.Println("inventory status message sent:" + inventoryJsonString)
125-
commiterr := tx.Commit()
126-
if commiterr != nil {
127-
fmt.Println("commiterr:", commiterr)
57+
for {
58+
tx, err := db.BeginTx(ctx, nil)
59+
// fmt.Println("__________________________________________")
60+
//receive order...
61+
var orderJSON string
62+
dequeueOrderMessageSproc := `BEGIN dequeueOrderMessage(:1); END;`
63+
if _, err := db.ExecContext(ctx, dequeueOrderMessageSproc, sql.Out{Dest: &orderJSON}); err != nil {
64+
log.Printf("Error running %q: %+v", dequeueOrderMessageSproc, err)
65+
return
66+
}
67+
// fmt.Println("orderJSON:" + orderJSON)
68+
type Order struct {
69+
Orderid string
70+
Itemid string
71+
Deliverylocation string
72+
Status string
73+
Inventorylocation string
74+
SuggestiveSale string
75+
}
76+
var order Order
77+
jsonerr := json.Unmarshal([]byte(orderJSON), &order)
78+
if jsonerr != nil {
79+
tx.Commit()
80+
continue
81+
// fmt.Printf("Order Unmarshal fmt.Sprint(data) err = %s", jsonerr)
82+
}
83+
if jsonerr == nil {
84+
fmt.Printf("order.orderid: %s", order.Orderid)
85+
}
86+
// fmt.Println("__________________________________________")
87+
//check inventory...
88+
var inventorylocation string
89+
sqlString := "update INVENTORY set INVENTORYCOUNT = INVENTORYCOUNT - 1 where INVENTORYID = :inventoryid and INVENTORYCOUNT > 0 returning inventorylocation into :inventorylocation"
90+
_, errFromInventoryCheck := db.Exec(sqlString, sql.Named("inventoryid", order.Itemid), sql.Named("inventorylocation", sql.Out{Dest: &inventorylocation}))
91+
if err != nil {
92+
fmt.Println("errFromInventoryCheck: %s", errFromInventoryCheck)
93+
}
94+
// numRows, err := res.RowsAffected()
95+
// if err != nil {
96+
// fmt.Println(errFromInventoryCheck)
97+
// }
98+
// fmt.Println("numRows:" + string(numRows))
99+
if inventorylocation == "" {
100+
inventorylocation = "inventorydoesnotexist"
101+
}
102+
fmt.Println("inventorylocation:" + inventorylocation)
103+
// fmt.Println("__________________________________________")
104+
//create inventory reply message...
105+
type Inventory struct {
106+
Orderid string `json:"orderid"`
107+
Itemid string `json:"itemid"`
108+
Inventorylocation string `json:"inventorylocation"`
109+
SuggestiveSale string `json:"suggestiveSale"`
110+
}
111+
inventory := &Inventory{
112+
Orderid: order.Orderid,
113+
Itemid: order.Itemid,
114+
Inventorylocation: inventorylocation,
115+
SuggestiveSale: "beer",
116+
}
117+
inventoryJsonData, err := json.Marshal(inventory)
118+
if err != nil {
119+
fmt.Println(err)
120+
}
121+
inventoryJsonString := string(inventoryJsonData)
122+
fmt.Println("inventoryJsonData:" + inventoryJsonString) // :inventoryid
123+
messageSendSproc := `BEGIN enqueueInventoryMessage(:1); END;`
124+
if _, err := db.ExecContext(ctx, messageSendSproc, inventoryJsonString); err != nil {
125+
log.Printf("Error running %q: %+v", messageSendSproc, err)
126+
return
127+
}
128+
fmt.Println("inventory status message sent:" + inventoryJsonString)
129+
commiterr := tx.Commit()
130+
if commiterr != nil {
131+
fmt.Println("commiterr:", commiterr)
132+
}
133+
fmt.Println("commit complete for message sent:" + inventoryJsonString)
134+
time.Sleep(1 * time.Second)
128135
}
129-
fmt.Println("commit complete for message sent:" + inventoryJsonString)
130136
}
131137

132138
func listenForMessagesAQAPI(ctx context.Context, db *sql.DB) { //todo incomplete - using PL/SQL above

0 commit comments

Comments
 (0)