Skip to content
This repository was archived by the owner on Mar 11, 2021. It is now read-only.

Add support for unix sockets #47

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub use messages::merkle::*;
pub use messages::types::*;

use server::serve;
use server::serve_unix;

/// Main Trait for an ABCI application. Provides generic responses for all callbacks
/// Override desired callbacks as needed. Tendermint makes 3 TCP connections to the
Expand Down Expand Up @@ -114,3 +115,11 @@ where
{
serve(app, listen_addr).unwrap();
}

/// Setup the application and start the server using unix domain socket.
pub fn run_unix<A>(listen_addr: &str, app: A)
where
A: Application + 'static + Send + Sync,
{
serve_unix(app, listen_addr).unwrap();
}
28 changes: 28 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,36 @@ use std::sync::{Arc, Mutex};
use std::thread;

use messages::abci::*;
use std::os::unix::net::UnixListener;
use stream::AbciStream;

/// Creates the unix server and listens for connections from Tendermint
pub fn serve_unix<A>(app: A, addr: &str) -> io::Result<()>
where
A: Application + 'static + Send + Sync,
{
let listener = UnixListener::bind(addr).unwrap();

// Wrap the app atomically and clone for each connection.
let app = Arc::new(Mutex::new(app));

for new_connection in listener.incoming() {
let app_instance = Arc::clone(&app);
match new_connection {
Ok(stream) => {
println!("Got connection! {:?}", stream);
thread::spawn(move || handle_stream(AbciStream::from(stream), &app_instance));
}
Err(err) => {
// We need all 3 connections...
panic!("Connection failed: {}", err);
}
}
}
drop(listener);
Ok(())
}

/// Creates the TCP server and listens for connections from Tendermint
pub fn serve<A>(app: A, addr: SocketAddr) -> io::Result<()>
where
Expand Down
14 changes: 14 additions & 0 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,21 @@ use std::fmt::{self, Debug, Formatter};
use std::io;
use std::io::{Read, Write};
use std::net::TcpStream;
use std::os::unix::net::UnixStream;

/// Wraps a stream for easier testing. Original code from tomtau
pub enum StreamWrapper {
Mocked(SharedMockStream),
Tcp(TcpStream),
Unix(UnixStream),
}

impl io::Read for StreamWrapper {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match *self {
StreamWrapper::Mocked(ref mut s) => s.read(buf),
StreamWrapper::Tcp(ref mut s) => s.read(buf),
StreamWrapper::Unix(ref mut s) => s.read(buf),
}
}
}
Expand All @@ -28,13 +31,15 @@ impl io::Write for StreamWrapper {
match *self {
StreamWrapper::Mocked(ref mut s) => s.write(buf),
StreamWrapper::Tcp(ref mut s) => s.write(buf),
StreamWrapper::Unix(ref mut s) => s.write(buf),
}
}

fn flush(&mut self) -> io::Result<()> {
match *self {
StreamWrapper::Mocked(ref mut s) => s.flush(),
StreamWrapper::Tcp(ref mut s) => s.flush(),
StreamWrapper::Unix(ref mut s) => s.flush(),
}
}
}
Expand All @@ -49,6 +54,7 @@ impl Debug for AbciStream {
match self.stream {
StreamWrapper::Mocked(_) => Ok(f.debug_struct("SharedMockStream").finish()?),
StreamWrapper::Tcp(ref s) => s.fmt(f),
StreamWrapper::Unix(ref s) => s.fmt(f),
}
}
}
Expand Down Expand Up @@ -99,6 +105,14 @@ impl From<TcpStream> for AbciStream {
}
}

impl From<UnixStream> for AbciStream {
fn from(stream: UnixStream) -> AbciStream {
AbciStream {
stream: StreamWrapper::Unix(stream),
}
}
}

/// Parse out the varint. This code was adapted from the excellent integer-encoding crate
fn read_varint(stream: &mut Read) -> Result<i64, io::Error> {
const BUFLEN: usize = 10;
Expand Down