Skip to content

Commit 14f8417

Browse files
committed
Minimal Agent with issues - hardcoded bind address, no error handling
1 parent 15f5558 commit 14f8417

File tree

5 files changed

+91
-18
lines changed

5 files changed

+91
-18
lines changed

agent/Cargo.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ description = "Tiny daemon that wraps the core API"
77
homepage = "https://intecture.io"
88
repository = "https://github.com/intecture/api"
99
documentation = "https://intecture.io/rust/inapi/"
10-
keywords = ["intecture", "remote"]
10+
keywords = ["intecture", "agent"]
1111
categories = ["servers"]
1212
readme = "README.md"
1313
include = ["Cargo.toml", "src/**/*.rs", "README.md", "LICENSE"]
@@ -16,8 +16,10 @@ include = ["Cargo.toml", "src/**/*.rs", "README.md", "LICENSE"]
1616
travis-ci = { repository = "intecture/api" }
1717

1818
[dependencies]
19+
env_logger = "0.4"
1920
error-chain = "0.11"
21+
futures = "0.1"
2022
intecture_api = { version = "0.4.0", path = "../core" }
21-
serde = "1.0"
22-
serde_derive = "1.0"
2323
serde_json = "1.0"
24+
tokio-proto = "0.1"
25+
tokio-service = "0.1"

agent/src/main.rs

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,59 @@
66

77
#![recursion_limit = "1024"]
88

9+
extern crate env_logger;
910
#[macro_use] extern crate error_chain;
11+
extern crate futures;
1012
extern crate intecture_api;
11-
extern crate serde;
12-
#[macro_use] extern crate serde_derive;
1313
extern crate serde_json;
14+
extern crate tokio_proto;
15+
extern crate tokio_service;
1416

1517
mod errors;
1618

1719
use errors::*;
20+
use futures::{future, Future};
21+
use intecture_api::{Executable, Runnable};
22+
use intecture_api::host::JsonProto;
23+
use std::io;
24+
use tokio_proto::TcpServer;
25+
use tokio_service::Service;
26+
27+
pub struct Api;
28+
29+
impl Service for Api {
30+
type Request = serde_json::Value;
31+
type Response = serde_json::Value;
32+
type Error = io::Error;
33+
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
34+
35+
fn call(&self, req: Self::Request) -> Self::Future {
36+
let runnable: Runnable = match serde_json::from_value(req).chain_err(|| "Received invalid Runnable") {
37+
Ok(r) => r,
38+
Err(e) => return Box::new(
39+
future::err(
40+
io::Error::new(
41+
// @todo Can't wrap 'e' as error_chain Error doesn't derive Sync.
42+
// Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163
43+
io::ErrorKind::Other, e.description()
44+
))),
45+
};
46+
Box::new(runnable.exec()
47+
// @todo Can't wrap 'e' as error_chain Error doesn't derive Sync.
48+
// Waiting for https://github.com/rust-lang-nursery/error-chain/pull/163
49+
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.description()))
50+
.and_then(|ser| match serde_json::to_value(ser).chain_err(|| "Could not serialize result") {
51+
Ok(v) => future::ok(v),
52+
Err(e) => future::err(io::Error::new(io::ErrorKind::Other, e.description())),
53+
}))
54+
}
55+
}
1856

1957
quick_main!(|| -> Result<()> {
58+
env_logger::init().chain_err(|| "Could not start logging")?;
59+
60+
let addr = "127.0.0.1:7101".parse().chain_err(|| "Invalid server address")?;
61+
let server = TcpServer::new(JsonProto, addr);
62+
server.serve(|| Ok(Api));
2063
Ok(())
2164
});

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ error-chain = "0.11"
2222
futures = "0.1"
2323
hostname = "0.1"
2424
ipnetwork = "0.12"
25+
log = "0.3"
2526
pnet = "0.19"
2627
regex = "0.2"
2728
serde = "1.0"

core/src/host.rs

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use tokio_core::net::TcpStream;
2020
use tokio_core::reactor::Handle;
2121
use tokio_io::{AsyncRead, AsyncWrite};
2222
use tokio_io::codec::{Encoder, Decoder, Framed};
23-
use tokio_proto::pipeline::{ClientProto, ClientService};
23+
use tokio_proto::pipeline::{ClientProto, ClientService, ServerProto};
2424
use tokio_proto::TcpClient;
2525
use tokio_service::Service;
2626

@@ -78,16 +78,27 @@ pub struct RemoteHost {
7878
telemetry: Option<Telemetry>,
7979
}
8080

