Skip to content

Commit 7c261dc

Browse files
committed
Documented the agent task module.
1 parent 94329c4 commit 7c261dc

File tree

9 files changed

+118
-48
lines changed

9 files changed

+118
-48
lines changed

runtime/swimos_runtime/src/agent/mod.rs

Lines changed: 82 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ use self::{
6060
},
6161
};
6262

63+
/// Describes the metrics the the agent runtime task reports as it runs. These are subscribed to by the
64+
/// introspection API to report on the internal state of server application.
6365
pub mod reporting;
6466
mod store;
6567
mod task;
@@ -69,26 +71,40 @@ mod tests;
6971
use task::AgentRuntimeRequest;
7072
use tracing::{error, info_span, Instrument};
7173

74+
/// A message type that can be sent to the agent runtime to request a link to one of its lanes.
7275
#[derive(Debug)]
7376
pub enum LinkRequest {
77+
/// A request to open a downlink to one of the lanes.
7478
Downlink(DownlinkRequest),
79+
/// A request to open a one way connection to send commands to a lane.
7580
Commander(CommanderRequest),
7681
}
7782

83+
/// A description of an endpoint to which commands can be sent.
7884
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
7985
pub enum CommanderKey {
86+
/// An endpoint on an explicit remote host.
8087
Remote(SchemeHostPort),
88+
/// An endpoint that is locally resolved.
8189
Local(RelativeAddress<Text>),
8290
}
8391

92+
/// A request to the runtime to open a channel to send commands to a remote lane.
8493
#[derive(Debug)]
8594
pub struct CommanderRequest {
95+
/// The ID of the agent making the request.
8696
pub agent_id: Uuid,
97+
/// The target end point for the channel.
8798
pub key: CommanderKey,
99+
/// A promise to be satisfied with the channel.
88100
pub promise: oneshot::Sender<Result<ByteWriter, DownlinkRuntimeError>>,
89101
}
90102

