Skip to content

Commit 49ebbbf

Browse files
committed
Remove impl ServiceProvider for ServerActorBuilder
Connecting a server actor to its clients has been the main motivation for the ServiceProvider trait which where used to let the client and the server exchange senders, one given by the server to the client requests and another given by the client to server responses. With the new mechanism of RequestEnvelope, this exchange is no more useful. The actor server is a simple MessageSink accepting requests wrapped in an envelope with a reply-to address, i.e. a sender. Clients only needs a sender to post their request. The next step will be to fully deprecate the ServiceProvider trait, in favor of the combination of two traits MessageSource + MessageSink. Indeed, the complexity of the ServiceProvider was only used by server actors and not the actors, such as the MQTTActor, for which the messages received and sent don't have a request-response relationship. Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent 8058514 commit 49ebbbf

File tree

15 files changed

+93
-125
lines changed

15 files changed

+93
-125
lines changed

crates/common/batcher/src/driver.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,6 @@ mod tests {
178178
use std::time::Duration;
179179
use tedge_actors::test_helpers::ServiceProviderExt;
180180
use tedge_actors::Builder;
181-
use tedge_actors::NoConfig;
182181
use tedge_actors::SimpleMessageBoxBuilder;
183182
use tokio::time::timeout;
184183

@@ -256,7 +255,7 @@ mod tests {
256255
.build();
257256
let batcher = Batcher::new(config);
258257
let mut box_builder = SimpleMessageBoxBuilder::new("test", 1);
259-
let test_box = box_builder.new_client_box(NoConfig);
258+
let test_box = box_builder.new_client_box();
260259
let driver_box = box_builder.build();
261260

262261
let driver = BatchDriver::new(batcher, driver_box);

crates/core/tedge_actors/src/actors.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ pub mod tests {
7777
#[tokio::test]
7878
async fn running_an_actor_without_a_runtime() {
7979
let mut box_builder = SimpleMessageBoxBuilder::new("test", 16);
80-
let mut client_message_box = box_builder.new_client_box(NoConfig);
80+
let mut client_message_box = box_builder.new_client_box();
8181
let mut runtime_box = box_builder.get_signal_sender();
8282
let actor_message_box = box_builder.build();
8383
let actor = Echo {

crates/core/tedge_actors/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@
133133
//! let mut actor_box_builder = SimpleMessageBoxBuilder::new("Actor", 10);
134134
//!
135135
//! // Create a test box ready then the actor box
136-
//! let mut test_box = actor_box_builder.new_client_box(NoConfig);
136+
//! let mut test_box = actor_box_builder.new_client_box();
137137
//! let actor_box = actor_box_builder.build();
138138
//!
139139
//! // The actor is then spawn in the background with its message box.

crates/core/tedge_actors/src/servers/builders.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -201,16 +201,6 @@ impl<S: Server + Clone> Builder<ConcurrentServerActor<S>> for ServerActorBuilder
201201
}
202202
}
203203

204-
impl<S: Server, K> ServiceProvider<S::Request, S::Response, NoConfig> for ServerActorBuilder<S, K> {
205-
fn connect_consumer(
206-
&mut self,
207-
config: NoConfig,
208-
response_sender: DynSender<S::Response>,
209-
) -> DynSender<S::Request> {
210-
self.box_builder.connect_consumer(config, response_sender)
211-
}
212-
}
213-
214204
impl<S: Server, K> MessageSink<RequestEnvelope<S::Request, S::Response>, NoConfig>
215205
for ServerActorBuilder<S, K>
216206
{

crates/core/tedge_actors/src/servers/message_boxes.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,6 @@ mod tests {
166166
use crate::Builder;
167167
use crate::ConcurrentServerActor;
168168
use crate::DynSender;
169-
use crate::NoConfig;
170169
use crate::Runtime;
171170
use crate::RuntimeRequest;
172171
use crate::RuntimeRequestSink;
@@ -181,7 +180,7 @@ mod tests {
181180
#[tokio::test]
182181
async fn only_processes_messages_up_to_max_concurrency() {
183182
let mut builder = ServerMessageBoxBuilder::new("ConcurrentServerMessageBoxTest", 16);
184-
let mut test_box = builder.new_client_box(NoConfig);
183+
let mut test_box = builder.new_client_box();
185184
let message_box: ServerMessageBox<i32, i32> = builder.build();
186185
let mut concurrent_box = ConcurrentServerMessageBox::new(4, message_box);
187186

@@ -285,7 +284,7 @@ mod tests {
285284
}
286285

287286
fn client_box(&mut self) -> SimpleMessageBox<i32, i32> {
288-
self.box_builder.new_client_box(NoConfig)
287+
self.box_builder.new_client_box()
289288
}
290289
}
291290

crates/core/tedge_actors/src/servers/mod.rs

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@
6868
//! let mut handle = actor.request_sender();
6969
//!
7070
//! // This handle can then be used to connect client message boxes
71-
//! let mut client_1 = handle.new_client_box(NoConfig);
72-
//! let mut client_2 = handle.new_client_box(NoConfig);
71+
//! let mut client_1 = handle.new_client_box();
72+
//! let mut client_2 = handle.new_client_box();
7373
//!
7474
//! // The actor is then spawn in the background.
7575
//! tokio::spawn(async move { actor.run().await } );
@@ -103,7 +103,6 @@ use crate::Message;
103103
use crate::MessageSink;
104104
use crate::NoConfig;
105105
use crate::Sender;
106-
use crate::ServiceProvider;
107106
use async_trait::async_trait;
108107

109108
/// Define how a server process a request
@@ -141,28 +140,23 @@ impl<Request: Debug, Response> Debug for RequestEnvelope<Request, Response> {
141140
/// A request sender to some [Server]
142141
pub type DynRequestSender<Request, Response> = DynSender<RequestEnvelope<Request, Response>>;
143142

144-
impl<Request: Message, Response: Message> ServiceProvider<Request, Response, NoConfig>
145-
for DynRequestSender<Request, Response>
146-
{
147-
fn connect_consumer(
148-
&mut self,
149-
_config: NoConfig,
150-
reply_to: DynSender<Response>,
151-
) -> DynSender<Request> {
152-
Box::new(RequestSender {
153-
sender: self.sender_clone(),
154-
reply_to,
155-
})
156-
}
157-
}
158-
159143
/// A connector to a [Server] expecting Request and returning Response.
160144
pub trait Service<Request: Message, Response: Message>:
161145
MessageSink<RequestEnvelope<Request, Response>, NoConfig>
162146
{
147+
/// Connect a request message box to the server box under construction
148+
fn add_requester(&mut self, response_sender: DynSender<Response>) -> DynSender<Request>;
163149
}
164150

165-
impl<T, Request: Message, Response: Message> Service<Request, Response> for T where
166-
T: MessageSink<RequestEnvelope<Request, Response>, NoConfig>
151+
impl<T, Request: Message, Response: Message> Service<Request, Response> for T
152+
where
153+
T: MessageSink<RequestEnvelope<Request, Response>, NoConfig>,
167154
{
155+
fn add_requester(&mut self, reply_to: DynSender<Response>) -> DynSender<Request> {
156+
let request_sender = RequestSender {
157+
sender: self.get_sender(),
158+
reply_to,
159+
};
160+
request_sender.into()
161+
}
168162
}

crates/core/tedge_actors/src/test_helpers.rs

Lines changed: 34 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
use crate::mpsc;
33
use crate::Builder;
44
use crate::ChannelError;
5+
use crate::CloneSender;
6+
use crate::DynRequestSender;
57
use crate::DynSender;
68
use crate::Message;
79
use crate::MessageReceiver;
@@ -11,10 +13,11 @@ use crate::NoConfig;
1113
use crate::NoMessage;
1214
use crate::NullSender;
1315
use crate::RequestEnvelope;
16+
use crate::RequestSender;
1417
use crate::RuntimeRequest;
1518
use crate::Sender;
19+
use crate::ServerMessageBoxBuilder;
1620
use crate::ServiceConsumer;
17-
use crate::ServiceProvider;
1821
use crate::SimpleMessageBox;
1922
use crate::SimpleMessageBoxBuilder;
2023
use crate::WrappedInput;
@@ -651,56 +654,39 @@ impl<I: MessagePlus, O: MessagePlus> MessageSink<I, NoConfig> for Probe<I, O> {
651654
}
652655
}
653656

654-
pub trait ServiceProviderExt<I: Message, O: Message, C> {
657+
pub trait ServiceProviderExt<I: Message, O: Message> {
655658
/// Create a simple message box connected to a box under construction.
656-
fn new_client_box(&mut self, config: C) -> SimpleMessageBox<O, I>;
659+
fn new_client_box(&mut self) -> SimpleMessageBox<O, I>;
657660
}
658661

659-
impl<I, O, C, T> ServiceProviderExt<I, O, C> for T
660-
where
661-
I: Message,
662-
O: Message,
663-
C: Clone,
664-
T: ServiceProvider<I, O, C>,
665-
{
666-
fn new_client_box(&mut self, config: C) -> SimpleMessageBox<O, I> {
662+
impl<I: Message, O: Message> ServiceProviderExt<I, O> for DynRequestSender<I, O> {
663+
fn new_client_box(&mut self) -> SimpleMessageBox<O, I> {
667664
let name = "client-box";
668665
let capacity = 16;
669-
let mut client_box = ConsumerBoxBuilder::new(name, capacity, config);
670-
self.add_peer(&mut client_box);
666+
let mut client_box = SimpleMessageBoxBuilder::new(name, capacity);
667+
let request_sender = RequestSender {
668+
sender: self.sender_clone(),
669+
reply_to: client_box.get_sender(),
670+
};
671+
client_box.set_request_sender(request_sender.into());
671672
client_box.build()
672673
}
673674
}
674675

675-
struct ConsumerBoxBuilder<I, O: Debug, C> {
676-
config: C,
677-
box_builder: SimpleMessageBoxBuilder<O, I>,
678-
}
679-
680-
impl<I: Message, O: Message, C> ConsumerBoxBuilder<I, O, C> {
681-
fn new(name: &str, capacity: usize, config: C) -> Self {
682-
ConsumerBoxBuilder {
683-
config,
684-
box_builder: SimpleMessageBoxBuilder::new(name, capacity),
685-
}
686-
}
687-
688-
fn build(self) -> SimpleMessageBox<O, I> {
689-
self.box_builder.build()
676+
impl<I: Message, O: Message> ServiceProviderExt<I, O> for ServerMessageBoxBuilder<I, O> {
677+
fn new_client_box(&mut self) -> SimpleMessageBox<O, I> {
678+
self.request_sender().new_client_box()
690679
}
691680
}
692681

693-
impl<I: Message, O: Message, C: Clone> ServiceConsumer<I, O, C> for ConsumerBoxBuilder<I, O, C> {
694-
fn get_config(&self) -> C {
695-
self.config.clone()
696-
}
697-
698-
fn set_request_sender(&mut self, request_sender: DynSender<I>) {
699-
self.box_builder.set_request_sender(request_sender)
700-
}
701-
702-
fn get_response_sender(&self) -> DynSender<O> {
703-
self.box_builder.get_response_sender()
682+
impl<I: Message, O: Message> ServiceProviderExt<I, O> for SimpleMessageBoxBuilder<I, O> {
683+
fn new_client_box(&mut self) -> SimpleMessageBox<O, I> {
684+
let name = "client-box";
685+
let capacity = 16;
686+
let mut client_box = SimpleMessageBoxBuilder::new(name, capacity);
687+
self.set_request_sender(client_box.get_sender());
688+
client_box.set_request_sender(self.get_sender());
689+
client_box.build()
704690
}
705691
}
706692

@@ -735,9 +721,7 @@ pub struct FakeServerBox<Request: Debug, Response> {
735721
impl<Request: Message, Response: Message> FakeServerBox<Request, Response> {
736722
/// Return a fake message box builder
737723
pub fn builder() -> FakeServerBoxBuilder<Request, Response> {
738-
FakeServerBoxBuilder {
739-
messages: SimpleMessageBoxBuilder::new("Fake Server", 16),
740-
}
724+
FakeServerBoxBuilder::default()
741725
}
742726
}
743727

@@ -799,6 +783,14 @@ pub struct FakeServerBoxBuilder<Request: Debug, Response> {
799783
messages: SimpleMessageBoxBuilder<RequestEnvelope<Request, Response>, NoMessage>,
800784
}
801785

786+
impl<Request: Message, Response: Message> Default for FakeServerBoxBuilder<Request, Response> {
787+
fn default() -> Self {
788+
FakeServerBoxBuilder {
789+
messages: SimpleMessageBoxBuilder::new("Fake Server", 16),
790+
}
791+
}
792+
}
793+
802794
impl<Request: Message, Response: Message> MessageSink<RequestEnvelope<Request, Response>, NoConfig>
803795
for FakeServerBoxBuilder<Request, Response>
804796
{

crates/core/tedge_actors/src/tests.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ async fn spawn_concurrent_sleep_service(max_concurrency: usize) -> DynRequestSen
5050
#[tokio::test]
5151
async fn requests_are_served_in_turn() {
5252
let mut service_handle = spawn_sleep_service().await;
53-
let mut client = service_handle.new_client_box(NoConfig);
53+
let mut client = service_handle.new_client_box();
5454

5555
// The requests being sent in some order
5656
client.send(1).await.unwrap();
@@ -67,8 +67,8 @@ async fn requests_are_served_in_turn() {
6767
async fn clients_can_interleave_request() {
6868
let mut service_handle = spawn_sleep_service().await;
6969

70-
let mut client_1 = service_handle.new_client_box(NoConfig);
71-
let mut client_2 = service_handle.new_client_box(NoConfig);
70+
let mut client_1 = service_handle.new_client_box();
71+
let mut client_2 = service_handle.new_client_box();
7272

7373
// Two clients can independently send requests
7474
client_1.send(1).await.unwrap();
@@ -85,8 +85,8 @@ async fn clients_can_interleave_request() {
8585
async fn requests_can_be_sent_concurrently() {
8686
let mut service_handle = spawn_concurrent_sleep_service(2).await;
8787

88-
let mut client_1 = service_handle.new_client_box(NoConfig);
89-
let mut client_2 = service_handle.new_client_box(NoConfig);
88+
let mut client_1 = service_handle.new_client_box();
89+
let mut client_2 = service_handle.new_client_box();
9090

9191
// Despite a long running request from client_1
9292
client_1.send(1000).await.unwrap();

crates/core/tedge_agent/src/operation_file_cache/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use tedge_actors::RuntimeError;
3030
use tedge_actors::RuntimeRequest;
3131
use tedge_actors::RuntimeRequestSink;
3232
use tedge_actors::Sender;
33+
use tedge_actors::Service;
3334
use tedge_actors::ServiceProvider;
3435
use tedge_actors::SimpleMessageBoxBuilder;
3536
use tedge_api::messages::ConfigUpdateCmdPayload;
@@ -289,13 +290,13 @@ impl FileCacheActorBuilder {
289290
mqtt_schema: MqttSchema,
290291
tedge_http_host: Arc<str>,
291292
data_dir: DataDir,
292-
downloader_actor: &mut impl ServiceProvider<IdDownloadRequest, IdDownloadResult, NoConfig>,
293+
downloader_actor: &mut impl Service<IdDownloadRequest, IdDownloadResult>,
293294
mqtt_actor: &mut impl ServiceProvider<MqttMessage, MqttMessage, TopicFilter>,
294295
) -> Self {
295296
let message_box = SimpleMessageBoxBuilder::new("RestartManager", 10);
296297

297298
let download_sender =
298-
downloader_actor.connect_consumer(NoConfig, message_box.get_sender().sender_clone());
299+
downloader_actor.add_requester(message_box.get_sender().sender_clone());
299300

300301
let mqtt_sender = mqtt_actor.connect_consumer(
301302
Self::subscriptions(&mqtt_schema),

crates/extensions/c8y_mapper_ext/src/actor.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -298,8 +298,8 @@ impl C8yMapperBuilder {
298298
mqtt: &mut impl ServiceProvider<MqttMessage, MqttMessage, TopicFilter>,
299299
http: &mut impl Service<C8YRestRequest, C8YRestResult>,
300300
timer: &mut impl ServiceProvider<SyncStart, SyncComplete, NoConfig>,
301-
uploader: &mut impl ServiceProvider<IdUploadRequest, IdUploadResult, NoConfig>,
302-
downloader: &mut impl ServiceProvider<IdDownloadRequest, IdDownloadResult, NoConfig>,
301+
uploader: &mut impl Service<IdUploadRequest, IdUploadResult>,
302+
downloader: &mut impl Service<IdDownloadRequest, IdDownloadResult>,
303303
fs_watcher: &mut impl MessageSource<FsWatchEvent, PathBuf>,
304304
service_monitor: &mut impl ServiceProvider<MqttMessage, MqttMessage, TopicFilter>,
305305
) -> Result<Self, FileError> {
@@ -314,10 +314,8 @@ impl C8yMapperBuilder {
314314
let http_proxy = C8YHttpProxy::new(http);
315315
let timer_sender =
316316
timer.connect_consumer(NoConfig, box_builder.get_sender().sender_clone());
317-
let upload_sender =
318-
uploader.connect_consumer(NoConfig, box_builder.get_sender().sender_clone());
319-
let download_sender =
320-
downloader.connect_consumer(NoConfig, box_builder.get_sender().sender_clone());
317+
let upload_sender = uploader.add_requester(box_builder.get_sender().sender_clone());
318+
let download_sender = downloader.add_requester(box_builder.get_sender().sender_clone());
321319
fs_watcher.register_peer(
322320
config.ops_dir.clone(),
323321
box_builder.get_sender().sender_clone(),

0 commit comments

Comments
 (0)