81-
struct JsonCodec;
82-
struct JsonProto;
81+
#[doc(hidden)]
82+
pub struct JsonCodec;
83+
#[doc(hidden)]
84+
pub struct JsonProto;
8385

8486
impl RemoteHost {
8587
/// Create a new Host connected to addr.
86-
pub fn connect(addr: &SocketAddr, handle: &Handle) -> Box<Future<Item = Arc<RemoteHost>, Error = Error>> {
88+
pub fn connect(addr: &str, handle: &Handle) -> Box<Future<Item = Arc<RemoteHost>, Error = Error>> {
89+
let addr: SocketAddr = match addr.parse().chain_err(|| "Invalid host address") {
90+
Ok(addr) => addr,
91+
Err(e) => return Box::new(future::err(e)),
92+
};
93+
94+
info!("Connecting to host {}", addr);
95+
8796
Box::new(TcpClient::new(JsonProto)
88-
.connect(addr, handle)
97+
.connect(&addr, handle)
8998
.chain_err(|| "Could not connect to host")
9099
.and_then(|client_service| {
100+
info!("Connected!");
101+
91102
let mut host = Arc::new(RemoteHost {
92103
inner: client_service,
93104
telemetry: None,
@@ -111,17 +122,21 @@ impl Host for RemoteHost {
111122
fn run<D: 'static>(&self, provider: Runnable) -> Box<Future<Item = D, Error = Error>>
112123
where for<'de> D: Deserialize<'de>
113124
{
114-
Box::new(self.inner.call(provider)
115-
.chain_err(|| "Could not run provider")
116-
.and_then(|v| match serde_json::from_value::<D>(v).chain_err(|| "Could not run provider") {
125+
let value = match serde_json::to_value(provider).chain_err(|| "Could not encode provider to send to host") {
126+
Ok(v) => v,
127+
Err(e) => return Box::new(future::err(e))
128+
};
129+
Box::new(self.inner.call(value)
130+
.chain_err(|| "Error while running provider on host")
131+
.and_then(|v| match serde_json::from_value::<D>(v).chain_err(|| "Could not understand response from host") {
117132
Ok(d) => future::ok(d),
118133
Err(e) => future::err(e)
119134
}))
120135
}
121136
}
122137

123138
impl Service for RemoteHost {
124-
type Request = Runnable;
139+
type Request = serde_json::Value;
125140
type Response = serde_json::Value;
126141
type Error = io::Error;
127142
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
@@ -152,11 +167,11 @@ impl Decoder for JsonCodec {
152167
}
153168

154169
impl Encoder for JsonCodec {
155-
type Item = Runnable;
170+
type Item = serde_json::Value;
156171
type Error = io::Error;
157172

158-
fn encode(&mut self, provider: Self::Item, buf: &mut BytesMut) -> io::Result<()> {
159-
let json = serde_json::to_string(&provider).unwrap();
173+
fn encode(&mut self, value: Self::Item, buf: &mut BytesMut) -> io::Result<()> {
174+
let json = serde_json::to_string(&value).unwrap();
160175
buf.reserve(json.len() + 1);
161176
buf.extend(json.as_bytes());
162177
buf.put_u8(b'\n');
@@ -166,7 +181,18 @@ impl Encoder for JsonCodec {
166181
}
167182

168183
impl<T: AsyncRead + AsyncWrite + 'static> ClientProto<T> for JsonProto {
169-
type Request = Runnable;
184+
type Request = serde_json::Value;
185+
type Response = serde_json::Value;
186+
type Transport = Framed<T, JsonCodec>;
187+
type BindTransport = result::Result<Self::Transport, io::Error>;
188+
189+
fn bind_transport(&self, io: T) -> Self::BindTransport {
190+
Ok(io.framed(JsonCodec))
191+
}
192+
}
193+
194+
impl<T: AsyncRead + AsyncWrite + 'static> ServerProto<T> for JsonProto {
195+
type Request = serde_json::Value;
170196
type Response = serde_json::Value;
171197
type Transport = Framed<T, JsonCodec>;
172198
type BindTransport = result::Result<Self::Transport, io::Error>;

core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ extern crate erased_serde;
1717
extern crate futures;
1818
extern crate hostname;
1919
extern crate ipnetwork;
20+
#[macro_use] extern crate log;
2021
extern crate pnet;
2122
extern crate regex;
2223
extern crate serde;

0 commit comments

Comments
 (0)