Skip to content

Commit d3abd9a

Browse files
committed
Deprecate keyed message boxes
Instead of using an array of clients identified by their indexes, the client address is attached to the requests. Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent f54ceeb commit d3abd9a

File tree

29 files changed

+651
-516
lines changed

29 files changed

+651
-516
lines changed

crates/core/tedge_actors/src/message_boxes.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ impl<Output> LoggingSender<Output> {
235235
}
236236

237237
#[async_trait]
238-
impl<Output: Message + Debug> Sender<Output> for LoggingSender<Output> {
238+
impl<Output: Message> Sender<Output> for LoggingSender<Output> {
239239
async fn send(&mut self, message: Output) -> Result<(), ChannelError> {
240240
log_message_sent(&self.name, &message);
241241
self.sender.send(message).await

crates/core/tedge_actors/src/servers/actors.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::Actor;
22
use crate::ConcurrentServerMessageBox;
33
use crate::MessageReceiver;
4+
use crate::RequestEnvelope;
45
use crate::RuntimeError;
56
use crate::RuntimeRequest;
67
use crate::Sender;
@@ -13,12 +14,12 @@ use async_trait::async_trait;
1314
/// Requests are processed in turn, leading either to a response or an error.
1415
pub struct ServerActor<S: Server> {
1516
server: S,
16-
messages: ServerMessageBox<S::Request, S::Response>,
17+
requests: ServerMessageBox<S::Request, S::Response>,
1718
}
1819

1920
impl<S: Server> ServerActor<S> {
20-
pub fn new(server: S, messages: ServerMessageBox<S::Request, S::Response>) -> Self {
21-
ServerActor { server, messages }
21+
pub fn new(server: S, requests: ServerMessageBox<S::Request, S::Response>) -> Self {
22+
ServerActor { server, requests }
2223
}
2324
}
2425

@@ -30,12 +31,17 @@ impl<S: Server> Actor for ServerActor<S> {
3031

3132
async fn run(mut self) -> Result<(), RuntimeError> {
3233
let server = &mut self.server;
33-
while let Some((client_id, request)) = self.messages.recv().await {
34+
while let Some(RequestEnvelope {
35+
request,
36+
mut reply_to,
37+
}) = self.requests.recv().await
38+
{
3439
tokio::select! {
35-
result = server.handle(request) => {
36-
self.messages.send((client_id, result)).await?
40+
response = server.handle(request) => {
41+
// Ignore errors on send: the requester is simply no more expecting a response
42+
let _ = reply_to.send(response).await;
3743
}
38-
Some(RuntimeRequest::Shutdown) = self.messages.recv_signal() => {
44+
Some(RuntimeRequest::Shutdown) = self.requests.recv_signal() => {
3945
break;
4046
}
4147
}
@@ -67,16 +73,21 @@ impl<S: Server + Clone> Actor for ConcurrentServerActor<S> {
6773
}
6874

6975
async fn run(mut self) -> Result<(), RuntimeError> {
70-
while let Some((client_id, request)) = self.messages.recv().await {
76+
while let Some(RequestEnvelope {
77+
request,
78+
mut reply_to,
79+
}) = self.messages.next_request().await
80+
{
7181
// Spawn the request
7282
let mut server = self.server.clone();
73-
let pending_result = tokio::spawn(async move {
83+
let request_handler = tokio::spawn(async move {
7484
let result = server.handle(request).await;
75-
(client_id, result)
85+
// Ignore errors on send: the requester is simply no more expecting a response
86+
let _ = reply_to.send(result).await;
7687
});
7788

7889
// Send the response back to the client
79-
self.messages.send_response_once_done(pending_result)
90+
self.messages.register_request_handler(request_handler)
8091
}
8192

8293
Ok(())

crates/core/tedge_actors/src/servers/builders.rs

Lines changed: 50 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,33 @@
11
use crate::mpsc;
22
use crate::Actor;
33
use crate::Builder;
4-
use crate::ClientId;
54
use crate::CloneSender;
65
use crate::ConcurrentServerActor;
76
use crate::ConcurrentServerMessageBox;
7+
use crate::DynRequestSender;
88
use crate::DynSender;
9-
use crate::KeyedSender;
109
use crate::LoggingReceiver;
11-
use crate::LoggingSender;
1210
use crate::Message;
11+
use crate::MessageSink;
1312
use crate::NoConfig;
13+
use crate::RequestEnvelope;
14+
use crate::RequestSender;
1415
use crate::RuntimeError;
1516
use crate::RuntimeRequest;
1617
use crate::RuntimeRequestSink;
17-
use crate::SenderVec;
1818
use crate::Server;
1919
use crate::ServerActor;
2020
use crate::ServerMessageBox;
2121
use crate::ServiceProvider;
22-
use crate::SimpleMessageBox;
2322
use std::convert::Infallible;
2423
use std::fmt::Debug;
2524

2625
/// A message box builder for request-response services
2726
pub struct ServerMessageBoxBuilder<Request: Debug, Response> {
28-
service_name: String,
2927
max_concurrency: usize,
30-
request_sender: mpsc::Sender<(ClientId, Request)>,
31-
input_receiver: LoggingReceiver<(ClientId, Request)>,
28+
request_sender: mpsc::Sender<RequestEnvelope<Request, Response>>,
29+
request_receiver: LoggingReceiver<RequestEnvelope<Request, Response>>,
3230
signal_sender: mpsc::Sender<RuntimeRequest>,
33-
clients: Vec<DynSender<Response>>,
3431
}
3532

3633
impl<Request: Message, Response: Message> ServerMessageBoxBuilder<Request, Response> {
@@ -39,16 +36,14 @@ impl<Request: Message, Response: Message> ServerMessageBoxBuilder<Request, Respo
3936
let max_concurrency = 1;
4037
let (request_sender, request_receiver) = mpsc::channel(capacity);
4138
let (signal_sender, signal_receiver) = mpsc::channel(4);
42-
let input_receiver =
39+
let request_receiver =
4340
LoggingReceiver::new(server_name.to_string(), request_receiver, signal_receiver);
4441

4542
ServerMessageBoxBuilder {
46-
service_name: server_name.to_string(),
4743
max_concurrency,
4844
request_sender,
49-
input_receiver,
45+
request_receiver,
5046
signal_sender,
51-
clients: vec![],
5247
}
5348
}
5449

@@ -59,19 +54,19 @@ impl<Request: Message, Response: Message> ServerMessageBoxBuilder<Request, Respo
5954
}
6055
}
6156

57+
/// Return a sender for the requests
58+
pub fn request_sender(&self) -> DynRequestSender<Request, Response> {
59+
self.request_sender.sender_clone()
60+
}
61+
6262
/// Build a message box ready to be used by the server actor
6363
fn build_server(self) -> ServerMessageBox<Request, Response> {
64-
let response_sender = SenderVec::new_sender(self.clients);
65-
let logging_sender = LoggingSender::new(self.service_name.clone(), response_sender);
66-
67-
SimpleMessageBox::new(self.input_receiver, logging_sender)
64+
self.request_receiver
6865
}
6966

7067
/// Build a message box aimed to concurrently serve requests
7168
fn build_concurrent(self) -> ConcurrentServerMessageBox<Request, Response> {
72-
let max_concurrency = self.max_concurrency;
73-
let clients = self.build_server();
74-
ConcurrentServerMessageBox::new(max_concurrency, clients)
69+
ConcurrentServerMessageBox::new(self.max_concurrency, self.request_receiver)
7570
}
7671
}
7772

@@ -84,15 +79,23 @@ impl<Req: Message, Res: Message> RuntimeRequestSink for ServerMessageBoxBuilder<
8479
impl<Req: Message, Res: Message> ServiceProvider<Req, Res, NoConfig>
8580
for ServerMessageBoxBuilder<Req, Res>
8681
{
87-
fn connect_consumer(
88-
&mut self,
89-
_config: NoConfig,
90-
response_sender: DynSender<Res>,
91-
) -> DynSender<Req> {
92-
let client_id = self.clients.len();
93-
let request_sender = KeyedSender::new_sender(client_id, self.request_sender.clone());
94-
self.clients.push(response_sender);
95-
request_sender
82+
fn connect_consumer(&mut self, _config: NoConfig, reply_to: DynSender<Res>) -> DynSender<Req> {
83+
Box::new(RequestSender {
84+
sender: self.request_sender.sender_clone(),
85+
reply_to,
86+
})
87+
}
88+
}
89+
90+
impl<Req: Message, Res: Message> MessageSink<RequestEnvelope<Req, Res>, NoConfig>
91+
for ServerMessageBoxBuilder<Req, Res>
92+
{
93+
fn get_config(&self) -> NoConfig {
94+
NoConfig
95+
}
96+
97+
fn get_sender(&self) -> DynSender<RequestEnvelope<Req, Res>> {
98+
self.request_sender().sender_clone()
9699
}
97100
}
98101

@@ -138,8 +141,7 @@ pub struct ServerActorBuilder<S: Server, K> {
138141

139142
impl<S: Server, K> ServerActorBuilder<S, K> {
140143
pub fn new(server: S, config: &ServerConfig, kind: K) -> Self {
141-
let service_name = server.name().to_string();
142-
let box_builder = ServerMessageBoxBuilder::new(&service_name, config.capacity)
144+
let box_builder = ServerMessageBoxBuilder::new(server.name(), config.capacity)
143145
.with_max_concurrency(config.max_concurrency);
144146

145147
ServerActorBuilder {
@@ -148,6 +150,11 @@ impl<S: Server, K> ServerActorBuilder<S, K> {
148150
box_builder,
149151
}
150152
}
153+
154+
/// Return a sender for the requests
155+
pub fn request_sender(&self) -> DynRequestSender<S::Request, S::Response> {
156+
self.box_builder.request_sender()
157+
}
151158
}
152159

153160
impl<S: Server> ServerActorBuilder<S, Sequential> {
@@ -204,6 +211,18 @@ impl<S: Server, K> ServiceProvider<S::Request, S::Response, NoConfig> for Server
204211
}
205212
}
206213

214+
impl<S: Server, K> MessageSink<RequestEnvelope<S::Request, S::Response>, NoConfig>
215+
for ServerActorBuilder<S, K>
216+
{
217+
fn get_config(&self) -> NoConfig {
218+
self.box_builder.get_config()
219+
}
220+
221+
fn get_sender(&self) -> DynSender<RequestEnvelope<S::Request, S::Response>> {
222+
self.box_builder.get_sender()
223+
}
224+
}
225+
207226
impl<S: Server, K> RuntimeRequestSink for ServerActorBuilder<S, K> {
208227
fn get_signal_sender(&self) -> DynSender<RuntimeRequest> {
209228
self.box_builder.get_signal_sender()

crates/core/tedge_actors/src/servers/keyed_messages.rs

Lines changed: 0 additions & 64 deletions
This file was deleted.

0 commit comments

Comments
 (0)