Skip to content

Commit 17b5eb4

Browse files
committed
Implement streaming proto
1 parent 16dfec7 commit 17b5eb4

File tree

3 files changed

+131
-49
lines changed

3 files changed

+131
-49
lines changed

agent/src/main.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@ use errors::*;
2222
use futures::{future, Future};
2323
use intecture_api::remote::{Executable, Runnable};
2424
use intecture_api::host::local::Local;
25-
use intecture_api::host::remote::JsonProto;
25+
use intecture_api::host::remote::{JsonLineProto, LineMessage};
2626
use std::fs::File;
2727
use std::io::{self, Read};
2828
use std::net::SocketAddr;
2929
use std::sync::Arc;
3030
use tokio_core::reactor::Remote;
31+
use tokio_proto::streaming::Message;
3132
use tokio_proto::TcpServer;
3233
use tokio_service::{NewService, Service};
3334

@@ -37,12 +38,17 @@ pub struct Api {
3738
}
3839

3940
impl Service for Api {
40-
type Request = serde_json::Value;
41-
type Response = serde_json::Value;
41+
type Request = LineMessage;
42+
type Response = LineMessage;
4243
type Error = io::Error;
4344
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
4445

4546
fn call(&self, req: Self::Request) -> Self::Future {
47+
let req = match req {
48+
Message::WithBody(req, _) => req,
49+
Message::WithoutBody(req) => req,
50+
};
51+
4652
let runnable: Runnable = match serde_json::from_value(req).chain_err(|| "Received invalid Runnable") {
4753
Ok(r) => r,
4854
Err(e) => return Box::new(
@@ -65,15 +71,15 @@ impl Service for Api {
6571
// Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163
6672
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.description()))
6773
.and_then(|ser| match serde_json::to_value(ser).chain_err(|| "Could not serialize result") {
68-
Ok(v) => future::ok(v),
74+
Ok(v) => future::ok(Message::WithoutBody(v)),
6975
Err(e) => future::err(io::Error::new(io::ErrorKind::Other, e.description())),
7076
}))
7177
}
7278
}
7379

