commit
87c2d27a5a
@ -28,6 +28,7 @@ use std::ops::Deref;
|
|||||||
|
|
||||||
const POLL_TIMEOUT: isize = 100;
|
const POLL_TIMEOUT: isize = 100;
|
||||||
|
|
||||||
|
/// Generic worker to handle service (binded) sockets
|
||||||
pub struct Worker<S> where S: IpcInterface<S> {
|
pub struct Worker<S> where S: IpcInterface<S> {
|
||||||
service: Arc<S>,
|
service: Arc<S>,
|
||||||
sockets: Vec<(Socket, Endpoint)>,
|
sockets: Vec<(Socket, Endpoint)>,
|
||||||
@ -35,6 +36,8 @@ pub struct Worker<S> where S: IpcInterface<S> {
|
|||||||
buf: Vec<u8>,
|
buf: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// struct for guarding `_endpoint` (so that it wont drop)
|
||||||
|
/// derefs to client `S`
|
||||||
pub struct GuardedSocket<S> where S: WithSocket<Socket> {
|
pub struct GuardedSocket<S> where S: WithSocket<Socket> {
|
||||||
client: Arc<S>,
|
client: Arc<S>,
|
||||||
_endpoint: Endpoint,
|
_endpoint: Endpoint,
|
||||||
@ -48,6 +51,9 @@ impl<S> Deref for GuardedSocket<S> where S: WithSocket<Socket> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Spawns client <`S`> over specified address
|
||||||
|
/// creates socket and connects endpoint to it
|
||||||
|
/// for duplex (paired) connections with the service
|
||||||
pub fn init_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
|
pub fn init_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError> where S: WithSocket<Socket> {
|
||||||
let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| {
|
let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| {
|
||||||
warn!(target: "ipc", "Failed to create ipc socket: {:?}", e);
|
warn!(target: "ipc", "Failed to create ipc socket: {:?}", e);
|
||||||
@ -65,12 +71,15 @@ pub fn init_client<S>(socket_addr: &str) -> Result<GuardedSocket<S>, SocketError
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Error occured while establising socket or endpoint
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum SocketError {
|
pub enum SocketError {
|
||||||
|
/// Error establising duplex (paired) socket and/or endpoint
|
||||||
DuplexLink
|
DuplexLink
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> Worker<S> where S: IpcInterface<S> {
|
impl<S> Worker<S> where S: IpcInterface<S> {
|
||||||
|
/// New worker over specified `service`
|
||||||
pub fn new(service: Arc<S>) -> Worker<S> {
|
pub fn new(service: Arc<S>) -> Worker<S> {
|
||||||
Worker::<S> {
|
Worker::<S> {
|
||||||
service: service.clone(),
|
service: service.clone(),
|
||||||
@ -80,6 +89,7 @@ impl<S> Worker<S> where S: IpcInterface<S> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Polls all sockets, reads and dispatches method invocations
|
||||||
pub fn poll(&mut self) {
|
pub fn poll(&mut self) {
|
||||||
let mut request = PollRequest::new(&mut self.polls[..]);
|
let mut request = PollRequest::new(&mut self.polls[..]);
|
||||||
let _result_guard = Socket::poll(&mut request, POLL_TIMEOUT);
|
let _result_guard = Socket::poll(&mut request, POLL_TIMEOUT);
|
||||||
@ -119,12 +129,15 @@ impl<S> Worker<S> where S: IpcInterface<S> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Stores nanomsg poll request for reuse
|
||||||
fn rebuild_poll_request(&mut self) {
|
fn rebuild_poll_request(&mut self) {
|
||||||
self.polls = self.sockets.iter()
|
self.polls = self.sockets.iter()
|
||||||
.map(|&(ref socket, _)| socket.new_pollfd(PollInOut::In))
|
.map(|&(ref socket, _)| socket.new_pollfd(PollInOut::In))
|
||||||
.collect::<Vec<PollFd>>();
|
.collect::<Vec<PollFd>>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Add exclusive socket for paired client
|
||||||
|
/// Only one connection over this address is allowed
|
||||||
pub fn add_duplex(&mut self, addr: &str) -> Result<(), SocketError> {
|
pub fn add_duplex(&mut self, addr: &str) -> Result<(), SocketError> {
|
||||||
let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| {
|
let mut socket = try!(Socket::new(Protocol::Pair).map_err(|e| {
|
||||||
warn!(target: "ipc", "Failed to create ipc socket: {:?}", e);
|
warn!(target: "ipc", "Failed to create ipc socket: {:?}", e);
|
||||||
|
@ -20,26 +20,35 @@ use std::io::{Read, Write};
|
|||||||
use std::marker::Sync;
|
use std::marker::Sync;
|
||||||
use semver::Version;
|
use semver::Version;
|
||||||
|
|
||||||
|
/// Handshake for client and server to negotiate api/protocol version
|
||||||
pub struct Handshake {
|
pub struct Handshake {
|
||||||
pub protocol_version: Version,
|
pub protocol_version: Version,
|
||||||
pub api_version: Version,
|
pub api_version: Version,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Allows to configure custom version and custom handshake response for
|
||||||
|
/// ipc host
|
||||||
pub trait IpcConfig {
|
pub trait IpcConfig {
|
||||||
|
/// Current service api version
|
||||||
|
/// Should be increased if any of the methods changes signature
|
||||||
fn api_version() -> Version {
|
fn api_version() -> Version {
|
||||||
Version::parse("1.0.0").unwrap()
|
Version::parse("1.0.0").unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Current ipc protocol version
|
||||||
|
/// Should be increased only if signature of system methods changes
|
||||||
fn protocol_version() -> Version {
|
fn protocol_version() -> Version {
|
||||||
Version::parse("1.0.0").unwrap()
|
Version::parse("1.0.0").unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Default handshake requires exact versions match
|
||||||
fn handshake(handshake: &Handshake) -> bool {
|
fn handshake(handshake: &Handshake) -> bool {
|
||||||
handshake.protocol_version == Self::protocol_version() &&
|
handshake.protocol_version == Self::protocol_version() &&
|
||||||
handshake.api_version == Self::api_version()
|
handshake.api_version == Self::api_version()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Error in dispatching or invoking methods via IPC
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
UnkownSystemCall,
|
UnkownSystemCall,
|
||||||
@ -48,6 +57,8 @@ pub enum Error {
|
|||||||
HandshakeFailed,
|
HandshakeFailed,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Allows implementor to be attached to generic worker and dispatch rpc requests
|
||||||
|
/// over IPC
|
||||||
pub trait IpcInterface<T>: IpcConfig {
|
pub trait IpcInterface<T>: IpcConfig {
|
||||||
/// reads the message from io, dispatches the call and returns serialized result
|
/// reads the message from io, dispatches the call and returns serialized result
|
||||||
fn dispatch<R>(&self, r: &mut R) -> Vec<u8> where R: Read;
|
fn dispatch<R>(&self, r: &mut R) -> Vec<u8> where R: Read;
|
||||||
@ -79,11 +90,11 @@ pub fn invoke<W>(method_num: u16, params: &Option<Vec<u8>>, w: &mut W) where W:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// IpcSocket
|
/// IpcSocket, read/write generalization
|
||||||
pub trait IpcSocket: Read + Write + Sync {
|
pub trait IpcSocket: Read + Write + Sync {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Basically something that needs only socket to be spawned
|
||||||
pub trait WithSocket<S: IpcSocket> {
|
pub trait WithSocket<S: IpcSocket> {
|
||||||
fn init(socket: S) -> Self;
|
fn init(socket: S) -> Self;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user