Skip to content

Commit 2de6e2f

Browse files
authored
Merge pull request #46 from petehayes102/master
Streaming API
2 parents 16dfec7 + 3644c52 commit 2de6e2f

File tree

21 files changed

+860
-418
lines changed

21 files changed

+860
-418
lines changed

RELEASES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@ Version 0.4 is a complete rewrite of Intecture's core API. If we had been runnin
55
Anyway, there are some big headlines in this release:
66

77
- **Hello Tokio!** This is an exciting change to our socket API, as we bid a fond farewell to ZeroMQ. The change to Tokio will bring loads of benefits, such as substantially reducing our non-Rust dependencies, and introducing strongly typed messaging between servers and clients. This will make our communications more robust and less open to exploitation. Woo hoo!
8-
- **Asynchronous endpoints with `Futures`!** This is another huge change to the API, allowing us to multitask our configurations. It will also pave the way for a streaming API, which has been sorely lacking in Intecture until now.
8+
- **Asynchronous, streaming endpoints with `Futures`!** This is another huge change to the API, which allows configuration tasks to be run in parallel; moreover it allows users to stream task output in realtime. Gone are the days of waiting in the dark for tasks to complete!
99
- **Greater emphasis on composability.** Each endpoint (formerly called _primitives_) is organised into a collection of _providers_ that will (you guessed it) provide target-specific implementations of the endpoint. Users will be able to select an endpoint provider manually or let the system choose the best provider for the target platform.
1010
- **Separation of duties.** In the previous versions of Intecture, the API had been organised into a single project, making it cluttered and unwieldy. For the next release, things like the FFI, language bindings and project boilerplate will be moved into separate child projects under the same Cargo workspace.

agent/src/errors.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,42 @@
44
// https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied,
55
// modified, or distributed except according to those terms.
66

7+
use error_chain::ChainedError;
8+
use futures::Future;
79
use intecture_api;
10+
use std::{convert, error, io};
811

912
error_chain! {
1013
links {
1114
Api(intecture_api::errors::Error, intecture_api::errors::ErrorKind);
1215
}
1316
}
17+
18+
impl convert::From<Error> for io::Error {
19+
fn from(e: Error) -> io::Error {
20+
io::Error::new(io::ErrorKind::Other, format!("{}", e.display_chain()))
21+
}
22+
}
23+
24+
// @todo This should disappear once Futures are officially supported
25+
// by error_chain.
26+
// See: https://github.com/rust-lang-nursery/error-chain/issues/90
27+
pub type SFuture<T> = Box<Future<Item = T, Error = Error>>;
28+
29+
pub trait FutureChainErr<T> {
30+
fn chain_err<F, E>(self, callback: F) -> SFuture<T>
31+
where F: FnOnce() -> E + 'static,
32+
E: Into<ErrorKind>;
33+
}
34+
35+
impl<F> FutureChainErr<F::Item> for F
36+
where F: Future + 'static,
37+
F::Error: error::Error + Send + 'static,
38+
{
39+
fn chain_err<C, E>(self, callback: C) -> SFuture<F::Item>
40+
where C: FnOnce() -> E + 'static,
41+
E: Into<ErrorKind>,
42+
{
43+
Box::new(self.then(|r| r.chain_err(callback)))
44+
}
45+
}

agent/src/main.rs

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,18 @@ extern crate toml;
1818

1919
mod errors;
2020

21+
use error_chain::ChainedError;
2122
use errors::*;
2223
use futures::{future, Future};
23-
use intecture_api::remote::{Executable, Runnable};
2424
use intecture_api::host::local::Local;
25-
use intecture_api::host::remote::JsonProto;
25+
use intecture_api::host::remote::{JsonLineProto, LineMessage};
26+
use intecture_api::remote::{Executable, Request, ResponseResult};
2627
use std::fs::File;
2728
use std::io::{self, Read};
2829
use std::net::SocketAddr;
2930
use std::sync::Arc;
3031
use tokio_core::reactor::Remote;
32+
use tokio_proto::streaming::Message;
3133
use tokio_proto::TcpServer;
3234
use tokio_service::{NewService, Service};
3335

@@ -37,21 +39,20 @@ pub struct Api {
3739
}
3840

