Skip to content

Commit 6f0996e

Browse files
committed
Streaming Command and fixes for streaming protocol. Closes #40.
1 parent c63cd0c commit 6f0996e

File tree

18 files changed

+410
-159
lines changed

18 files changed

+410
-159
lines changed

agent/src/errors.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,42 @@
44
// https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied,
55
// modified, or distributed except according to those terms.
66

7+
use futures::Future;
78
use intecture_api;
9+
use std::{convert, error, io};
810

911
error_chain! {
1012
links {
1113
Api(intecture_api::errors::Error, intecture_api::errors::ErrorKind);
1214
}
1315
}
16+
17+
impl convert::From<Error> for io::Error {
18+
fn from(e: Error) -> io::Error {
19+
// @todo Return whole error chain
20+
io::Error::new(io::ErrorKind::Other, e.description())
21+
}
22+
}
23+
24+
// @todo This should disappear once Futures are officially supported
25+
// by error_chain.
26+
// See: https://github.com/rust-lang-nursery/error-chain/issues/90
27+
pub type SFuture<T> = Box<Future<Item = T, Error = Error>>;
28+
29+
pub trait FutureChainErr<T> {
30+
fn chain_err<F, E>(self, callback: F) -> SFuture<T>
31+
where F: FnOnce() -> E + 'static,
32+
E: Into<ErrorKind>;
33+
}
34+
35+
impl<F> FutureChainErr<F::Item> for F
36+
where F: Future + 'static,
37+
F::Error: error::Error + Send + 'static,
38+
{
39+
fn chain_err<C, E>(self, callback: C) -> SFuture<F::Item>
40+
where C: FnOnce() -> E + 'static,
41+
E: Into<ErrorKind>,
42+
{
43+
Box::new(self.then(|r| r.chain_err(callback)))
44+
}
45+
}

agent/src/main.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ mod errors;
2020