7480
impl NewService for Api {
75-
type Request = serde_json::Value;
76-
type Response = serde_json::Value;
81+
type Request = LineMessage;
82+
type Response = LineMessage;
7783
type Error = io::Error;
7884
type Instance = Api;
7985
fn new_service(&self) -> io::Result<Self::Instance> {
@@ -129,7 +135,7 @@ quick_main!(|| -> Result<()> {
129135
// Currently we force the issue (`unwrap()`), which is only safe
130136
// for the current thread.
131137
// See https://github.com/alexcrichton/tokio-process/issues/23
132-
let server = TcpServer::new(JsonProto, config.address);
138+
let server = TcpServer::new(JsonLineProto, config.address);
133139
server.with_handle(move |handle| {
134140
let api = Api {
135141
host: host.clone(),

core/src/command/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ impl<H: Host + 'static> Command<H> {
6868
}
6969
}
7070

71-
pub fn exec(&mut self, handle: &Handle) -> Box<Future<Item = CommandResult, Error = Error>> {
71+
pub fn exec(&self, handle: &Handle) -> Box<Future<Item = CommandResult, Error = Error>> {
7272
self.inner.exec(&self.host, handle, &self.cmd, &self.shell)
7373
}
7474
}

core/src/host/remote.rs

Lines changed: 117 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
// https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied,
55
// modified, or distributed except according to those terms.
66

7-
use bytes::{BufMut, BytesMut};
7+
use bytes::BytesMut;
88
use errors::*;
99
use futures::{future, Future};
1010
use remote::Runnable;
@@ -15,14 +15,18 @@ use std::sync::Arc;
1515
use std::net::SocketAddr;
1616
use super::{Host, HostType};
1717
use telemetry::{self, Telemetry};
18-
use tokio_core::net::TcpStream;
1918
use tokio_core::reactor::Handle;
2019
use tokio_io::{AsyncRead, AsyncWrite};
2120
use tokio_io::codec::{Encoder, Decoder, Framed};
22-
use tokio_proto::pipeline::{ClientProto, ClientService, ServerProto};
21+
use tokio_proto::streaming::{Body, Message};
22+
use tokio_proto::streaming::pipeline::{ClientProto, Frame, ServerProto};
2323
use tokio_proto::TcpClient;
24+
use tokio_proto::util::client_proxy::ClientProxy;
2425
use tokio_service::Service;
2526

27+
#[doc(hidden)]
28+
pub type LineMessage = Message<serde_json::Value, Body<Vec<u8>, io::Error>>;
29+
2630
/// A `Host` type that uses an unencrypted socket.
2731
///
2832
/// *Warning! An unencrypted host is susceptible to eavesdropping and MITM
@@ -34,12 +38,14 @@ pub struct Plain {
3438
}
3539

3640
struct Inner {
37-
inner: ClientService<TcpStream, JsonProto>,
41+
inner: ClientProxy<LineMessage, LineMessage, io::Error>,
3842
telemetry: Option<Telemetry>,
3943
}
4044

41-
pub struct JsonCodec;
42-
pub struct JsonProto;
45+
pub struct JsonLineCodec {
46+
decoding_head: bool,
47+
}
48+
pub struct JsonLineProto;
4349

4450
impl Plain {
4551
/// Create a new Host connected to addr.
@@ -51,7 +57,7 @@ impl Plain {
5157

5258
info!("Connecting to host {}", addr);
5359

54-
Box::new(TcpClient::new(JsonProto)
60+
Box::new(TcpClient::new(JsonLineProto)
5561
.connect(&addr, handle)
5662
.chain_err(|| "Could not connect to host")
5763
.and_then(|client_service| {
@@ -77,16 +83,31 @@ impl Plain {
7783
#[doc(hidden)]
7884
pub fn run<D: 'static>(&self, provider: Runnable) -> Box<Future<Item = D, Error = Error>>
7985
where for<'de> D: Deserialize<'de>
86+
{
87+
Box::new(self.run_msg::<D>(provider)
88+
.map(|msg| msg.into_inner()))
89+
}
90+
91+
#[doc(hidden)]
92+
pub fn run_msg<D: 'static>(&self, provider: Runnable) -> Box<Future<Item = Message<D, Body<Vec<u8>, io::Error>>, Error = Error>>
93+
where for<'de> D: Deserialize<'de>
8094
{
8195
let value = match serde_json::to_value(provider).chain_err(|| "Could not encode provider to send to host") {
8296
Ok(v) => v,
8397
Err(e) => return Box::new(future::err(e))
8498
};
85-
Box::new(self.call(value)
99+
Box::new(self.inner.inner.call(Message::WithoutBody(value))
86100
.chain_err(|| "Error while running provider on host")
87-
.and_then(|v| match serde_json::from_value::<D>(v).chain_err(|| "Could not understand response from host") {
88-
Ok(d) => future::ok(d),
89-
Err(e) => future::err(e)
101+
.and_then(|mut msg| {
102+
let body = msg.take_body();
103+
let msg = match serde_json::from_value::<D>(msg.into_inner()).chain_err(|| "Could not understand response from host") {
104+
Ok(d) => d,
105+
Err(e) => return Box::new(future::err(e)),
106+
};
107+
Box::new(future::ok(match body {
108+
Some(b) => Message::WithBody(msg, b),
109+
None => Message::WithoutBody(msg),
110+
}))
90111
}))
91112
}
92113
}
@@ -102,8 +123,8 @@ impl Host for Plain {
102123
}
103124

104125
impl Service for Plain {
105-
type Request = serde_json::Value;
106-
type Response = serde_json::Value;
126+
type Request = LineMessage;
127+
type Response = LineMessage;
107128
type Error = io::Error;
108129
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
109130

@@ -112,58 +133,113 @@ impl Service for Plain {
112133
}
113134
}
114135

115-
impl Decoder for JsonCodec {
116-
type Item = serde_json::Value;
136+
impl Decoder for JsonLineCodec {
137+
type Item = Frame<serde_json::Value, Vec<u8>, io::Error>;
117138
type Error = io::Error;
118139

119-
fn decode(&mut self, buf: &mut BytesMut) -> result::Result<Option<Self::Item>, Self::Error> {
120-
// Check to see if the frame contains a new line
121-
if let Some(n) = buf.as_ref().iter().position(|b| *b == b'\n') {
122-
// remove the serialized frame from the buffer.
123-
let line = buf.split_to(n);
124-
125-
// Also remove the '\n'
126-
buf.split_to(1);
140+
fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> {
141+
let line = match buf.iter().position(|b| *b == b'\n') {
142+
Some(n) => buf.split_to(n),
143+
None => return Ok(None),
144+
};
127145

128-
return Ok(Some(serde_json::from_slice(&line).unwrap()));
146+
buf.split_to(1);
147+
148+
if line.is_empty() {
149+
let decoding_head = self.decoding_head;
150+
self.decoding_head = !decoding_head;
151+
152+
if decoding_head {
153+
Ok(Some(Frame::Message {
154+
message: serde_json::Value::Null,
155+
body: true,
156+
}))
157+
} else {
158+
Ok(Some(Frame::Body {
159+
chunk: None
160+
}))
161+
}
162+
} else {
163+
if self.decoding_head {
164+
Ok(Some(Frame::Message {
165+
message: serde_json::from_slice(&line).map_err(|e| {
166+
io::Error::new(io::ErrorKind::Other, e)
167+
})?,
168+
body: false,
169+
}))
170+
} else {
171+
Ok(Some(Frame::Body {
172+
chunk: Some(line.to_vec()),
173+
}))
174+
}
129175
}
130-
131-
Ok(None)
132176
}
133177
}
134178

135-
impl Encoder for JsonCodec {
136-
type Item = serde_json::Value;
179+
impl Encoder for JsonLineCodec {
180+
type Item = Frame<serde_json::Value, Vec<u8>, io::Error>;
137181
type Error = io::Error;
138182

139-
fn encode(&mut self, value: Self::Item, buf: &mut BytesMut) -> io::Result<()> {
140-
let json = serde_json::to_string(&value).unwrap();
141-
buf.reserve(json.len() + 1);
142-
buf.extend(json.as_bytes());
143-
buf.put_u8(b'\n');
183+
fn encode(&mut self, msg: Self::Item, buf: &mut BytesMut) -> io::Result<()> {
184+
match msg {
185+
Frame::Message { message, body } => {
186+
// Our protocol dictates that a message head that
187+
// includes a streaming body is an empty string.
188+
assert!(message.is_null() == body);
189+
190+
let json = serde_json::to_vec(&message)
191+
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
192+
buf.extend(&json);
193+
}
194+
Frame::Body { chunk } => {
195+
if let Some(chunk) = chunk {
196+
buf.extend(&chunk);
197+
}
198+
}
199+
Frame::Error { error } => {
200+
// @todo Support error frames
201+
return Err(error)
202+
}
203+
}
204+
205+
buf.extend(b"\n");
144206

145207
Ok(())
146208
}
147209
}
148210

149-
impl<T: AsyncRead + AsyncWrite + 'static> ClientProto<T> for JsonProto {
211+
impl<T: AsyncRead + AsyncWrite + 'static> ClientProto<T> for JsonLineProto {
150212
type Request = serde_json::Value;
213+
type RequestBody = Vec<u8>;
151214
type Response = serde_json::Value;
152-
type Transport = Framed<T, JsonCodec>;
153-
type BindTransport = result::Result<Self::Transport, io::Error>;
215+
type ResponseBody = Vec<u8>;
216+
type Error = io::Error;
217+
type Transport = Framed<T, JsonLineCodec>;
218+
type BindTransport = result::Result<Self::Transport, Self::Error>;
154219

155220
fn bind_transport(&self, io: T) -> Self::BindTransport {
156-
Ok(io.framed(JsonCodec))
221+
let codec = JsonLineCodec {
222+
decoding_head: true,
223+
};
224+
225+
Ok(io.framed(codec))
157226
}
158227
}
159228

160-
impl<T: AsyncRead + AsyncWrite + 'static> ServerProto<T> for JsonProto {
229+
impl<T: AsyncRead + AsyncWrite + 'static> ServerProto<T> for JsonLineProto {
161230
type Request = serde_json::Value;
231+
type RequestBody = Vec<u8>;
162232
type Response = serde_json::Value;
163-
type Transport = Framed<T, JsonCodec>;
164-
type BindTransport = result::Result<Self::Transport, io::Error>;
233+
type ResponseBody = Vec<u8>;
234+
type Error = io::Error;
235+
type Transport = Framed<T, JsonLineCodec>;
236+
type BindTransport = result::Result<Self::Transport, Self::Error>;
165237

166238
fn bind_transport(&self, io: T) -> Self::BindTransport {
167-
Ok(io.framed(JsonCodec))
239+
let codec = JsonLineCodec {
240+
decoding_head: true,
241+
};
242+
243+
Ok(io.framed(codec))
168244
}
169245
}

0 commit comments

Comments
 (0)