Skip to content

Commit 4132762

Browse files
authored
Merge pull request #614 from swimos/docs_devguide_2.3
Example code for dev guide 2.3
2 parents 51b7909 + 1ab334a commit 4132762

File tree

6 files changed

+233
-1
lines changed

6 files changed

+233
-1
lines changed

.tarpaulin.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ exclude = [
3333
"swimos_agent_derive",
3434
"macro_utilities",
3535
"example_client_2_2",
36-
"example_server_2_2"
36+
"example_server_2_2",
37+
"example_client_2_3",
38+
"example_server_2_3"
3739
]
3840
workspace = true
3941
avoid-cfg-tarpaulin = true

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ members = [
4141
"example_apps/aggregations",
4242
"example_apps/time_series",
4343
"example_apps/devguide/2_2/*",
44+
"example_apps/devguide/2_3/*",
4445
]
4546

4647
exclude = [
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
[package]
2+
name = "example_client_2_3"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
tokio = { workspace = true, features = ["full"] }
8+
swimos_client = { path = "../../../../client/swimos_client" }
9+
swimos_form = { path = "../../../../api/swimos_form" }
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
use swimos_client::{BasicValueDownlinkLifecycle, DownlinkConfig, RemotePath, SwimClientBuilder};
2+
use swimos_form::Form;
3+
4+
#[derive(Debug, Form, Copy, Clone)]
5+
pub enum Operation {
6+
Add(i32),
7+
Sub(i32),
8+
}
9+
10+
#[tokio::main]
11+
async fn main() {
12+
// Build a Swim Client using the default configuration.
13+
// The `build` method returns a `SwimClient` instance and its internal
14+
// runtime future that is spawned below.
15+
let (client, task) = SwimClientBuilder::default().build().await;
16+
let _client_task = tokio::spawn(task);
17+
let handle = client.handle();
18+
19+
// Build a path the downlink.
20+
let state_path = RemotePath::new(
21+
// The host address
22+
"ws://0.0.0.0:8080",
23+
// You can provide any agent URI that matches the pattern
24+
// "/example/:id"
25+
"/example/1",
26+
// This is the URI of the ValueLane<i32> in our ExampleAgent
27+
"state",
28+
);
29+
30+
let lifecycle = BasicValueDownlinkLifecycle::<usize>::default()
31+
// Register an event handler that is invoked when the downlink connects to the agent.
32+
.on_linked_blocking(|| println!("Downlink linked"))
33+
// Register an event handler that is invoked when the downlink synchronises its state.
34+
// with the agent.
35+
.on_synced_blocking(|value| println!("Downlink synced with: {value:?}"))
36+
// Register an event handler that is invoked when the downlink receives an event.
37+
.on_event_blocking(|value| println!("Downlink event: {value:?}"));
38+
39+
// Build our downlink.
40+
//
41+
// This operation may fail if there is a connection issue.
42+
let _state_downlink = handle
43+
.value_downlink::<i32>(state_path)
44+
.lifecycle(lifecycle)
45+
.downlink_config(DownlinkConfig::default())
46+
.open()
47+
.await
48+
.expect("Failed to open downlink");
49+
50+
let exec_path = RemotePath::new(
51+
// The host address
52+
"ws://0.0.0.0:8080",
53+
// You can provide any agent URI that matches the pattern
54+
// "/example/:id"
55+
"/example/1",
56+
// This is the URI of the ValueLane<i32> in our ExampleAgent
57+
"exec",
58+
);
59+
60+
let exec_downlink = handle
61+
.value_downlink::<Operation>(exec_path)
62+
.downlink_config(DownlinkConfig::default())
63+
.open()
64+
.await
65+
.expect("Failed to open exec downlink");
66+
67+
exec_downlink.set(Operation::Add(1000)).await.unwrap();
68+
exec_downlink.set(Operation::Sub(13)).await.unwrap();
69+
70+
tokio::signal::ctrl_c()
71+
.await
72+
.expect("Failed to listen for ctrl-c.");
73+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
[package]
2+
name = "example_server_2_3"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
tokio = { workspace = true, features = ["full"] }
8+
swimos = { path = "../../../../swimos", features = ["server"] }
9+
swimos_form = { path = "../../../../api/swimos_form" }
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
use swimos::{
2+
agent::{
3+
agent_lifecycle::HandlerContext, agent_model::AgentModel, event_handler::EventHandler,
4+
lanes::ValueLane, lifecycle, projections, AgentLaneModel,
5+
},
6+
route::RoutePattern,
7+
server::{Server, ServerBuilder, ServerHandle},
8+
};
9+
10+
use std::{error::Error, time::Duration};
11+
use swimos::agent::event_handler::HandlerActionExt;
12+
use swimos::agent::lanes::CommandLane;
13+
use swimos_form::Form;
14+
15+
// Note how as this is a custom type we need to derive `Form` for it.
16+
// For most types, simply adding the derive attribute will suffice.
17+
#[derive(Debug, Form, Copy, Clone)]
18+
pub enum Operation {
19+
Add(i32),
20+
Sub(i32),
21+
}
22+
23+
#[derive(AgentLaneModel)]
24+
#[projections]
25+
pub struct ExampleAgent {
26+
state: ValueLane<i32>,
27+
exec: CommandLane<Operation>,
28+
}
29+
30+
#[derive(Clone)]
31+
pub struct ExampleLifecycle;
32+
33+
#[lifecycle(ExampleAgent)]
34+
impl ExampleLifecycle {
35+
// Handler invoked when the agent starts.
36+
#[on_start]
37+
pub fn on_start(
38+
&self,
39+
context: HandlerContext<ExampleAgent>,
40+
) -> impl EventHandler<ExampleAgent> {
41+
context.effect(|| println!("Starting agent."))
42+
}
43+
44+
// Handler invoked when the agent is about to stop.
45+
#[on_stop]
46+
pub fn on_stop(
47+
&self,
48+
context: HandlerContext<ExampleAgent>,
49+
) -> impl EventHandler<ExampleAgent> {
50+
context.effect(|| println!("Stopping agent."))
51+
}
52+
53+
// Handler invoked after the state of 'lane' has changed.
54+
#[on_event(state)]
55+
pub fn on_event(
56+
&self,
57+
context: HandlerContext<ExampleAgent>,
58+
value: &i32,
59+
) -> impl EventHandler<ExampleAgent> {
60+
let n = *value;
61+
// EventHandler::effect accepts a FnOnce()
62+
// which runs a side effect.
63+
context.effect(move || {
64+
println!("Setting value to: {}", n);
65+
})
66+
}
67+
68+
#[on_command(exec)]
69+
pub fn on_command(
70+
&self,
71+
context: HandlerContext<ExampleAgent>,
72+
// Notice a reference to the deserialized command envelope is provided.
73+
operation: &Operation,
74+
) -> impl EventHandler<ExampleAgent> {
75+
let operation = *operation;
76+
context
77+
// Get the current state of our `state` lane.
78+
.get_value(ExampleAgent::STATE)
79+
.and_then(move |state| {
80+
// Calculate the new state.
81+
let new_state = match operation {
82+
Operation::Add(val) => state + val,
83+
Operation::Sub(val) => state - val,
84+
};
85+
// Return a event handler which updates the state of the `state` lane.
86+
context.set_value(ExampleAgent::STATE, new_state)
87+
})
88+
}
89+
}
90+
91+
#[tokio::main]
92+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
93+
// Create a dynamic route for our agents.
94+
let route = RoutePattern::parse_str("/example/:id")?;
95+
// Create an agent model which contains the factory for creating the agent as well
96+
// as the lifecycle which will be run.
97+
let agent = AgentModel::new(ExampleAgent::default, ExampleLifecycle.into_lifecycle());
98+
99+
// Create a server builder.
100+
let server = ServerBuilder::with_plane_name("Plane")
101+
// Bind to port 8080
102+
.set_bind_addr("127.0.0.1:8080".parse().unwrap())
103+
// For this guide, ensure agents timeout fairly quickly.
104+
// An agent will timeout after they have received no new updates
105+
// for this configured period of time.
106+
.update_config(|config| {
107+
config.agent_runtime.inactive_timeout = Duration::from_secs(20);
108+
})
109+
// Register the agent against the route.
110+
.add_route(route, agent)
111+
.build()
112+
// Building the server may fail if many routes are registered and some
113+
// are ambiguous.
114+
.await?;
115+
116+
// Run the server. A tuple of the server's runtime
117+
// future and a handle to the runtime is returned.
118+
let (task, handle) = server.run();
119+
// Watch for ctrl+c signals
120+
let shutdown = manage_handle(handle);
121+
122+
// Join on the server and ctrl+c futures.
123+
let (_, result) = tokio::join!(shutdown, task);
124+
125+
result?;
126+
println!("Server stopped successfully.");
127+
Ok(())
128+
}
129+
130+
// Utility function for awaiting a stop signal in the terminal.
131+
async fn manage_handle(mut handle: ServerHandle) {
132+
tokio::signal::ctrl_c()
133+
.await
134+
.expect("Failed to register interrupt handler.");
135+
136+
println!("Stopping server.");
137+
handle.stop();
138+
}

0 commit comments

Comments
 (0)