2121
use errors::*;
2222
use futures::{future, Future};
23-
use intecture_api::remote::{Executable, Runnable};
2423
use intecture_api::host::local::Local;
2524
use intecture_api::host::remote::{JsonLineProto, LineMessage};
25+
use intecture_api::remote::{Executable, Request};
2626
use std::fs::File;
2727
use std::io::{self, Read};
2828
use std::net::SocketAddr;
@@ -40,7 +40,7 @@ pub struct Api {
4040
impl Service for Api {
4141
type Request = LineMessage;
4242
type Response = LineMessage;
43-
type Error = io::Error;
43+
type Error = Error;
4444
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
4545

4646
fn call(&self, req: Self::Request) -> Self::Future {
@@ -49,15 +49,9 @@ impl Service for Api {
4949
Message::WithoutBody(req) => req,
5050
};
5151

52-
let runnable: Runnable = match serde_json::from_value(req).chain_err(|| "Received invalid Runnable") {
52+
let request: Request = match serde_json::from_value(req).chain_err(|| "Received invalid Request") {
5353
Ok(r) => r,
54-
Err(e) => return Box::new(
55-
future::err(
56-
io::Error::new(
57-
// @todo Can't wrap 'e' as error_chain Error doesn't derive Sync.
58-
// Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163
59-
io::ErrorKind::Other, e.description()
60-
))),
54+
Err(e) => return Box::new(future::err(e))
6155
};
6256

6357
// XXX Danger zone! If we're running multiple threads, this `unwrap()`
@@ -66,21 +60,25 @@ impl Service for Api {
6660
// only safe for the current thread.
6761
// See https://github.com/alexcrichton/tokio-process/issues/23
6862
let handle = self.remote.handle().unwrap();
69-
Box::new(runnable.exec(&self.host, &handle)
70-
// @todo Can't wrap 'e' as error_chain Error doesn't derive Sync.
71-
// Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163
72-
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.description()))
73-
.and_then(|ser| match serde_json::to_value(ser).chain_err(|| "Could not serialize result") {
74-
Ok(v) => future::ok(Message::WithoutBody(v)),
75-
Err(e) => future::err(io::Error::new(io::ErrorKind::Other, e.description())),
63+
Box::new(request.exec(&self.host, &handle)
64+
.chain_err(|| "Failed to execute Request")
65+
.and_then(|mut msg| {
66+
let body = msg.take_body();
67+
match serde_json::to_value(msg.into_inner()).chain_err(|| "Could not serialize result") {
68+
Ok(v) => match body {
69+
Some(b) => future::ok(Message::WithBody(v, b)),
70+
None => future::ok(Message::WithoutBody(v)),
71+
},
72+
Err(e) => future::err(e),
73+
}
7674
}))
7775
}
7876
}
7977

8078
impl NewService for Api {
8179
type Request = LineMessage;
8280
type Response = LineMessage;
83-
type Error = io::Error;
81+
type Error = Error;
8482
type Instance = Api;
8583
fn new_service(&self) -> io::Result<Self::Instance> {
8684
Ok(Api {

core/examples/basic.rs

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,58 @@ extern crate futures;
88
extern crate intecture_api;
99
extern crate tokio_core;
1010

11-
use futures::Future;
11+
use futures::{Future, Stream};
1212
use intecture_api::prelude::*;
1313
use tokio_core::reactor::Core;
1414

1515
fn main() {
16+
// These two lines are part of `tokio-core` and can be safely
17+
// ignored. So long as they appear at the top of your code,
18+
// all is fine with the world.
1619
let mut core = Core::new().unwrap();
1720
let handle = core.handle();
1821

22+
// Here's the meat of your project. In this example we're talking
23+
// to our local machine, so we use the `Local` host type.
1924
let host = Local::new().and_then(|host| {
25+
// Ok, we're in! Now we can pass our `host` handle to other
26+
// endpoints, which informs them of the server we mean to
27+
// talk to.
28+
29+
// Let's start with something basic - a shell command.
2030
Command::new(&host, "whoami", None).and_then(|cmd| {
21-
cmd.exec(&handle).map(|out| {
22-
println!("I'm currently running as {}", String::from_utf8_lossy(&out.stdout).trim());
31+
// Now that we have our `Command` instance, let's run it.
32+
cmd.exec(&handle).and_then(|(stream, status)| {
33+
// At this point, our command is running. As the API
34+
// is asynchronous, we don't have to wait for it to
35+
// finish before inspecting its output. This is called
36+
// "streaming".
37+
38+
// Our first argument, `stream`, is a stream of strings,
39+
// each of which represents a line of output. We can use
40+
// the `for_each` combinator to print these lines to
41+
// stdout.
42+
//
43+
// If printing isn't your thing, you are also
44+
// free to lick them or whatever you're into. I'm not
45+
// here to judge.
46+
stream.for_each(|line| { println!("{}", line); Ok(()) })
47+
48+
// The second argument is a `Future` that represents the
49+
// command's exit status. Let's print that too*.
50+
//
51+
// * Same caveat as above RE: printing. This is a safe
52+
// place.
53+
.join(status.map(|s| println!("This command {} {}",
54+
if s.success { "succeeded" } else { "failed" },
55+
if let Some(e) = s.code { format!("with code {}", e) } else { String::new() })))
2356
})
2457
})
2558
});
2659

60+
// This line is part of `tokio-core` and is used to execute the
61+
// chain of futures you've created above. You'll need to call
62+
// `core.run()` for each host you interact with, otherwise your
63+
// project will not run at all!
2764
core.run(host).unwrap();
2865
}

core/examples/remote_host.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,28 @@ use intecture_api::prelude::*;
1313
use tokio_core::reactor::Core;
1414

1515
fn main() {
16+
// These two lines are part of `tokio-core` and can be safely
17+
// ignored. So long as they appear at the top of your code,
18+
// all is fine with the world.
1619
let mut core = Core::new().unwrap();
1720
let handle = core.handle();
1821

22+
// Here's the meat of your project. In this example we're talking
23+
// to a remote machine. You'll note that this is the `Plain` host
24+
// type, where you might have been expecting `Remote` or some such.
25+
// This is to signify that this host type sends data in the clear,
26+
// rather than encrypting it. Thus the usual disclaimer about
27+
// secure networks and trust applies.
1928
let host = Plain::connect("127.0.0.1:7101", &handle).map(|host| {
29+
// Ok, we're in! Now we can pass our `host` handle to other
30+
// endpoints, which informs them of the server we mean to
31+
// talk to. See basic.rs for more usage.
2032
println!("Connected to {}", host.telemetry().hostname);
2133
});
2234

35+
// This line is part of `tokio-core` and is used to execute the
36+
// chain of futures you've created above. You'll need to call
37+
// `core.run()` for each host you interact with, otherwise your
38+
// project will not run at all!
2339
core.run(host).unwrap();
2440
}

core/src/command/mod.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub mod providers;
1010

1111
use errors::*;
1212
use futures::{future, Future};
13+
use futures::stream::Stream;
1314
use host::Host;
1415
use self::providers::CommandProvider;
1516
use tokio_core::reactor::Handle;
@@ -27,11 +28,9 @@ pub struct Command<H: Host> {
2728
}
2829

2930
#[derive(Debug, Serialize, Deserialize)]
30-
pub struct CommandResult {
31+
pub struct ExitStatus {
3132
pub success: bool,
32-
pub exit_code: Option<i32>,
33-
pub stdout: Vec<u8>,
34-
pub stderr: Vec<u8>,
33+
pub code: Option<i32>,
3534
}
3635

3736
impl<H: Host + 'static> Command<H> {
@@ -68,7 +67,12 @@ impl<H: Host + 'static> Command<H> {
6867
}
6968
}
7069

71-
pub fn exec(&self, handle: &Handle) -> Box<Future<Item = CommandResult, Error = Error>> {
70+
pub fn exec(&self, handle: &Handle) ->
71+
Box<Future<Item = (
72+
Box<Stream<Item = String, Error = Error>>,
73+
Box<Future<Item = ExitStatus, Error = Error>>
74+
), Error = Error>>
75+
{
7276
self.inner.exec(&self.host, handle, &self.cmd, &self.shell)
7377
}
7478
}

0 commit comments

Comments
 (0)