91103
impl CommanderRequest {
104+
/// # Arguments
105+
/// * `agent_id` - The ID of the agent making the request.
106+
/// * `key` - The target end point for the channel.
107+
/// * `promise` - A promise to be satisfied with the channel.
92108
pub fn new(
93109
agent_id: Uuid,
94110
key: CommanderKey,
@@ -102,31 +118,28 @@ impl CommanderRequest {
102118
}
103119
}
104120

121+
/// A request to the runtime to open a downlink to a lane on another agent.
105122
#[derive(Debug)]
106123
pub struct DownlinkRequest {
124+
/// An explicit host for the agent, if defined.
107125
pub remote: Option<SchemeHostPort>,
126+
/// The node URI and name of the lane.
108127
pub address: RelativeAddress<Text>,
128+
/// The kind of the downlink to open.
109129
pub kind: DownlinkKind,
130+
/// Configuration parameters for the downlink.
110131
pub options: DownlinkOptions,
132+
/// A promise to be satisfied with a channel to the downlink.
111133
pub promise: oneshot::Sender<Result<Io, DownlinkRuntimeError>>,
112134
}
113135

114136
impl DownlinkRequest {
115-
pub fn replace_promise(
116-
&self,
117-
replacement: oneshot::Sender<Result<Io, DownlinkRuntimeError>>,
118-
) -> Self {
119-
DownlinkRequest {
120-
remote: self.remote.clone(),
121-
address: self.address.clone(),
122-
kind: self.kind,
123-
options: self.options,
124-
promise: replacement,
125-
}
126-
}
127-
}
128-
129-
impl DownlinkRequest {
137+
/// # Arguments
138+
///
139+
/// * `remote` - An explicit host for the agent, if defined.
140+
/// * `kind` - The kind of the downlink to open.
141+
/// * `options` - Configuration parameters for the downlink.
142+
/// * `promise` - A promise to be satisfied with a channel to the downlink.
130143
pub fn new(
131144
remote: Option<SchemeHostPort>,
132145
address: RelativeAddress<Text>,
@@ -144,6 +157,21 @@ impl DownlinkRequest {
144157
}
145158
}
146159

160+
impl DownlinkRequest {
161+
fn replace_promise(
162+
&self,
163+
replacement: oneshot::Sender<Result<Io, DownlinkRuntimeError>>,
164+
) -> Self {
165+
DownlinkRequest {
166+
remote: self.remote.clone(),
167+
address: self.address.clone(),
168+
kind: self.kind,
169+
options: self.options,
170+
promise: replacement,
171+
}
172+
}
173+
}
174+
147175
/// Implementation of [`AgentContext`] that communicates with with another task over a channel
148176
/// to perform the supported operations.
149177
#[derive(Clone)]
@@ -338,19 +366,23 @@ pub enum AgentAttachmentRequest {
338366

339367
/// A request from an agent to register a new lane for metadata reporting.
340368
pub struct UplinkReporterRegistration {
369+
/// The ID of the agent making the request.
341370
pub agent_id: Uuid,
371+
/// The name of the lane.
342372
pub lane_name: Text,
373+
/// The kind of the lane.
343374
pub kind: LaneKind,
375+
/// Receiver for the uplink statistics.
344376
pub reader: UplinkReportReader,
345377
}
346378

347379
impl UplinkReporterRegistration {
348-
pub fn new(
349-
agent_id: Uuid,
350-
lane_name: Text,
351-
kind: LaneKind,
352-
reader: UplinkReportReader,
353-
) -> Self {
380+
/// # Arguments
381+
/// * `agent_id` - The ID of the agent making the request.
382+
/// * `lane_name` - The name of the lane.
383+
/// * `kind` - The kind of the lane.
384+
/// * `reader` - Receiver for the uplink statistics.
385+
fn new(agent_id: Uuid, lane_name: Text, kind: LaneKind, reader: UplinkReportReader) -> Self {
354386
UplinkReporterRegistration {
355387
agent_id,
356388
lane_name,
@@ -415,16 +447,13 @@ impl NodeReporting {
415447
}
416448

417449
impl AgentAttachmentRequest {
418-
pub fn downlink(id: Uuid, io: Io, completion: promise::Sender<DisconnectionReason>) -> Self {
419-
AgentAttachmentRequest::TwoWay {
420-
id,
421-
io,
422-
completion,
423-
on_attached: None,
424-
}
425-
}
426-
427-
/// Constructs a request with a trigger that will be called when the registration completes.
450+
/// Constructs a downlink request with a trigger that will be called when the request is completed.
451+
///
452+
/// # Arguments
453+
/// * `id` - The ID of the remote endpoint requesting the downlink.
454+
/// * `io` - The bidirectional channel.
455+
/// * `completion` - Called for when the downlink closes.
456+
/// * `on_attached` - Called when the request is completed.
428457
pub fn with_confirmation(
429458
id: Uuid,
430459
io: Io,
@@ -439,6 +468,13 @@ impl AgentAttachmentRequest {
439468
}
440469
}
441470

471+
/// Constructs a request to open a one way channel to send commands to the agent.
472+
///
473+
/// # Arguments
474+
/// * `id` - The ID of the remote endpoint requesting the channel.
475+
/// * `io` - The reader to receive the commands.
476+
/// * `on_attached` - Called when the channel is established.
477+
///
442478
pub fn commander(id: Uuid, io: ByteReader, on_attached: trigger::Sender) -> Self {
443479
AgentAttachmentRequest::OneWay {
444480
id,
@@ -522,18 +558,24 @@ pub enum AgentExecError {
522558
PersistenceFailure(#[from] StoreError),
523559
}
524560

525-
pub struct AgentRoute {
561+
/// Descriptor of an agent route.
562+
pub struct AgentRouteDescriptor {
563+
/// The unique ID of the agent instance.
526564
pub identity: Uuid,
565+
/// The route URI of the instance.
527566
pub route: RouteUri,
567+
/// Parameters extracted from the route URI of the instance.
528568
pub route_params: HashMap<String, String>,
529569
}
530570

571+
/// All configuration parameters associated with an agent instance.
531572
#[derive(Debug, Default, Clone, Copy)]
532573
pub struct CombinedAgentConfig {
533574
pub agent_config: AgentConfig,
534575
pub runtime_config: AgentRuntimeConfig,
535576
}
536577

578+
/// Channels used by an agent instance to communicate with the runtime.
537579
pub struct AgentRouteChannels {
538580
attachment_rx: mpsc::Receiver<AgentAttachmentRequest>,
539581
http_rx: mpsc::Receiver<HttpLaneRequest>,
@@ -558,6 +600,8 @@ impl AgentRouteChannels {
558600
}
559601
}
560602

603+
/// The agent runtime task. This mediates between the the user defined agent state and event handlers
604+
/// and the other entities within the Swim server application.
561605
pub struct AgentRouteTask<'a, A> {
562606
agent: &'a A,
563607
identity: Uuid,
@@ -584,7 +628,7 @@ impl<'a, A: Agent + 'static> AgentRouteTask<'a, A> {
584628
/// * `reporting` - Uplink metrics reporters.
585629
pub fn new(
586630
agent: &'a A,
587-
identity: AgentRoute,
631+
identity: AgentRouteDescriptor,
588632
channels: AgentRouteChannels,
589633
stopping: trigger::Receiver,
590634
config: CombinedAgentConfig,
@@ -605,6 +649,7 @@ impl<'a, A: Agent + 'static> AgentRouteTask<'a, A> {
605649
}
606650
}
607651

652+
/// Run the agent task without persistence.
608653
pub fn run_agent(self) -> impl Future<Output = Result<(), AgentExecError>> + Send + 'static {
609654
let AgentRouteTask {
610655
agent,
@@ -675,6 +720,10 @@ impl<'a, A: Agent + 'static> AgentRouteTask<'a, A> {
675720
}
676721
}
677722

723+
/// Run the agent task with persistence support.
724+
///
725+
/// # Arguments
726+
/// * `store_fut` - A future that will resolve to the persistence implementation.
678727
pub fn run_agent_with_store<Store, Fut>(
679728
self,
680729
store_fut: Fut,

runtime/swimos_runtime/src/agent/reporting/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ struct UplinkCounters {
3939
command_count: AtomicU64,
4040
}
4141

42-
/// A snapshot taken from an [`UplinkCounters`].
42+
/// A snapshot taken from an the uplink counters.
4343
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
4444
pub struct UplinkSnapshot {
4545
pub link_count: u64,
@@ -91,7 +91,7 @@ pub struct UplinkReporter {
9191
counters: Arc<UplinkCounters>,
9292
}
9393

94-
/// A cosumer attached to an [`UplinkReporter`]. When the corresponding reporter is dropped, this
94+
/// A consumer attached to an [`UplinkReporter`]. When the corresponding reporter is dropped, this
9595
/// will become invalidated and all future snapshot calls will return nothing.
9696
#[derive(Debug, Clone)]
9797
pub struct UplinkReportReader {

runtime/swimos_runtime/src/agent/task/remotes/uplink/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ use crate::{
2828
DisconnectionReason,
2929
},
3030
backpressure::{
31-
recon::MapOperationReconEncoder, BackpressureStrategy, MapBackpressure, SupplyBackpressure,
32-
ValueBackpressure, InvalidKey
31+
recon::MapOperationReconEncoder, BackpressureStrategy, InvalidKey, MapBackpressure,
32+
SupplyBackpressure, ValueBackpressure,
3333
},
3434
};
3535

runtime/swimos_runtime/src/agent/task/tests/coordination.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,17 @@ async fn immediate_shutdown() {
526526
assert_eq!(map_lanes.remove(MAP_LANE), Some(BTreeMap::new()));
527527
}
528528

529+
impl AgentAttachmentRequest {
530+
fn downlink(id: Uuid, io: Io, completion: promise::Sender<DisconnectionReason>) -> Self {
531+
AgentAttachmentRequest::TwoWay {
532+
id,
533+
io,
534+
completion,
535+
on_attached: None,
536+
}
537+
}
538+
}
539+
529540
async fn attach_remote(
530541
remote_id: Uuid,
531542
att_tx: &mpsc::Sender<AgentAttachmentRequest>,

runtime/swimos_runtime/src/agent/tests.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use swimos_utilities::{
2929
};
3030
use tokio::sync::mpsc;
3131

32-
use crate::agent::{AgentExecError, AgentRoute, AgentRouteChannels};
32+
use crate::agent::{AgentExecError, AgentRouteChannels, AgentRouteDescriptor};
3333

3434
use super::AgentRouteTask;
3535

@@ -123,7 +123,7 @@ where
123123
async fn test_agent_failure() {
124124
with_timeout(async {
125125
let agent = TestAgent::Running;
126-
let identity = AgentRoute {
126+
let identity = AgentRouteDescriptor {
127127
identity: Uuid::from_u128(1),
128128
route: "/node".parse().unwrap(),
129129
route_params: HashMap::new(),
@@ -156,7 +156,7 @@ async fn test_agent_failure() {
156156
async fn test_agent_failure_with_store() {
157157
with_timeout(async {
158158
let agent = TestAgent::Running;
159-
let identity = AgentRoute {
159+
let identity = AgentRouteDescriptor {
160160
identity: Uuid::from_u128(1),
161161
route: "/node".parse().unwrap(),
162162
route_params: HashMap::new(),
@@ -191,7 +191,7 @@ async fn test_agent_failure_with_store() {
191191
async fn test_agent_init_failure() {
192192
with_timeout(async {
193193
let agent = TestAgent::Init;
194-
let identity = AgentRoute {
194+
let identity = AgentRouteDescriptor {
195195
identity: Uuid::from_u128(1),
196196
route: "/node".parse().unwrap(),
197197
route_params: HashMap::new(),

runtime/swimos_runtime/src/backpressure/map_queue/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::{
2020
use bytes::{BufMut, BytesMut};
2121
use swimos_agent_protocol::MapOperation;
2222

23-
use super::{key::ReconKey, RawMapOperation, RawMapOperationMut, InvalidKey};
23+
use super::{key::ReconKey, InvalidKey, RawMapOperation, RawMapOperationMut};
2424

2525
#[cfg(test)]
2626
mod tests;

runtime/swimos_runtime/src/backpressure/mod.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,16 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::{convert::Infallible, fmt::{Display, Formatter}};
15+
use std::{
16+
convert::Infallible,
17+
fmt::{Display, Formatter},
18+
};
1619

1720
use bytes::{Buf, BufMut, Bytes, BytesMut};
18-
use swimos_agent_protocol::{DownlinkOperation, MapOperation};
19-
use tokio_util::codec::Encoder;
2021
use std::str::Utf8Error;
22+
use swimos_agent_protocol::{DownlinkOperation, MapOperation};
2123
use thiserror::Error;
24+
use tokio_util::codec::Encoder;
2225

2326
use map_queue::MapOperationQueue;
2427
mod key;
@@ -27,7 +30,6 @@ pub mod recon;
2730

2831
use recon::MapOperationReconEncoder;
2932

30-
3133
type RawMapOperation = MapOperation<Bytes, BytesMut>;
3234
type RawMapOperationMut = MapOperation<BytesMut, BytesMut>;
3335

runtime/swimos_runtime/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,15 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
//! SwimOS Agent & Downlink Runtime
16+
//!
17+
//! Tokio tasks describing the core IO loops for agents and downlinks. These tasks implement
18+
//! the Warp protocol only and are entirely decoupled from the state and user defined behaviour
19+
//! of the the agents/downlinks.
20+
21+
/// The agent runtime task.
1522
pub mod agent;
1623
mod backpressure;
24+
/// The downlink runtime task.
1725
pub mod downlink;
1826
mod timeout_coord;

0 commit comments

Comments
 (0)