3941
impl Service for Api {
40-
type Request = serde_json::Value;
41-
type Response = serde_json::Value;
42-
type Error = io::Error;
42+
type Request = LineMessage;
43+
type Response = LineMessage;
44+
type Error = Error;
4345
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
4446

4547
fn call(&self, req: Self::Request) -> Self::Future {
46-
let runnable: Runnable = match serde_json::from_value(req).chain_err(|| "Received invalid Runnable") {
48+
let req = match req {
49+
Message::WithBody(req, _) => req,
50+
Message::WithoutBody(req) => req,
51+
};
52+
53+
let request: Request = match serde_json::from_value(req).chain_err(|| "Could not deserialize Request") {
4754
Ok(r) => r,
48-
Err(e) => return Box::new(
49-
future::err(
50-
io::Error::new(
51-
// @todo Can't wrap 'e' as error_chain Error doesn't derive Sync.
52-
// Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163
53-
io::ErrorKind::Other, e.description()
54-
))),
55+
Err(e) => return Box::new(future::ok(error_to_msg(e))),
5556
};
5657

5758
// XXX Danger zone! If we're running multiple threads, this `unwrap()`
@@ -60,21 +61,30 @@ impl Service for Api {
6061
// only safe for the current thread.
6162
// See https://github.com/alexcrichton/tokio-process/issues/23
6263
let handle = self.remote.handle().unwrap();
63-
Box::new(runnable.exec(&self.host, &handle)
64-
// @todo Can't wrap 'e' as error_chain Error doesn't derive Sync.
65-
// Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163
66-
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.description()))
67-
.and_then(|ser| match serde_json::to_value(ser).chain_err(|| "Could not serialize result") {
68-
Ok(v) => future::ok(v),
69-
Err(e) => future::err(io::Error::new(io::ErrorKind::Other, e.description())),
64+
Box::new(request.exec(&self.host, &handle)
65+
.chain_err(|| "Failed to execute Request")
66+
.then(|req| {
67+
match req {
68+
Ok(mut msg) => {
69+
let body = msg.take_body();
70+
match serde_json::to_value(msg.into_inner()).chain_err(|| "Could not serialize Result") {
71+
Ok(v) => match body {
72+
Some(b) => future::ok(Message::WithBody(v, b)),
73+
None => future::ok(Message::WithoutBody(v)),
74+
},
75+
Err(e) => future::ok(error_to_msg(e)),
76+
}
77+
},
78+
Err(e) => future::ok(error_to_msg(e)),
79+
}
7080
}))
7181
}
7282
}
7383

7484
impl NewService for Api {
75-
type Request = serde_json::Value;
76-
type Response = serde_json::Value;
77-
type Error = io::Error;
85+
type Request = LineMessage;
86+
type Response = LineMessage;
87+
type Error = Error;
7888
type Instance = Api;
7989
fn new_service(&self) -> io::Result<Self::Instance> {
8090
Ok(Api {
@@ -129,7 +139,7 @@ quick_main!(|| -> Result<()> {
129139
// Currently we force the issue (`unwrap()`), which is only safe
130140
// for the current thread.
131141
// See https://github.com/alexcrichton/tokio-process/issues/23
132-
let server = TcpServer::new(JsonProto, config.address);
142+
let server = TcpServer::new(JsonLineProto, config.address);
133143
server.with_handle(move |handle| {
134144
let api = Api {
135145
host: host.clone(),
@@ -139,3 +149,12 @@ quick_main!(|| -> Result<()> {
139149
});
140150
Ok(())
141151
});
152+
153+
fn error_to_msg(e: Error) -> LineMessage {
154+
let response = ResponseResult::Err(format!("{}", e.display_chain()));
155+
// If we can't serialize this, we can't serialize anything, so
156+
// panicking is appropriate.
157+
let value = serde_json::to_value(response)
158+
.expect("Cannot serialize ResponseResult::Err. This is bad...");
159+
Message::WithoutBody(value)
160+
}

core/examples/basic.rs

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,58 @@ extern crate futures;
88
extern crate intecture_api;
99
extern crate tokio_core;
1010

11-
use futures::Future;
11+
use futures::{Future, Stream};
1212
use intecture_api::prelude::*;
1313
use tokio_core::reactor::Core;
1414

1515
fn main() {
16+
// These two lines are part of `tokio-core` and can be safely
17+
// ignored. So long as they appear at the top of your code,
18+
// all is fine with the world.
1619
let mut core = Core::new().unwrap();
1720
let handle = core.handle();
1821

22+
// Here's the meat of your project. In this example we're talking
23+
// to our local machine, so we use the `Local` host type.
1924
let host = Local::new().and_then(|host| {
20-
Command::new(&host, "whoami", None).and_then(|mut cmd| {
21-
cmd.exec(&handle).map(|out| {
22-
println!("I'm currently running as {}", String::from_utf8_lossy(&out.stdout).trim());
25+
// Ok, we're in! Now we can pass our `host` handle to other
26+
// endpoints, which informs them of the server we mean to
27+
// talk to.
28+
29+
// Let's start with something basic - a shell command.
30+
Command::new(&host, "whoami", None).and_then(|cmd| {
31+
// Now that we have our `Command` instance, let's run it.
32+
cmd.exec(&handle).and_then(|(stream, status)| {
33+
// At this point, our command is running. As the API
34+
// is asynchronous, we don't have to wait for it to
35+
// finish before inspecting its output. This is called
36+
// "streaming".
37+
38+
// Our first argument, `stream`, is a stream of strings,
39+
// each of which represents a line of output. We can use
40+
// the `for_each` combinator to print these lines to
41+
// stdout.
42+
//
43+
// If printing isn't your thing, you are also
44+
// free to lick them or whatever you're into. I'm not
45+
// here to judge.
46+
stream.for_each(|line| { println!("{}", line); Ok(()) })
47+
48+
// The second argument is a `Future` that represents the
49+
// command's exit status. Let's print that too*.
50+
//
51+
// * Same caveat as above RE: printing. This is a safe
52+
// place.
53+
.join(status.map(|s| println!("This command {} {}",
54+
if s.success { "succeeded" } else { "failed" },
55+
if let Some(e) = s.code { format!("with code {}", e) } else { String::new() })))
2356
})
2457
})
2558
});
2659

60+
// This line is part of `tokio-core` and is used to execute the
61+
// chain of futures you've created above. You'll need to call
62+
// `core.run()` for each host you interact with, otherwise your
63+
// project will not run at all!
2764
core.run(host).unwrap();
2865
}
29-

core/examples/remote_host.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,28 @@ use intecture_api::prelude::*;
1313
use tokio_core::reactor::Core;
1414

1515
fn main() {
16+
// These two lines are part of `tokio-core` and can be safely
17+
// ignored. So long as they appear at the top of your code,
18+
// all is fine with the world.
1619
let mut core = Core::new().unwrap();
1720
let handle = core.handle();
1821

22+
// Here's the meat of your project. In this example we're talking
23+
// to a remote machine. You'll note that this is the `Plain` host
24+
// type, where you might have been expecting `Remote` or some such.
25+
// This is to signify that this host type sends data in the clear,
26+
// rather than encrypting it. Thus the usual disclaimer about
27+
// secure networks and trust applies.
1928
let host = Plain::connect("127.0.0.1:7101", &handle).map(|host| {
29+
// Ok, we're in! Now we can pass our `host` handle to other
30+
// endpoints, which informs them of the server we mean to
31+
// talk to. See basic.rs for more usage.
2032
println!("Connected to {}", host.telemetry().hostname);
2133
});
2234

35+
// This line is part of `tokio-core` and is used to execute the
36+
// chain of futures you've created above. You'll need to call
37+
// `core.run()` for each host you interact with, otherwise your
38+
// project will not run at all!
2339
core.run(host).unwrap();
2440
}

core/src/command/mod.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub mod providers;
1010

1111
use errors::*;
1212
use futures::{future, Future};
13+
use futures::stream::Stream;
1314
use host::Host;
1415
use self::providers::CommandProvider;
1516
use tokio_core::reactor::Handle;
@@ -27,11 +28,9 @@ pub struct Command<H: Host> {
2728
}
2829

2930
#[derive(Debug, Serialize, Deserialize)]
30-
pub struct CommandResult {
31+
pub struct ExitStatus {
3132
pub success: bool,
32-
pub exit_code: Option<i32>,
33-
pub stdout: Vec<u8>,
34-
pub stderr: Vec<u8>,
33+
pub code: Option<i32>,
3534
}
3635

3736
impl<H: Host + 'static> Command<H> {
@@ -68,7 +67,12 @@ impl<H: Host + 'static> Command<H> {
6867
}
6968
}
7069

71-
pub fn exec(&mut self, handle: &Handle) -> Box<Future<Item = CommandResult, Error = Error>> {
70+
pub fn exec(&self, handle: &Handle) ->
71+
Box<Future<Item = (
72+
Box<Stream<Item = String, Error = Error>>,
73+
Box<Future<Item = ExitStatus, Error = Error>>
74+
), Error = Error>>
75+
{
7276
self.inner.exec(&self.host, handle, &self.cmd, &self.shell)
7377
}
7478
}

0 commit comments

Comments
 (0)