Skip to content

Commit a7d2512

Browse files
committed
Ported to tokio-process in preparation for streaming API
1 parent 17706bd commit a7d2512

File tree

16 files changed

+98
-51
lines changed

16 files changed

+98
-51
lines changed

agent/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ intecture_api = { version = "0.4.0", path = "../core" }
2424
serde = "1.0"
2525
serde_derive = "1.0"
2626
serde_json = "1.0"
27+
tokio-core = "0.1"
2728
tokio-proto = "0.1"
2829
tokio-service = "0.1"
2930
toml = "0.4"

agent/src/main.rs

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ extern crate futures;
1111
extern crate intecture_api;
1212
#[macro_use] extern crate serde_derive;
1313
extern crate serde_json;
14+
extern crate tokio_core;
1415
extern crate tokio_proto;
1516
extern crate tokio_service;
1617
extern crate toml;
@@ -25,11 +26,14 @@ use intecture_api::host::remote::JsonProto;
2526
use std::fs::File;
2627
use std::io::{self, Read};
2728
use std::net::SocketAddr;
29+
use std::sync::Arc;
30+
use tokio_core::reactor::Remote;
2831
use tokio_proto::TcpServer;
29-
use tokio_service::Service;
32+
use tokio_service::{NewService, Service};
3033

3134
pub struct Api {
3235
host: Local,
36+
remote: Remote,
3337
}
3438

3539
impl Service for Api {
@@ -49,7 +53,14 @@ impl Service for Api {
4953
io::ErrorKind::Other, e.description()
5054
))),
5155
};
52-
Box::new(runnable.exec(&self.host)
56+
57+
// XXX Danger zone! If we're running multiple threads, this `unwrap()`
58+
// will explode. The API requires a `Handle`, but we can only send a
59+
// `Remote` to this Service. Currently we force the `Handle`, which is
60+
// only safe for the current thread.
61+
// See https://github.com/alexcrichton/tokio-process/issues/23
62+
let handle = self.remote.handle().unwrap();
63+
Box::new(runnable.exec(&self.host, &handle)
5364
// @todo Can't wrap 'e' as error_chain Error doesn't derive Sync.
5465
// Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163
5566
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.description()))
@@ -60,6 +71,19 @@ impl Service for Api {
6071
}
6172
}
6273

74+
impl NewService for Api {
75+
type Request = serde_json::Value;
76+
type Response = serde_json::Value;
77+
type Error = io::Error;
78+
type Instance = Api;
79+
fn new_service(&self) -> io::Result<Self::Instance> {
80+
Ok(Api {
81+
host: self.host.clone(),
82+
remote: self.remote.clone(),
83+
})
84+
}
85+
}
86+
6387
#[derive(Deserialize)]
6488
struct Config {
6589
address: SocketAddr,
@@ -100,7 +124,18 @@ quick_main!(|| -> Result<()> {
100124
};
101125

102126
let host = Local::new().wait()?;
127+
// XXX We can only run a single thread here, or big boom!!
128+
// The API requires a `Handle`, but we can only send a `Remote`.
129+
// Currently we force the issue (`unwrap()`), which is only safe
130+
// for the current thread.
131+
// See https://github.com/alexcrichton/tokio-process/issues/23
103132
let server = TcpServer::new(JsonProto, config.address);
104-
server.serve(move || Ok(Api { host: host.clone() }));
133+
server.with_handle(move |handle| {
134+
let api = Api {
135+
host: host.clone(),
136+
remote: handle.remote().clone(),
137+
};
138+
Arc::new(api)
139+
});
105140
Ok(())
106141
});

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ serde_derive = "1.0"
3030
serde_json = "1.0"
3131
tokio-core = "0.1"
3232
tokio-io = "0.1"
33+
tokio-process = "0.1"
3334
tokio-proto = "0.1"
3435
tokio-service = "0.1"
3536

core/src/command/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use errors::*;
1212
use futures::{future, Future};
1313
use host::Host;
1414
use self::providers::CommandProvider;
15+
use tokio_core::reactor::Handle;
1516

1617
#[cfg(not(windows))]
1718
const DEFAULT_SHELL: [&'static str; 2] = ["/bin/sh", "-c"];
@@ -67,7 +68,7 @@ impl<H: Host + 'static> Command<H> {
6768
}
6869
}
6970

