Skip to content

Commit 3bf1672

Browse files
committed
WIP - Remote host support with Tokio
1 parent b077c9a commit 3bf1672

File tree

4 files changed

+121
-19
lines changed

4 files changed

+121
-19
lines changed

core/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,18 @@ include = ["Cargo.toml", "src/**/*.rs", "README.md", "LICENSE"]
1616
travis-ci = { repository = "intecture/api" }
1717

1818
[dependencies]
19+
bytes = "0.4"
1920
erased-serde = "0.3"
2021
error-chain = "0.11"
22+
futures = "0.1"
2123
hostname = "0.1"
2224
ipnetwork = "0.12"
2325
pnet = "0.19"
2426
regex = "0.2"
2527
serde = "1.0"
2628
serde_derive = "1.0"
2729
serde_json = "1.0"
30+
tokio-core = "0.1"
31+
tokio-io = "0.1"
32+
tokio-proto = "0.1"
33+
tokio-service = "0.1"

core/src/host.rs

Lines changed: 107 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,41 +6,70 @@
66

77
//! Manages the connection between the API and your host.
88
9+
use bytes::{BufMut, BytesMut};
910
use errors::*;
11+
use futures::Future;
12+
use RemoteProvider;
13+
use serde::Deserialize;
14+
use serde_json;
15+
use std::{io, result};
16+
use std::net::SocketAddr;
1017
use telemetry::{self, Telemetry};
18+
use tokio_core::net::TcpStream;
19+
use tokio_core::reactor::Handle;
20+
use tokio_io::{AsyncRead, AsyncWrite};
21+
use tokio_io::codec::{Encoder, Decoder, Framed};
22+
use tokio_proto::pipeline::{ClientProto, ClientService};
23+
use tokio_proto::TcpClient;
24+
use tokio_service::Service;
1125

1226
pub struct Host {
1327
telemetry: Option<Telemetry>,
14-
sock: Option<u32>,
1528
}
1629

30+
pub struct RemoteHost {
31+
inner: ClientService<TcpStream, JsonProto>,
32+
telemetry: Option<Telemetry>,
33+
}
34+
35+
struct JsonCodec;
36+
struct JsonProto;
37+
1738
impl Host {
1839
/// Create a new Host targeting the local machine.
1940
pub fn local() -> Result<Host> {
2041
let mut host = Host {
2142
telemetry: None,
22-
sock: None
2343
};
2444

2545
host.telemetry = Some(telemetry::load(&host)?);
2646

2747
Ok(host)
2848
}
2949

50+
/// Retrieve Telemetry
51+
pub fn telemetry(&self) -> &Telemetry {
52+
self.telemetry.as_ref().unwrap()
53+
}
54+
55+
pub fn is_local(&self) -> bool {
56+
true
57+
}
58+
}
59+
60+
impl RemoteHost {
3061
/// Create a new Host connected to addr.
31-
pub fn connect(_addr: &str) -> Result<Host> {
32-
unimplemented!();
33-
// @todo Connect to sock...
62+
pub fn connect(addr: &SocketAddr, handle: &Handle) -> Box<Future<Item = RemoteHost, Error = io::Error>> {
63+
let ret = TcpClient::new(JsonProto)
64+
.connect(addr, handle)
65+
.map(|client_service| {
66+
RemoteHost {
67+
inner: client_service,
68+
telemetry: None,
69+
}
70+
});
3471

35-
// let mut host = Host {
36-
// telemetry: None,
37-
// sock: None
38-
// };
39-
//
40-
// let provider = telemetry::factory(&host)?;
41-
// host.telemetry = Some(provider.load()?);
42-
//
43-
// Ok(host)
72+
Box::new(ret)
4473
}
4574

4675
/// Retrieve Telemetry
@@ -49,6 +78,69 @@ impl Host {
4978
}
5079

5180
pub fn is_local(&self) -> bool {
52-
self.sock.is_none()
81+
false
82+
}
83+
84+
pub fn run<T>(&self, provider: RemoteProvider) -> Box<Future<Item = T, Error = io::Error>>
85+
where for<'de> T: Deserialize<'de>
86+
{
87+
Box::new(self.inner.call(provider)
88+
.map(|v| serde_json::from_value::<T>(v).unwrap()))
89+
}
90+
}
91+
92+
impl Service for RemoteHost {
93+
type Request = RemoteProvider;
94+
type Response = serde_json::Value;
95+
type Error = io::Error;
96+
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
97+
98+
fn call(&self, req: Self::Request) -> Self::Future {
99+
Box::new(self.inner.call(req))
100+
}
101+
}
102+
103+
impl Decoder for JsonCodec {
104+
type Item = serde_json::Value;
105+
type Error = io::Error;
106+
107+
fn decode(&mut self, buf: &mut BytesMut) -> result::Result<Option<Self::Item>, Self::Error> {
108+
// Check to see if the frame contains a new line
109+
if let Some(n) = buf.as_ref().iter().position(|b| *b == b'\n') {
110+
// remove the serialized frame from the buffer.
111+
let line = buf.split_to(n);
112+
113+
// Also remove the '\n'
114+
buf.split_to(1);
115+
116+
return Ok(Some(serde_json::from_slice(&line).unwrap()));
117+
}
118+
119+
Ok(None)
120+
}
121+
}
122+
123+
impl Encoder for JsonCodec {
124+
type Item = RemoteProvider;
125+
type Error = io::Error;
126+
127+
fn encode(&mut self, provider: Self::Item, buf: &mut BytesMut) -> io::Result<()> {
128+
let json = serde_json::to_string(&provider).unwrap();
129+
buf.reserve(json.len() + 1);
130+
buf.extend(json.as_bytes());
131+
buf.put_u8(b'\n');
132+
133+
Ok(())
134+
}
135+
}
136+
137+
impl<T: AsyncRead + AsyncWrite + 'static> ClientProto<T> for JsonProto {
138+
type Request = RemoteProvider;
139+
type Response = serde_json::Value;
140+
type Transport = Framed<T, JsonCodec>;
141+
type BindTransport = result::Result<Self::Transport, io::Error>;
142+
143+
fn bind_transport(&self, io: T) -> Self::BindTransport {
144+
Ok(io.framed(JsonCodec))
53145
}
54146
}

core/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,21 @@
1111
1212
#![recursion_limit = "1024"]
1313

14+
extern crate bytes;
1415
extern crate erased_serde;
1516
#[macro_use] extern crate error_chain;
17+
extern crate futures;
1618
extern crate hostname;
1719
extern crate ipnetwork;
1820
extern crate pnet;
1921
extern crate regex;
2022
extern crate serde;
2123
#[macro_use] extern crate serde_derive;
2224
extern crate serde_json;
25+
extern crate tokio_core;
26+
extern crate tokio_io;
27+
extern crate tokio_proto;
28+
extern crate tokio_service;
2329

2430
pub mod command;
2531
pub mod errors;

core/src/telemetry/providers/macos.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,8 @@ impl TelemetryProvider for Macos {
4040
cfg!(target_os="macos")
4141
} else {
4242
unimplemented!();
43-
// let r = RemoteProvider::Available;
44-
// self.host.send(r).chain_err(|| ErrorKind::RemoteProvider("Telemetry", "available"))?;
45-
// let t: Telemetry = self.host.recv()?;
46-
// Ok(t)
43+
// host.call(RemoteProvider::Available)
44+
// .chain_err(|| ErrorKind::RemoteProvider { endpoint: "Telemetry", func: "available" })?
4745
}
4846
}
4947

0 commit comments

Comments
 (0)