Skip to content

Commit 0d6b36a

Browse files
committed
refactor: modularized handlers
1 parent 4caa320 commit 0d6b36a

File tree

7 files changed

+221
-114
lines changed

7 files changed

+221
-114
lines changed

src/agent/pythd/api/rpc.rs

Lines changed: 14 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ use {
5252
tokio::sync::{
5353
broadcast,
5454
mpsc,
55-
oneshot,
5655
},
5756
warp::{
5857
ws::{
@@ -327,119 +326,20 @@ async fn dispatch_and_catch_error(
327326
}
328327
}
329328

330-
async fn get_product_list(
331-
adapter_tx: &mpsc::Sender<adapter::Message>,
332-
) -> Result<serde_json::Value> {
333-
let (result_tx, result_rx) = oneshot::channel();
334-
adapter_tx
335-
.send(adapter::Message::GetProductList { result_tx })
336-
.await?;
337-
Ok(serde_json::to_value(result_rx.await??)?)
338-
}
339-
340-
async fn get_product(
341-
adapter_tx: &mpsc::Sender<adapter::Message>,
342-
request: &Request<Method, Value>,
343-
) -> Result<serde_json::Value> {
344-
let params: GetProductParams = {
345-
let value = request.params.clone();
346-
serde_json::from_value(value.ok_or_else(|| anyhow!("Missing request parameters"))?)
347-
}?;
348-
349-
let (result_tx, result_rx) = oneshot::channel();
350-
adapter_tx
351-
.send(adapter::Message::GetProduct {
352-
account: params.account,
353-
result_tx,
354-
})
355-
.await?;
356-
Ok(serde_json::to_value(result_rx.await??)?)
357-
}
358-
359-
async fn get_all_products(
360-
adapter_tx: &mpsc::Sender<adapter::Message>,
361-
) -> Result<serde_json::Value> {
362-
let (result_tx, result_rx) = oneshot::channel();
363-
adapter_tx
364-
.send(adapter::Message::GetAllProducts { result_tx })
365-
.await?;
366-
Ok(serde_json::to_value(result_rx.await??)?)
367-
}
368-
369-
async fn subscribe_price(
370-
adapter_tx: &mpsc::Sender<adapter::Message>,
371-
notify_price_tx: &mpsc::Sender<NotifyPrice>,
372-
request: &Request<Method, Value>,
373-
) -> Result<serde_json::Value> {
374-
let params: SubscribePriceParams = serde_json::from_value(
375-
request
376-
.params
377-
.clone()
378-
.ok_or_else(|| anyhow!("Missing request parameters"))?,
379-
)?;
380-
381-
let (result_tx, result_rx) = oneshot::channel();
382-
adapter_tx
383-
.send(adapter::Message::SubscribePrice {
384-
result_tx,
385-
account: params.account,
386-
notify_price_tx: notify_price_tx.clone(),
387-
})
388-
.await?;
389-
390-
Ok(serde_json::to_value(SubscribeResult {
391-
subscription: result_rx.await??,
392-
})?)
393-
}
394-
395-
async fn subscribe_price_sched(
396-
adapter_tx: &mpsc::Sender<adapter::Message>,
397-
notify_price_sched_tx: &mpsc::Sender<NotifyPriceSched>,
398-
request: &Request<Method, Value>,
399-
) -> Result<serde_json::Value> {
400-
let params: SubscribePriceSchedParams = serde_json::from_value(
401-
request
402-
.params
403-
.clone()
404-
.ok_or_else(|| anyhow!("Missing request parameters"))?,
405-
)?;
406-
407-
let (result_tx, result_rx) = oneshot::channel();
408-
adapter_tx
409-
.send(adapter::Message::SubscribePriceSched {
410-
result_tx,
411-
account: params.account,
412-
notify_price_sched_tx: notify_price_sched_tx.clone(),
413-
})
414-
.await?;
415-
416-
Ok(serde_json::to_value(SubscribeResult {
417-
subscription: result_rx.await??,
418-
})?)
419-
}
420-
421-
async fn update_price(
422-
adapter_tx: &mpsc::Sender<adapter::Message>,
423-
request: &Request<Method, Value>,
424-
) -> Result<serde_json::Value> {
425-
let params: UpdatePriceParams = serde_json::from_value(
426-
request
427-
.params
428-
.clone()
429-
.ok_or_else(|| anyhow!("Missing request parameters"))?,
430-
)?;
431-
432-
adapter_tx
433-
.send(adapter::Message::UpdatePrice {
434-
account: params.account,
435-
price: params.price,
436-
conf: params.conf,
437-
status: params.status,
438-
})
439-
.await?;
440-
441-
Ok(serde_json::to_value(0)?)
442-
}
329+
mod get_all_products;
330+
mod get_product;
331+
mod get_product_list;
332+
mod subscribe_price;
333+
mod subscribe_price_sched;
334+
mod update_price;
335+
use {
336+
get_all_products::*,
337+
get_product::*,
338+
get_product_list::*,
339+
subscribe_price::*,
340+
subscribe_price_sched::*,
341+
update_price::*,
342+
};
443343

444344
async fn send_error(
445345
ws_tx: &mut SplitSink<WebSocket, Message>,
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
use {
2+
crate::agent::pythd::adapter,
3+
anyhow::Result,
4+
tokio::sync::{
5+
mpsc,
6+
oneshot,
7+
},
8+
};
9+
10+
pub async fn get_all_products(
11+
adapter_tx: &mpsc::Sender<adapter::Message>,
12+
) -> Result<serde_json::Value> {
13+
let (result_tx, result_rx) = oneshot::channel();
14+
adapter_tx
15+
.send(adapter::Message::GetAllProducts { result_tx })
16+
.await?;
17+
Ok(serde_json::to_value(result_rx.await??)?)
18+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use {
2+
super::{
3+
GetProductParams,
4+
Method,
5+
},
6+
crate::agent::pythd::adapter,
7+
anyhow::{
8+
anyhow,
9+
Result,
10+
},
11+
jrpc::{
12+
Request,
13+
Value,
14+
},
15+
tokio::sync::{
16+
mpsc,
17+
oneshot,
18+
},
19+
};
20+
21+
pub async fn get_product(
22+
adapter_tx: &mpsc::Sender<adapter::Message>,
23+
request: &Request<Method, Value>,
24+
) -> Result<serde_json::Value> {
25+
let params: GetProductParams = {
26+
let value = request.params.clone();
27+
serde_json::from_value(value.ok_or_else(|| anyhow!("Missing request parameters"))?)
28+
}?;
29+
30+
let (result_tx, result_rx) = oneshot::channel();
31+
adapter_tx
32+
.send(adapter::Message::GetProduct {
33+
account: params.account,
34+
result_tx,
35+
})
36+
.await?;
37+
Ok(serde_json::to_value(result_rx.await??)?)
38+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
use {
2+
crate::agent::pythd::adapter,
3+
anyhow::Result,
4+
tokio::sync::{
5+
mpsc,
6+
oneshot,
7+
},
8+
};
9+
10+
pub async fn get_product_list(
11+
adapter_tx: &mpsc::Sender<adapter::Message>,
12+
) -> Result<serde_json::Value> {
13+
let (result_tx, result_rx) = oneshot::channel();
14+
adapter_tx
15+
.send(adapter::Message::GetProductList { result_tx })
16+
.await?;
17+
Ok(serde_json::to_value(result_rx.await??)?)
18+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use {
2+
super::{
3+
Method,
4+
NotifyPrice,
5+
SubscribePriceParams,
6+
SubscribeResult,
7+
},
8+
crate::agent::pythd::adapter,
9+
anyhow::{
10+
anyhow,
11+
Result,
12+
},
13+
jrpc::{
14+
Request,
15+
Value,
16+
},
17+
tokio::sync::{
18+
mpsc,
19+
oneshot,
20+
},
21+
};
22+
23+
pub async fn subscribe_price(
24+
adapter_tx: &mpsc::Sender<adapter::Message>,
25+
notify_price_tx: &mpsc::Sender<NotifyPrice>,
26+
request: &Request<Method, Value>,
27+
) -> Result<serde_json::Value> {
28+
let params: SubscribePriceParams = serde_json::from_value(
29+
request
30+
.params
31+
.clone()
32+
.ok_or_else(|| anyhow!("Missing request parameters"))?,
33+
)?;
34+
35+
let (result_tx, result_rx) = oneshot::channel();
36+
adapter_tx
37+
.send(adapter::Message::SubscribePrice {
38+
result_tx,
39+
account: params.account,
40+
notify_price_tx: notify_price_tx.clone(),
41+
})
42+
.await?;
43+
44+
Ok(serde_json::to_value(SubscribeResult {
45+
subscription: result_rx.await??,
46+
})?)
47+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use {
2+
super::{
3+
Method,
4+
NotifyPriceSched,
5+
SubscribePriceSchedParams,
6+
SubscribeResult,
7+
},
8+
crate::agent::pythd::adapter,
9+
anyhow::{
10+
anyhow,
11+
Result,
12+
},
13+
jrpc::{
14+
Request,
15+
Value,
16+
},
17+
tokio::sync::{
18+
mpsc,
19+
oneshot,
20+
},
21+
};
22+
23+
pub async fn subscribe_price_sched(
24+
adapter_tx: &mpsc::Sender<adapter::Message>,
25+
notify_price_sched_tx: &mpsc::Sender<NotifyPriceSched>,
26+
request: &Request<Method, Value>,
27+
) -> Result<serde_json::Value> {
28+
let params: SubscribePriceSchedParams = serde_json::from_value(
29+
request
30+
.params
31+
.clone()
32+
.ok_or_else(|| anyhow!("Missing request parameters"))?,
33+
)?;
34+
35+
let (result_tx, result_rx) = oneshot::channel();
36+
adapter_tx
37+
.send(adapter::Message::SubscribePriceSched {
38+
result_tx,
39+
account: params.account,
40+
notify_price_sched_tx: notify_price_sched_tx.clone(),
41+
})
42+
.await?;
43+
44+
Ok(serde_json::to_value(SubscribeResult {
45+
subscription: result_rx.await??,
46+
})?)
47+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use {
2+
super::{
3+
Method,
4+
UpdatePriceParams,
5+
},
6+
crate::agent::pythd::adapter,
7+
anyhow::{
8+
anyhow,
9+
Result,
10+
},
11+
jrpc::{
12+
Request,
13+
Value,
14+
},
15+
tokio::sync::mpsc,
16+
};
17+
18+
pub async fn update_price(
19+
adapter_tx: &mpsc::Sender<adapter::Message>,
20+
request: &Request<Method, Value>,
21+
) -> Result<serde_json::Value> {
22+
let params: UpdatePriceParams = serde_json::from_value(
23+
request
24+
.params
25+
.clone()
26+
.ok_or_else(|| anyhow!("Missing request parameters"))?,
27+
)?;
28+
29+
adapter_tx
30+
.send(adapter::Message::UpdatePrice {
31+
account: params.account,
32+
price: params.price,
33+
conf: params.conf,
34+
status: params.status,
35+
})
36+
.await?;
37+
38+
Ok(serde_json::to_value(0)?)
39+
}

0 commit comments

Comments
 (0)