70-
pub fn exec(&mut self) -> Box<Future<Item = CommandResult, Error = Error>> {
71-
self.inner.exec(&self.host, &self.cmd, &self.shell)
71+
pub fn exec(&mut self, handle: &Handle) -> Box<Future<Item = CommandResult, Error = Error>> {
72+
self.inner.exec(&self.host, handle, &self.cmd, &self.shell)
7273
}
7374
}

core/src/command/providers/generic.rs

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use provider::Provider;
1515
use remote::{Executable, Runnable};
1616
use std::process;
1717
use super::{CommandProvider, CommandRunnable};
18+
use tokio_core::reactor::Handle;
19+
use tokio_process::CommandExt;
1820

1921
#[derive(Clone)]
2022
pub struct Generic;
@@ -50,9 +52,9 @@ impl<H: Host + 'static> Provider<H> for Generic {
5052
}
5153

5254
impl<H: Host + 'static> CommandProvider<H> for Generic {
53-
fn exec(&self, host: &H, cmd: &str, shell: &[String]) -> Box<Future<Item = CommandResult, Error = Error>> {
55+
fn exec(&self, host: &H, handle: &Handle, cmd: &str, shell: &[String]) -> Box<Future<Item = CommandResult, Error = Error>> {
5456
match host.get_type() {
55-
HostType::Local(_) => LocalGeneric::exec(cmd, shell),
57+
HostType::Local(_) => LocalGeneric::exec(handle, cmd, shell),
5658
HostType::Remote(r) => RemoteGeneric::exec(r, cmd, shell),
5759
}
5860
}
@@ -63,31 +65,27 @@ impl LocalGeneric {
6365
Box::new(future::ok(cfg!(unix)))
6466
}
6567

66-
fn exec(cmd: &str, shell: &[String]) -> Box<Future<Item = CommandResult, Error = Error>> {
67-
let cmd_owned = cmd.to_owned();
68-
let shell_owned = shell.to_owned();
68+
fn exec(handle: &Handle, cmd: &str, shell: &[String]) -> Box<Future<Item = CommandResult, Error = Error>> {
69+
let cmd = cmd.to_owned();
70+
let shell = shell.to_owned();
71+
let (shell, shell_args) = match shell.split_first() {
72+
Some((s, a)) => (s, a),
73+
None => return Box::new(future::err("Invalid shell provided".into())),
74+
};
6975

70-
Box::new(future::lazy(move || -> future::FutureResult<CommandResult, Error> {
71-
let (shell, shell_args) = match shell_owned.split_first() {
72-
Some((s, a)) => (s, a),
73-
None => return future::err("Invalid shell provided".into()),
74-
};
75-
76-
let out = process::Command::new(shell)
77-
.args(shell_args)
78-
.arg(&cmd_owned)
79-
.output()
80-
.chain_err(|| "Command execution failed");
81-
match out {
82-
Ok(output) => future::ok(CommandResult {
76+
Box::new(process::Command::new(shell)
77+
.args(shell_args)
78+
.arg(&cmd)
79+
.output_async(handle)
80+
.chain_err(|| "Command execution failed")
81+
.and_then(|output| {
82+
future::ok(CommandResult {
8383
success: output.status.success(),
8484
exit_code: output.status.code(),
8585
stdout: output.stdout,
8686
stderr: output.stderr,
87-
}),
88-
Err(e) => future::err(e),
89-
}
90-
}))
87+
})
88+
}))
9189
}
9290
}
9391

@@ -110,10 +108,10 @@ impl RemoteGeneric {
110108
}
111109

112110
impl Executable for GenericRunnable {
113-
fn exec(self, _: &Local) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
111+
fn exec(self, _: &Local, handle: &Handle) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
114112
match self {
115113
GenericRunnable::Available => Box::new(LocalGeneric::available().map(|b| Box::new(b) as Box<Serialize>)),
116-
GenericRunnable::Exec(cmd, shell) => Box::new(LocalGeneric::exec(&cmd, &shell).map(|r| Box::new(r) as Box<Serialize>)),
114+
GenericRunnable::Exec(cmd, shell) => Box::new(LocalGeneric::exec(handle, &cmd, &shell).map(|r| Box::new(r) as Box<Serialize>)),
117115
}
118116
}
119117
}

core/src/command/providers/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ use provider::Provider;
1515
use remote::Executable;
1616
pub use self::generic::{Generic, GenericRunnable};
1717
use super::CommandResult;
18+
use tokio_core::reactor::Handle;
1819

