Skip to content

Commit 3e9fcf0

Browse files
authored
Merge pull request #45 from petehayes102/master
Restructure endpoints to pave way for streaming API
2 parents 294677e + de48a23 commit 3e9fcf0

File tree

23 files changed

+944
-555
lines changed

23 files changed

+944
-555
lines changed

agent/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ intecture_api = { version = "0.4.0", path = "../core" }
2424
serde = "1.0"
2525
serde_derive = "1.0"
2626
serde_json = "1.0"
27+
tokio-core = "0.1"
2728
tokio-proto = "0.1"
2829
tokio-service = "0.1"
2930
toml = "0.4"

agent/src/main.rs

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ extern crate futures;
1111
extern crate intecture_api;
1212
#[macro_use] extern crate serde_derive;
1313
extern crate serde_json;
14+
extern crate tokio_core;
1415
extern crate tokio_proto;
1516
extern crate tokio_service;
1617
extern crate toml;
@@ -19,15 +20,21 @@ mod errors;
1920

2021
use errors::*;
2122
use futures::{future, Future};
22-
use intecture_api::{Executable, Runnable};
23+
use intecture_api::remote::{Executable, Runnable};
24+
use intecture_api::host::local::Local;
2325
use intecture_api::host::remote::JsonProto;
2426
use std::fs::File;
2527
use std::io::{self, Read};
2628
use std::net::SocketAddr;
29+
use std::sync::Arc;
30+
use tokio_core::reactor::Remote;
2731
use tokio_proto::TcpServer;
28-
use tokio_service::Service;
32+
use tokio_service::{NewService, Service};
2933

30-
pub struct Api;
34+
pub struct Api {
35+
host: Local,
36+
remote: Remote,
37+
}
3138

3239
impl Service for Api {
3340
type Request = serde_json::Value;
@@ -46,7 +53,14 @@ impl Service for Api {
4653
io::ErrorKind::Other, e.description()
4754
))),
4855
};
49-
Box::new(runnable.exec()
56+
57+
// XXX Danger zone! If we're running multiple threads, this `unwrap()`
58+
// will explode. The API requires a `Handle`, but we can only send a
59+
// `Remote` to this Service. Currently we force the `Handle`, which is
60+
// only safe for the current thread.
61+
// See https://github.com/alexcrichton/tokio-process/issues/23
62+
let handle = self.remote.handle().unwrap();
63+
Box::new(runnable.exec(&self.host, &handle)
5064
// @todo Can't wrap 'e' as error_chain Error doesn't derive Sync.
5165
// Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163
5266
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.description()))
@@ -57,6 +71,19 @@ impl Service for Api {
5771
}
5872
}
5973

74+
impl NewService for Api {
75+
type Request = serde_json::Value;
76+
type Response = serde_json::Value;
77+
type Error = io::Error;
78+
type Instance = Api;
79+
fn new_service(&self) -> io::Result<Self::Instance> {
80+
Ok(Api {
81+
host: self.host.clone(),
82+
remote: self.remote.clone(),
83+
})
84+
}
85+
}
86+
6087
#[derive(Deserialize)]
6188
struct Config {
6289
address: SocketAddr,
@@ -96,7 +123,19 @@ quick_main!(|| -> Result<()> {
96123
Config { address }
97124
};
98125

126+
let host = Local::new().wait()?;
127+
// XXX We can only run a single thread here, or big boom!!
128+
// The API requires a `Handle`, but we can only send a `Remote`.
129+
// Currently we force the issue (`unwrap()`), which is only safe
130+
// for the current thread.
131+
// See https://github.com/alexcrichton/tokio-process/issues/23
99132
let server = TcpServer::new(JsonProto, config.address);
100-
server.serve(|| Ok(Api));
133+
server.with_handle(move |handle| {
134+
let api = Api {
135+
host: host.clone(),
136+
remote: handle.remote().clone(),
137+
};
138+
Arc::new(api)
139+
});
101140
Ok(())
102141
});

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ serde_derive = "1.0"
3030
serde_json = "1.0"
3131
tokio-core = "0.1"
3232
tokio-io = "0.1"
33+
tokio-process = "0.1"
3334
tokio-proto = "0.1"
3435
tokio-service = "0.1"
3536

