Skip to content

Commit 733ca3a

Browse files
authored
Merge pull request #577 from swimos/http-lane
Adds support for HTTP lanes.
2 parents f2b799e + d163bf2 commit 733ca3a

File tree

91 files changed

+7007
-643
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

91 files changed

+7007
-643
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ members = [
3030
"example_apps/value_downlink",
3131
"example_apps/map_downlink",
3232
"example_apps/local_downlink",
33+
"example_apps/http_lane",
3334
"example_apps/tutorial_app",
3435
"example_apps/tutorial_app/model",
3536
"example_apps/tutorial_app/generator",
@@ -99,3 +100,6 @@ crossbeam-channel = { version = "0.5" }
99100
hyper = "0.14"
100101
lazy_static = "1.4.0"
101102
percent-encoding = "2.1.0"
103+
mime = "0.3"
104+
serde_json = "1.0"
105+
serde = "1.0"

api/formats/swim_recon/src/parser/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,18 @@ pub fn parse_value(repr: &str, allow_comments: bool) -> Result<Value, ParseError
179179
parse_recognize(Span::new(repr), allow_comments)
180180
}
181181

182+
/// Parse exactly one value of type `T` from the input, returning an error if the string does not contain
183+
/// the representation of exactly one.
184+
///
185+
/// * `repr` - The input to parse.
186+
/// * `allow_comments` - Boolean flag indicating whether or not the parsing should fail on comments.
187+
pub fn parse_into<T: RecognizerReadable>(
188+
repr: &str,
189+
allow_comments: bool,
190+
) -> Result<T, ParseError> {
191+
parse_recognize(Span::new(repr), allow_comments)
192+
}
193+
182194
use crate::comparator::compare_values;
183195
#[cfg(feature = "async_parser")]
184196
pub use async_parser::{

api/swim_api/src/agent/mod.rs

Lines changed: 65 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,28 @@ use std::{
1616
collections::HashMap,
1717
fmt::{Display, Formatter},
1818
num::NonZeroUsize,
19+
pin::Pin,
20+
task::{Context, Poll},
1921
};
2022

2123
use bytes::Bytes;
22-
use futures::future::BoxFuture;
24+
use futures::{future::BoxFuture, ready, Future, FutureExt};
2325
use swim_model::http::{HttpRequest, HttpResponse};
2426
use swim_utilities::{
2527
future::retryable::RetryStrategy,
2628
io::byte_channel::{ByteReader, ByteWriter},
2729
non_zero_usize,
2830
routing::route_uri::RouteUri,
2931
};
32+
use thiserror::Error;
3033
use tokio::sync::{mpsc, oneshot};
3134

3235
use crate::{
3336
downlink::DownlinkKind,
3437
error::{
3538
AgentInitError, AgentRuntimeError, AgentTaskError, DownlinkRuntimeError, OpenStoreError,
3639
},
37-
meta::lane::LaneKind,
40+
lane::WarpLaneKind,
3841
store::StoreKind,
3942
};
4043

@@ -101,21 +104,73 @@ impl Default for StoreConfig {
101104

102105
pub type HttpLaneResponse = HttpResponse<Bytes>;
103106

107+
/// Send half of a single use channel for providing an HTTP response.
108+
#[derive(Debug)]
109+
pub struct HttpResponseSender(oneshot::Sender<HttpLaneResponse>);
110+
111+
impl HttpResponseSender {
112+
pub fn send(self, response: HttpLaneResponse) -> Result<(), HttpLaneResponse> {
113+
self.0.send(response)
114+
}
115+
}
116+
117+
impl Future for HttpResponseReceiver {
118+
type Output = Result<HttpLaneResponse, ()>;
119+
120+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
121+
Poll::Ready(ready!(self.as_mut().0.poll_unpin(cx)).map_err(|_| ()))
122+
}
123+
}
124+
125+
pub fn response_channel() -> (HttpResponseSender, HttpResponseReceiver) {
126+
let (tx, rx) = oneshot::channel();
127+
(HttpResponseSender(tx), HttpResponseReceiver(rx))
128+
}
129+
130+
/// Receive half of a single use channel for providing an HTTP response.
131+
#[derive(Debug)]
132+
pub struct HttpResponseReceiver(oneshot::Receiver<HttpLaneResponse>);
133+
134+
#[derive(Debug, Error, Clone, Copy, PartialEq, Eq, Default)]
135+
#[error("An HTTP request was dropped before a response was sent.")]
136+
pub struct ReceiveResponseError;
137+
138+
impl HttpResponseReceiver {
139+
pub fn try_recv(&mut self) -> Result<HttpLaneResponse, ReceiveResponseError> {
140+
self.0.try_recv().map_err(|_| ReceiveResponseError)
141+
}
142+
}
143+
144+
/// The type of messages sent from the Swim agent runtime to an agent implementation. It includes the
145+
/// request that was received by the server and a single use channel for the agent implementation to
146+
/// provide the response.
104147
#[derive(Debug)]
105148
pub struct HttpLaneRequest {
106149
pub request: HttpRequest<Bytes>,
107-
pub response_tx: oneshot::Sender<HttpLaneResponse>,
150+
response_tx: HttpResponseSender,
108151
}
109152

110153
impl HttpLaneRequest {
111-
pub fn new(
112-
request: HttpRequest<Bytes>,
113-
response_tx: oneshot::Sender<HttpLaneResponse>,
114-
) -> Self {
115-
HttpLaneRequest {
154+
/// Create a new instance from an HTTP request and provide the receiver that can be
155+
/// used to wait for the response.
156+
pub fn new(request: HttpRequest<Bytes>) -> (Self, HttpResponseReceiver) {
157+
let (tx, rx) = response_channel();
158+
(
159+
HttpLaneRequest {
160+
request,
161+
response_tx: tx,
162+
},
163+
rx,
164+
)
165+
}
166+
167+
/// Split this message into the original request and the channel for sending the response.
168+
pub fn into_parts(self) -> (HttpRequest<Bytes>, HttpResponseSender) {
169+
let HttpLaneRequest {
116170
request,
117171
response_tx,
118-
}
172+
} = self;
173+
(request, response_tx)
119174
}
120175
}
121176

@@ -136,7 +191,7 @@ pub trait AgentContext: Sync {
136191
fn add_lane(
137192
&self,
138193
name: &str,
139-
lane_kind: LaneKind,
194+
lane_kind: WarpLaneKind,
140195
config: LaneConfig,
141196
) -> BoxFuture<'static, Result<(ByteWriter, ByteReader), AgentRuntimeError>>;
142197

api/swim_api/src/lane/mod.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright 2015-2023 Swim Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fmt::{Display, Formatter};
16+
17+
use swim_form::structural::Tag;
18+
19+
use crate::{agent::UplinkKind, meta::lane::LaneKind};
20+
21+
/// An enumeration representing the kinds of Warp lanes.
22+
#[derive(Tag, Debug, PartialEq, Eq, Clone, Copy, Hash)]
23+
#[form_root(::swim_form)]
24+
pub enum WarpLaneKind {
25+
Command,
26+
Demand,
27+
DemandMap,
28+
Map,
29+
JoinMap,
30+
JoinValue,
31+
Supply,
32+
Spatial,
33+
Value,
34+
}
35+
36+
impl From<WarpLaneKind> for LaneKind {
37+
fn from(value: WarpLaneKind) -> Self {
38+
match value {
39+
WarpLaneKind::Command => LaneKind::Command,
40+
WarpLaneKind::Demand => LaneKind::Demand,
41+
WarpLaneKind::DemandMap => LaneKind::DemandMap,
42+
WarpLaneKind::Map => LaneKind::Map,
43+
WarpLaneKind::JoinMap => LaneKind::JoinMap,
44+
WarpLaneKind::JoinValue => LaneKind::JoinValue,
45+
WarpLaneKind::Supply => LaneKind::Supply,
46+
WarpLaneKind::Spatial => LaneKind::Spatial,
47+
WarpLaneKind::Value => LaneKind::Value,
48+
}
49+
}
50+
}
51+
52+
impl WarpLaneKind {
53+
pub fn map_like(&self) -> bool {
54+
matches!(
55+
self,
56+
WarpLaneKind::Map
57+
| WarpLaneKind::DemandMap
58+
| WarpLaneKind::JoinMap
59+
| WarpLaneKind::JoinValue
60+
)
61+
}
62+
63+
pub fn uplink_kind(&self) -> UplinkKind {
64+
match self {
65+
WarpLaneKind::Map | WarpLaneKind::DemandMap | WarpLaneKind::JoinMap => UplinkKind::Map,
66+
WarpLaneKind::Supply => UplinkKind::Supply,
67+
WarpLaneKind::Spatial => todo!("Spatial uplinks not supported."),
68+
_ => UplinkKind::Value,
69+
}
70+
}
71+
}
72+
73+
impl Display for WarpLaneKind {
74+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
75+
let as_str: &str = self.as_ref();
76+
write!(f, "{}", as_str)
77+
}
78+
}

api/swim_api/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub mod agent;
1616
pub mod downlink;
1717
pub mod error;
1818
pub mod handlers;
19+
pub mod lane;
1920
pub mod meta;
2021
pub mod net;
2122
pub mod protocol;

api/swim_api/src/meta/lane.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,10 @@ use swim_form::structural::Tag;
2626
use swim_form::Form;
2727
use swim_model::{Text, ValueKind};
2828

29-
use crate::agent::UplinkKind;
30-
3129
/// An enumeration representing the type of a lane.
3230
#[derive(Tag, Debug, PartialEq, Eq, Clone, Copy, Hash)]
3331
#[form_root(::swim_form)]
3432
pub enum LaneKind {
35-
Action,
3633
Command,
3734
Demand,
3835
DemandMap,
@@ -42,6 +39,7 @@ pub enum LaneKind {
4239
Supply,
4340
Spatial,
4441
Value,
42+
Http,
4543
}
4644

4745
pub enum ValueLikeLaneKind {
@@ -59,15 +57,6 @@ impl LaneKind {
5957
LaneKind::Map | LaneKind::DemandMap | LaneKind::JoinMap | LaneKind::JoinValue
6058
)
6159
}
62-
63-
pub fn uplink_kind(&self) -> UplinkKind {
64-
match self {
65-
LaneKind::Map | LaneKind::DemandMap | LaneKind::JoinMap => UplinkKind::Map,
66-
LaneKind::Supply => UplinkKind::Supply,
67-
LaneKind::Spatial => todo!("Spatial uplinks not supported."),
68-
_ => UplinkKind::Value,
69-
}
70-
}
7160
}
7261

7362
/// Lane information metadata that can be retrieved when syncing to
@@ -105,7 +94,6 @@ impl<'a> TryFrom<&'a str> for LaneKind {
10594

10695
fn try_from(value: &'a str) -> Result<Self, Self::Error> {
10796
match value {
108-
"Action" => Ok(LaneKind::Action),
10997
"Command" => Ok(LaneKind::Command),
11098
"Demand" => Ok(LaneKind::Demand),
11199
"DemandMap" => Ok(LaneKind::DemandMap),

api/swim_api/src/protocol/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ fn write_recon_with_len<T: StructuralWritable>(dst: &mut BytesMut, body: &T) {
9191
rewound.put_u64(body_len as u64);
9292
}
9393

94-
fn write_recon<T: StructuralWritable>(dst: &mut BytesMut, body: &T) -> usize {
94+
pub fn write_recon<T: StructuralWritable>(dst: &mut BytesMut, body: &T) -> usize {
9595
let body_offset = write_recon_body(dst, body);
9696
dst.remaining() - body_offset
9797
}

api/swim_model/src/http/header.rs

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,23 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::str::FromStr;
15+
use std::{
16+
cmp::Ordering,
17+
hash::{Hash, Hasher},
18+
str::FromStr,
19+
};
1620

1721
use bytes::Bytes;
1822

1923
use crate::BytesStr;
2024

21-
#[derive(Debug, Clone, PartialEq, Eq)]
25+
/// Model for the name of an HTTP header. The representation of this type will either be an enumeration
26+
/// of the standard header names or a general ASCII string for custom headers.
27+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2228
pub struct HeaderName(Name);
2329

30+
/// Model for the value of an HTTP header. The representation of this type is an array of bytes that
31+
/// may, or may not, be a valid UTF8 string.
2432
#[derive(Debug, Clone, PartialEq, Eq)]
2533
pub struct HeaderValue(HeaderValueInner);
2634

@@ -30,12 +38,27 @@ enum HeaderValueInner {
3038
BytesHeader(Bytes),
3139
}
3240

41+
/// Mode of an HTTP header, used by [`super::HttpRequest`] and [`super::HttpResponse`].
3342
#[derive(Debug, Clone, PartialEq, Eq)]
3443
pub struct Header {
3544
pub name: HeaderName,
3645
pub value: HeaderValue,
3746
}
3847

48+
impl Header {
49+
/// Create a header from anything that can be converted into header names and header values.
50+
pub fn new<N, V>(name: N, value: V) -> Self
51+
where
52+
N: Into<HeaderName>,
53+
V: Into<HeaderValue>,
54+
{
55+
Header {
56+
name: name.into(),
57+
value: value.into(),
58+
}
59+
}
60+
}
61+
3962
impl HeaderName {
4063
pub fn as_str(&self) -> &str {
4164
self.0.str_value()
@@ -182,6 +205,7 @@ impl From<&str> for HeaderName {
182205
}
183206
}
184207

208+
/// An enumeration of standard header names.
185209
#[repr(u8)]
186210
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
187211
pub enum StandardHeaderName {
@@ -233,6 +257,33 @@ impl Name {
233257
}
234258
}
235259

260+
impl PartialOrd for Name {
261+
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
262+
self.str_value().partial_cmp(other.str_value())
263+
}
264+
}
265+
266+
impl Ord for Name {
267+
fn cmp(&self, other: &Self) -> Ordering {
268+
self.str_value().cmp(other.str_value())
269+
}
270+
}
271+
272+
impl Hash for Name {
273+
fn hash<H: Hasher>(&self, state: &mut H) {
274+
self.str_value().hash(state)
275+
}
276+
}
277+
278+
impl PartialEq<StandardHeaderName> for HeaderName {
279+
fn eq(&self, other: &StandardHeaderName) -> bool {
280+
match &self.0 {
281+
Name::Standard(h) => h == other,
282+
Name::Other(_) => false,
283+
}
284+
}
285+
}
286+
236287
impl StandardHeaderName {
237288
fn try_from_basic(s: &str) -> Option<Self> {
238289
match s {

0 commit comments

Comments
 (0)