1920
pub trait CommandProvider<H: Host>: Provider<H> {
20-
fn exec(&self, &H, &str, &[String]) -> Box<Future<Item = CommandResult, Error = Error>>;
21+
fn exec(&self, &H, &Handle, &str, &[String]) -> Box<Future<Item = CommandResult, Error = Error>>;
2122
}
2223

2324
#[doc(hidden)]
@@ -27,9 +28,9 @@ pub enum CommandRunnable {
2728
}
2829

2930
impl Executable for CommandRunnable {
30-
fn exec(self, host: &Local) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
31+
fn exec(self, host: &Local, handle: &Handle) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
3132
match self {
32-
CommandRunnable::Generic(p) => p.exec(host)
33+
CommandRunnable::Generic(p) => p.exec(host, handle)
3334
}
3435
}
3536
}

core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ extern crate serde;
2525
extern crate serde_json;
2626
extern crate tokio_core;
2727
extern crate tokio_io;
28+
extern crate tokio_process;
2829
extern crate tokio_proto;
2930
extern crate tokio_service;
3031

core/src/remote.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ use errors::*;
1010
use futures::Future;
1111
use host::local::Local;
1212
use telemetry::providers::TelemetryRunnable;
13+
use tokio_core::reactor::Handle;
1314

1415
pub trait Executable {
15-
fn exec(self, &Local) -> Box<Future<Item = Box<Serialize>, Error = Error>>;
16+
fn exec(self, &Local, &Handle) -> Box<Future<Item = Box<Serialize>, Error = Error>>;
1617
}
1718

1819
#[derive(Serialize, Deserialize)]
@@ -22,10 +23,10 @@ pub enum Runnable {
2223
}
2324

2425
impl Executable for Runnable {
25-
fn exec(self, host: &Local) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
26+
fn exec(self, host: &Local, handle: &Handle) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
2627
match self {
27-
Runnable::Command(p) => p.exec(host),
28-
Runnable::Telemetry(p) => p.exec(host),
28+
Runnable::Command(p) => p.exec(host, handle),
29+
Runnable::Telemetry(p) => p.exec(host, handle),
2930
}
3031
}
3132
}

core/src/telemetry/providers/centos.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use super::{TelemetryProvider, TelemetryRunnable};
1818
use target::{default, linux, redhat};
1919
use target::linux::LinuxFlavour;
2020
use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable};
21+
use tokio_core::reactor::Handle;
2122

2223
pub struct Centos;
2324
struct LocalCentos;
@@ -93,7 +94,7 @@ impl RemoteCentos {
9394
}
9495