core/examples/basic.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,16 @@ use tokio_core::reactor::Core;
1414

1515
fn main() {
1616
let mut core = Core::new().unwrap();
17+
let handle = core.handle();
1718

1819
let host = Local::new().and_then(|host| {
19-
command::factory(&host, "whoami", None).and_then(|mut cmd| {
20-
cmd.exec().map(|out| {
20+
Command::new(&host, "whoami", None).and_then(|mut cmd| {
21+
cmd.exec(&handle).map(|out| {
2122
println!("I'm currently running as {}", String::from_utf8_lossy(&out.stdout).trim());
2223
})
2324
})
2425
});
2526

2627
core.run(host).unwrap();
2728
}
29+

core/src/command/mod.rs

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,29 +8,20 @@
88
99
pub mod providers;
1010

11-
use erased_serde::Serialize;
1211
use errors::*;
13-
use Executable;
1412
use futures::{future, Future};
1513
use host::Host;
16-
use self::providers::{Nix, NixRunnable};
17-
use std::sync::Arc;
14+
use self::providers::CommandProvider;
15+
use tokio_core::reactor::Handle;
1816

19-
pub trait CommandProvider<H: Host> {
20-
fn available(&Arc<H>) -> Box<Future<Item = bool, Error = Error>> where Self: Sized;
21-
fn try_new(&Arc<H>, &str, Option<&[&str]>) -> Box<Future<Item = Option<Self>, Error = Error>> where Self: Sized;
22-
fn exec(&mut self) -> Box<Future<Item = CommandResult, Error = Error>>;
23-
}
24-
25-
#[doc(hidden)]
26-
#[derive(Serialize, Deserialize)]
27-
pub enum CommandRunnable {
28-
Nix(NixRunnable)
29-
}
17+
#[cfg(not(windows))]
18+
const DEFAULT_SHELL: [&'static str; 2] = ["/bin/sh", "-c"];
19+
#[cfg(windows)]
20+
const DEFAULT_SHELL: [&'static str; 1] = ["yeah...we don't currently support windows :("];
3021

31-
#[doc(hidden)]
32-
#[derive(Clone, Serialize, Deserialize)]
33-
pub struct Command {
22+
pub struct Command<H: Host> {
23+
host: H,
24+
inner: Box<CommandProvider<H>>,
3425
shell: Vec<String>,
3526
cmd: String,
3627
}
@@ -43,18 +34,41 @@ pub struct CommandResult {
4334
pub stderr: Vec<u8>,
4435
}
4536

46-
impl Executable for CommandRunnable {
47-
fn exec(self) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
48-
match self {
49-
CommandRunnable::Nix(p) => p.exec()
37+
impl<H: Host + 'static> Command<H> {
38+
pub fn new(host: &H, cmd: &str, shell: Option<&[&str]>) -> Box<Future<Item = Command<H>, Error = Error>> {
39+
let host = host.clone();
40+
let cmd = cmd.to_owned();
41+
let shell: Vec<String> = shell.unwrap_or(&DEFAULT_SHELL)
42+
.to_owned()
43+
.iter()
44+
.map(|s| s.to_string())
45+
.collect();
46+
47+
Box::new(host.command_provider()
48+
.and_then(|provider| future::ok(Command {
49+
host: host,
50+
inner: provider,
51+
shell: shell,
52+
cmd: cmd,
53+
})))
54+
}
55+
56+
pub fn with_provider<P>(host: &H, provider: P, cmd: &str, shell: Option<&[&str]>) -> Command<H>
57+
where P: CommandProvider<H> + 'static
58+
{
59+
Command {
60+
host: host.clone(),
61+
inner: Box::new(provider),
62+
shell: shell.unwrap_or(&DEFAULT_SHELL)
63+
.to_owned()
64+
.iter()
65+
.map(|s| s.to_string())
66+
.collect(),
67+
cmd: cmd.into(),
5068
}
5169
}
52-
}
5370

54-
pub fn factory<H: Host + 'static>(host: &Arc<H>, cmd: &str, shell: Option<&[&str]>) -> Box<Future<Item = Box<CommandProvider<H>>, Error = Error>> {
55-
Box::new(Nix::try_new(host, cmd, shell)
56-
.and_then(|opt| match opt {
57-
Some(provider) => future::ok(Box::new(provider) as Box<CommandProvider<H>>),
58-
None => future::err(ErrorKind::ProviderUnavailable("Command").into())
59-
}))
71+
pub fn exec(&mut self, handle: &Handle) -> Box<Future<Item = CommandResult, Error = Error>> {
72+
self.inner.exec(&self.host, handle, &self.cmd, &self.shell)
73+
}
6074
}

core/src/command/providers/generic.rs

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Copyright 2015-2017 Intecture Developers.
2+
//
3+
// Licensed under the Mozilla Public License 2.0 <LICENSE or
4+
// https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied,
5+
// modified, or distributed except according to those terms.
6+
7+
use command::CommandResult;
8+
use erased_serde::Serialize;
9+
use errors::*;
10+
use futures::{future, Future};
11+
use host::{Host, HostType};
12+
use host::local::Local;
13+
use host::remote::Plain;
14+
use provider::Provider;
15+
use remote::{Executable, Runnable};
16+
use std::process;
17+
use super::{CommandProvider, CommandRunnable};
18+
use tokio_core::reactor::Handle;
19+
use tokio_process::CommandExt;
20+
21+
#[derive(Clone)]
22+
pub struct Generic;
23+
struct LocalGeneric;
24+
struct RemoteGeneric;
25+
26+
#[doc(hidden)]
27+
#[derive(Serialize, Deserialize)]
28+
pub enum GenericRunnable {
29+
Available,
30+
Exec(String, Vec<String>),
31+
}
32+
33+
impl<H: Host + 'static> Provider<H> for Generic {
34+
fn available(host: &H) -> Box<Future<Item = bool, Error = Error>> {
35+
match host.get_type() {
36+
HostType::Local(_) => LocalGeneric::available(),
37+
HostType::Remote(r) => RemoteGeneric::available(r),
38+
}
39+
}
40+
41+
fn try_new(host: &H) -> Box<Future<Item = Option<Generic>, Error = Error>> {
42+
let host = host.clone();
43+
Box::new(Self::available(&host)
44+
.and_then(|available| {
45+
if available {
46+
future::ok(Some(Generic))
47+
} else {
48+
future::ok(None)
49+
}
50+
}))
51+
}
52+
}
53+
54+
impl<H: Host + 'static> CommandProvider<H> for Generic {
55+
fn exec(&self, host: &H, handle: &Handle, cmd: &str, shell: &[String]) -> Box<Future<Item = CommandResult, Error = Error>> {
56+
match host.get_type() {
57+
HostType::Local(_) => LocalGeneric::exec(handle, cmd, shell),
58+
HostType::Remote(r) => RemoteGeneric::exec(r, cmd, shell),
59+
}
60+
}
61+
}
62+
63+
impl LocalGeneric {
64+
fn available() -> Box<Future<Item = bool, Error = Error>> {
65+
Box::new(future::ok(cfg!(unix)))
66+
}
67+
68+
fn exec(handle: &Handle, cmd: &str, shell: &[String]) -> Box<Future<Item = CommandResult, Error = Error>> {
69+
let cmd = cmd.to_owned();
70+
let shell = shell.to_owned();
71+
let (shell, shell_args) = match shell.split_first() {
72+
Some((s, a)) => (s, a),
73+
None => return Box::new(future::err("Invalid shell provided".into())),
74+
};
75+
76+
Box::new(process::Command::new(shell)
77+
.args(shell_args)
78+
.arg(&cmd)
79+
.output_async(handle)
80+
.chain_err(|| "Command execution failed")
81+
.and_then(|output| {
82+
future::ok(CommandResult {
83+
success: output.status.success(),
84+
exit_code: output.status.code(),
85+
stdout: output.stdout,
86+
stderr: output.stderr,
87+
})
88+
}))
89+
}
90+
}
91+
92+
impl RemoteGeneric {
93+
fn available(host: &Plain) -> Box<Future<Item = bool, Error = Error>> {
94+
let runnable = Runnable::Command(
95+
CommandRunnable::Generic(
96+
GenericRunnable::Available));
97+
host.run(runnable)
98+
.chain_err(|| ErrorKind::Runnable { endpoint: "Command::Generic", func: "available" })
99+
}
100+
101+
fn exec(host: &Plain, cmd: &str, shell: &[String]) -> Box<Future<Item = CommandResult, Error = Error>> {
102+
let runnable = Runnable::Command(
103+
CommandRunnable::Generic(
104+
GenericRunnable::Exec(cmd.into(), shell.to_owned())));
105+
host.run(runnable)
106+
.chain_err(|| ErrorKind::Runnable { endpoint: "Command::Generic", func: "exec" })
107+
}
108+
}
109+
110+
impl Executable for GenericRunnable {
111+
fn exec(self, _: &Local, handle: &Handle) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
112+
match self {
113+
GenericRunnable::Available => Box::new(LocalGeneric::available().map(|b| Box::new(b) as Box<Serialize>)),
114+
GenericRunnable::Exec(cmd, shell) => Box::new(LocalGeneric::exec(handle, &cmd, &shell).map(|r| Box::new(r) as Box<Serialize>)),
115+
}
116+
}
117+
}

core/src/command/providers/mod.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,41 @@
44
// https://www.tldrlegal.com/l/mpl-2.0>. This file may not be copied,
55
// modified, or distributed except according to those terms.
66

7-
mod nix;
7+
mod generic;
88

9-
pub use self::nix::{Nix, NixRunnable};
9+
use errors::*;
10+
use erased_serde::Serialize;
11+
use futures::{future, Future};
12+
use host::Host;
13+
use host::local::Local;
14+
use provider::Provider;
15+
use remote::Executable;
16+
pub use self::generic::{Generic, GenericRunnable};
17+
use super::CommandResult;
18+
use tokio_core::reactor::Handle;
19+
20+
pub trait CommandProvider<H: Host>: Provider<H> {
21+
fn exec(&self, &H, &Handle, &str, &[String]) -> Box<Future<Item = CommandResult, Error = Error>>;
22+
}
23+
24+
#[doc(hidden)]
25+
#[derive(Serialize, Deserialize)]
26+
pub enum CommandRunnable {
27+
Generic(GenericRunnable)
28+
}
29+
30+
impl Executable for CommandRunnable {
31+
fn exec(self, host: &Local, handle: &Handle) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
32+
match self {
33+
CommandRunnable::Generic(p) => p.exec(host, handle)
34+
}
35+
}
36+
}
37+
38+
pub fn factory<H: Host + 'static>(host: &H) -> Box<Future<Item = Box<CommandProvider<H>>, Error = Error>> {
39+
Box::new(Generic::try_new(host)
40+
.and_then(|opt| match opt {
41+
Some(provider) => future::ok(Box::new(provider) as Box<CommandProvider<H>>),
42+
None => future::err(ErrorKind::ProviderUnavailable("Command").into())
43+
}))
44+
}

0 commit comments

Comments
 (0)