Skip to content

Commit f5aa48f

Browse files
committed
refactor(server): make command smaller and read more data from storage
1 parent 5219791 commit f5aa48f

File tree

8 files changed

+95
-38
lines changed

8 files changed

+95
-38
lines changed

server/src/bin/dispatcher.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1+
use actix_web::web::Data;
2+
13
use server::amqp::establish_connection_with_rabbit;
24
use server::dispatch_consumer::consume;
35
use server::logs::init_log;
6+
use server::storage::Storage;
47

58
#[tokio::main]
69
async fn main() {
710
init_log();
811
let channel = establish_connection_with_rabbit().await;
912

10-
consume(channel, "dispatcher").await
13+
consume(channel, "dispatcher", Data::new(Storage::new())).await
1114
}

server/src/bin/server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ async fn main() -> std::io::Result<()> {
1919
let port: u16 = Env::env_or("SERVER_PORT", 8090);
2020

2121
let storage = Data::new(Storage::new());
22+
let storage_for_consumer = storage.clone();
2223
let channel = establish_connection_with_rabbit().await;
2324
let publisher = Data::new(Publisher::new(channel.clone()));
2425
let app = move || {
@@ -33,7 +34,7 @@ async fn main() -> std::io::Result<()> {
3334
ip, port
3435
);
3536

36-
rt::spawn(async move { consume(channel, "dispatcher-in-server").await });
37+
rt::spawn(async move { consume(channel, "dispatcher-in-server", storage_for_consumer).await });
3738

3839
HttpServer::new(app).bind((ip, port))?.run().await
3940
}

server/src/circuit_breaker.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::collections::HashMap;
2+
use std::future::Future;
23

34
use log::debug;
45

@@ -33,17 +34,18 @@ impl CircuitBreaker {
3334
self.states.get(&key).unwrap_or(&State::Open) == &State::Closed
3435
}
3536

36-
pub fn call<T, E, F>(&mut self, key: String, function: F) -> Result<T, Error<E>>
37+
pub async fn call<T, E, F, Fut>(&mut self, key: String, function: F) -> Result<T, Error<E>>
3738
where
38-
F: FnOnce() -> Result<T, E>,
39+
F: FnOnce() -> Fut,
40+
Fut: Future<Output = Result<T, E>>,
3941
{
4042
if self.is_call_permitted(key.clone()) {
4143
debug!("Service {} closed", key);
4244

4345
return Err(Error::Rejected);
4446
}
4547

46-
match function() {
48+
match function().await {
4749
Ok(ok) => {
4850
self.storage.entry(key.clone()).or_insert(0);
4951

server/src/cmd.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
use serde::{Deserialize, Serialize};
2-
use url::Url;
32

43
use crate::configuration::domain::EndpointId;
5-
use crate::events::domain::{MessageId, Payload};
4+
use crate::events::domain::MessageId;
65

76
#[derive(Serialize, Deserialize)]
87
#[serde(tag = "t", content = "c")]
@@ -12,18 +11,14 @@ pub enum AsyncMessage {
1211

1312
#[derive(Serialize, Deserialize, Debug, Clone)]
1413
pub struct SentMessage {
15-
pub payload: String,
16-
pub url: String,
17-
pub msg_id: String,
14+
msg_id: String,
1815
pub attempt: usize,
19-
pub endpoint_id: String,
16+
endpoint_id: String,
2017
}
2118

2219
impl SentMessage {
23-
pub fn new(payload: Payload, url: Url, msg_id: MessageId, endpoint_id: EndpointId) -> Self {
20+
pub fn new(msg_id: MessageId, endpoint_id: EndpointId) -> Self {
2421
Self {
25-
payload: payload.to_string(),
26-
url: url.to_string(),
2722
msg_id: msg_id.to_string(),
2823
attempt: 1,
2924
endpoint_id: endpoint_id.to_string(),
@@ -32,11 +27,17 @@ impl SentMessage {
3227

3328
pub fn with_increased_attempt(&self) -> SentMessage {
3429
Self {
35-
payload: self.payload.clone(),
36-
url: self.url.clone(),
3730
msg_id: self.msg_id.clone(),
3831
attempt: self.attempt + 1,
3932
endpoint_id: self.endpoint_id.clone(),
4033
}
4134
}
35+
36+
pub fn msg_id(&self) -> MessageId {
37+
MessageId::try_from(self.msg_id.clone()).unwrap()
38+
}
39+
40+
pub fn endpoint_id(&self) -> EndpointId {
41+
EndpointId::try_from(self.endpoint_id.clone()).unwrap()
42+
}
4243
}

server/src/dispatch_consumer.rs

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
use std::time::Duration;
22

3+
use actix_web::web::Data;
34
use futures_lite::StreamExt;
45
use lapin::options::{BasicAckOptions, BasicConsumeOptions};
56
use lapin::types::FieldTable;
67
use lapin::Channel;
7-
use log::{debug, info};
8+
use log::{debug, error, info};
89

910
use crate::amqp::{Publisher, Serializer, SENT_MESSAGE_QUEUE};
1011
use crate::circuit_breaker::{CircuitBreaker, Error};
1112
use crate::cmd::AsyncMessage;
1213
use crate::retry::RetryPolicyBuilder;
14+
use crate::storage::Storage;
1315

14-
pub async fn consume(channel: Channel, consumer_tag: &str) {
16+
pub async fn consume(channel: Channel, consumer_tag: &str, storage: Data<Storage>) {
1517
let retry_policy = RetryPolicyBuilder::new()
1618
.max_retries(5)
1719
.exponential(2, Duration::from_secs(2))
@@ -44,22 +46,52 @@ pub async fn consume(channel: Channel, consumer_tag: &str) {
4446
info!("message consumed: {:?}", cmd);
4547

4648
// todo allow to revive endpoint, check endpoint status and reset circuit breaker
49+
let endpoint_id = cmd.endpoint_id();
50+
let msg = storage.messages.get(cmd.msg_id());
51+
52+
if msg.is_err() {
53+
error!(
54+
"Message {} doesn't not exists and cannot be dispatched",
55+
cmd.msg_id()
56+
);
57+
58+
delivery.ack(BasicAckOptions::default()).await.expect("ack");
59+
60+
continue;
61+
}
62+
63+
let endpoint = storage.endpoints.get(&endpoint_id);
64+
if endpoint.is_err() {
65+
error!(
66+
"Endpoint {} doesn't not exists and message {} cannot be dispatched",
67+
endpoint_id,
68+
cmd.msg_id()
69+
);
70+
71+
delivery.ack(BasicAckOptions::default()).await.expect("ack");
72+
73+
continue;
74+
}
75+
76+
let msg = msg.unwrap();
77+
let endpoint = endpoint.unwrap();
78+
79+
debug!(
80+
"Message {} for endpoint {} is being prepared to send",
81+
msg.id.to_string(),
82+
endpoint.id.to_string()
83+
);
4784

48-
let key = cmd.endpoint_id.to_string();
4985
let func = || {
50-
reqwest::blocking::Client::new()
51-
.post(&cmd.url)
52-
.json(&cmd.payload.as_str())
86+
reqwest::Client::new()
87+
.post(endpoint.url)
88+
.json(msg.payload.to_string().as_str())
5389
.send()
5490
};
5591

56-
let log_error_response = |res: reqwest::Error| {
57-
let status: String = res.status().map_or(String::from("-"), |s| s.to_string());
58-
59-
debug!("Error response! Status: {}, Error: {}", status, res);
60-
};
92+
let key = endpoint_id.to_string();
6193

62-
match circuit_breaker.call(key.clone(), func) {
94+
match circuit_breaker.call(key.clone(), func).await {
6395
Ok(res) => {
6496
debug!("Success! {}", res.status())
6597
}
@@ -92,7 +124,8 @@ pub async fn consume(channel: Channel, consumer_tag: &str) {
92124
Error::Rejected => {
93125
debug!(
94126
"Endpoint {} is closed. Message {} rejected.",
95-
key, cmd.msg_id
127+
key,
128+
cmd.msg_id()
96129
);
97130

98131
// todo do something with message? add to some "not delivered" bucket?
@@ -103,3 +136,9 @@ pub async fn consume(channel: Channel, consumer_tag: &str) {
103136
delivery.ack(BasicAckOptions::default()).await.expect("ack");
104137
}
105138
}
139+
140+
fn log_error_response(res: reqwest::Error) {
141+
let status: String = res.status().map_or(String::from("-"), |s| s.to_string());
142+
143+
debug!("Error response! Status: {}, Error: {}", status, res);
144+
}

server/src/events/domain.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
use crate::configuration::domain::{ApplicationId, Topic};
21
use std::fmt::{Display, Formatter};
32

3+
use crate::configuration::domain::{ApplicationId, Topic};
4+
45
#[derive(Debug, Clone)]
56
pub struct Payload {
67
body: String,
@@ -18,7 +19,7 @@ impl Display for Payload {
1819
}
1920
}
2021

21-
#[derive(Debug, Clone, derive::Ksuid)]
22+
#[derive(Debug, Clone, derive::Ksuid, Eq, PartialEq)]
2223
#[prefix = "msg"]
2324
pub struct MessageId {
2425
id: String,

server/src/events/handlers.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,7 @@ pub async fn create_message_handler(
5050
for endpoint in active_endpoints {
5151
debug!("{} sending to {}", msg.id, endpoint.url);
5252

53-
let cmd = SentMessage::new(
54-
msg.payload.clone(),
55-
endpoint.url,
56-
msg.id.clone(),
57-
endpoint.id,
58-
);
53+
let cmd = SentMessage::new(msg.id.clone(), endpoint.id);
5954
let message = AsyncMessage::SentMessage(cmd);
6055

6156
dispatcher.publish(message).await;

server/src/events/storage.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
1-
use crate::events::domain::Message;
21
use std::sync::Mutex;
32

3+
use crate::error::Error;
4+
use crate::error::Error::EntityNotFound;
5+
use crate::events::domain::{Message, MessageId};
6+
47
pub trait MessageStorage {
58
fn save(&self, app: Message);
69

710
fn count(&self) -> usize;
11+
12+
fn get(&self, message_id: MessageId) -> Result<Message, Error>;
813
}
914

1015
pub struct InMemoryMessageStorage {
@@ -31,4 +36,14 @@ impl MessageStorage for InMemoryMessageStorage {
3136

3237
messages.len()
3338
}
39+
40+
fn get(&self, message_id: MessageId) -> Result<Message, Error> {
41+
let messages = self.messages.lock().unwrap();
42+
43+
messages
44+
.clone()
45+
.into_iter()
46+
.find(|msg| msg.id.eq(&message_id))
47+
.ok_or_else(|| EntityNotFound("Message not found".to_string()))
48+
}
3449
}

0 commit comments

Comments
 (0)