9596
impl Executable for CentosRunnable {
96-
fn exec(self, _: &Local) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
97+
fn exec(self, _: &Local, _: &Handle) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
9798
match self {
9899
CentosRunnable::Available => Box::new(LocalCentos::available().map(|b| Box::new(b) as Box<Serialize>)),
99100
CentosRunnable::Load => Box::new(LocalCentos::load().map(|t| {

core/src/telemetry/providers/debian.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use super::{TelemetryProvider, TelemetryRunnable};
1818
use target::{default, linux};
1919
use target::linux::LinuxFlavour;
2020
use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable};
21+
use tokio_core::reactor::Handle;
2122

2223
pub struct Debian;
2324
struct LocalDebian;
@@ -95,7 +96,7 @@ impl RemoteDebian {
9596
}
9697

9798
impl Executable for DebianRunnable {
98-
fn exec(self, _: &Local) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
99+
fn exec(self, _: &Local, _: &Handle) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
99100
match self {
100101
DebianRunnable::Available => Box::new(LocalDebian::available().map(|b| Box::new(b) as Box<Serialize>)),
101102
DebianRunnable::Load => Box::new(LocalDebian::load().map(|t| {

core/src/telemetry/providers/fedora.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use super::{TelemetryProvider, TelemetryRunnable};
1818
use target::{default, linux, redhat};
1919
use target::linux::LinuxFlavour;
2020
use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable};
21+
use tokio_core::reactor::Handle;
2122

2223
pub struct Fedora;
2324
struct LocalFedora;
@@ -95,7 +96,7 @@ impl RemoteFedora {
9596
}
9697

9798
impl Executable for FedoraRunnable {
98-
fn exec(self, _: &Local) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
99+
fn exec(self, _: &Local, _: &Handle) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
99100
match self {
100101
FedoraRunnable::Available => Box::new(LocalFedora::available().map(|b| Box::new(b) as Box<Serialize>)),
101102
FedoraRunnable::Load => Box::new(LocalFedora::load().map(|t| {

core/src/telemetry/providers/freebsd.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::io::Read;
1919
use super::{TelemetryProvider, TelemetryRunnable};
2020
use target::{default, unix};
2121
use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable};
22+
use tokio_core::reactor::Handle;
2223

2324
pub struct Freebsd;
2425
struct LocalFreebsd;
@@ -96,7 +97,7 @@ impl RemoteFreebsd {
9697
}
9798

9899
impl Executable for FreebsdRunnable {
99-
fn exec(self, _: &Local) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
100+
fn exec(self, _: &Local, _: &Handle) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
100101
match self {
101102
FreebsdRunnable::Available => Box::new(LocalFreebsd::available().map(|b| Box::new(b) as Box<Serialize>)),
102103
FreebsdRunnable::Load => Box::new(LocalFreebsd::load().map(|t| {

core/src/telemetry/providers/macos.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::{env, process, str};
1717
use super::{TelemetryProvider, TelemetryRunnable};
1818
use target::{default, unix};
1919
use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable};
20+
use tokio_core::reactor::Handle;
2021

2122
pub struct Macos;
2223
struct LocalMacos;
@@ -94,7 +95,7 @@ impl RemoteMacos {
9495
}
9596

9697
impl Executable for MacosRunnable {
97-
fn exec(self, _: &Local) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
98+
fn exec(self, _: &Local, _: &Handle) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
9899
match self {
99100
MacosRunnable::Available => Box::new(LocalMacos::available().map(|b| Box::new(b) as Box<Serialize>)),
100101
MacosRunnable::Load => Box::new(LocalMacos::load().map(|t| {

core/src/telemetry/providers/mod.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use host::local::Local;
2828
use provider::Provider;
2929
use remote::Executable;
3030
use super::Telemetry;
31+
use tokio_core::reactor::Handle;
3132

3233
pub trait TelemetryProvider<H: Host>: Provider<H> {
3334
fn load(&self, host: &H) -> Box<Future<Item = Telemetry, Error = Error>>;
@@ -46,15 +47,15 @@ pub enum TelemetryRunnable {
4647
}
4748

4849
impl Executable for TelemetryRunnable {
49-
fn exec(self, host: &Local) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
50+
fn exec(self, host: &Local, handle: &Handle) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
5051
match self {
51-
TelemetryRunnable::Centos(p) => p.exec(host),
52-
TelemetryRunnable::Debian(p) => p.exec(host),
53-
TelemetryRunnable::Fedora(p) => p.exec(host),
54-
TelemetryRunnable::Freebsd(p) => p.exec(host),
55-
TelemetryRunnable::Macos(p) => p.exec(host),
56-
TelemetryRunnable::Nixos(p) => p.exec(host),
57-
TelemetryRunnable::Ubuntu(p) => p.exec(host),
52+
TelemetryRunnable::Centos(p) => p.exec(host, handle),
53+
TelemetryRunnable::Debian(p) => p.exec(host, handle),
54+
TelemetryRunnable::Fedora(p) => p.exec(host, handle),
55+
TelemetryRunnable::Freebsd(p) => p.exec(host, handle),
56+
TelemetryRunnable::Macos(p) => p.exec(host, handle),
57+
TelemetryRunnable::Nixos(p) => p.exec(host, handle),
58+
TelemetryRunnable::Ubuntu(p) => p.exec(host, handle),
5859
}
5960
}
6061
}

core/src/telemetry/providers/nixos.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use super::{TelemetryProvider, TelemetryRunnable};
1818
use target::{default, linux};
1919
use target::linux::LinuxFlavour;
2020
use telemetry::{Cpu, Os, OsFamily, OsPlatform, Telemetry, serializable};
21+
use tokio_core::reactor::Handle;
2122

2223
pub struct Nixos;
2324
struct LocalNixos;
@@ -95,7 +96,7 @@ impl RemoteNixos {
9596
}
9697

9798
impl Executable for NixosRunnable {
98-
fn exec(self, _: &Local) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
99+
fn exec(self, _: &Local, _: &Handle) -> Box<Future<Item = Box<Serialize>, Error = Error>> {
99100
match self {
100101
NixosRunnable::Available => Box::new(LocalNixos::available().map(|b| Box::new(b) as Box<Serialize>)),
101102
NixosRunnable::Load => Box::new(LocalNixos::load().map(|t| {

0 commit comments

Comments
 (0)