more networking

This commit is contained in:
arkpar 2015-12-22 22:23:43 +01:00
parent f34b22c65b
commit 3ad262e18f
3 changed files with 64 additions and 55 deletions

View File

@ -11,12 +11,12 @@ log = "0.3"
env_logger = "0.3" env_logger = "0.3"
rustc-serialize = "0.3" rustc-serialize = "0.3"
arrayvec = "0.3" arrayvec = "0.3"
mio = "0.5.*" mio = "0.5.0"
rand = "0.3.12" rand = "0.3.12"
time = "0.1.34" time = "0.1.34"
tiny-keccak = "1.0" tiny-keccak = "1.0"
rocksdb = "0.2.1" rocksdb = "0.2.1"
lazy_static = "0.1.*" lazy_static = "0.1.0"
eth-secp256k1 = { git = "https://github.com/arkpar/rust-secp256k1.git" } eth-secp256k1 = { git = "https://github.com/arkpar/rust-secp256k1.git" }
rust-crypto = "0.2.34" rust-crypto = "0.2.34"
elastic-array = "0.4" elastic-array = "0.4"

View File

@ -23,7 +23,7 @@ const MAX_USER_TIMERS: usize = 32;
const IDEAL_PEERS:u32 = 10; const IDEAL_PEERS:u32 = 10;
pub type NodeId = H512; pub type NodeId = H512;
type TimerFun = Fn(&mut HostIo) -> bool + Send; pub type TimerToken = usize;
#[derive(Debug)] #[derive(Debug)]
struct NetworkConfiguration { struct NetworkConfiguration {
@ -146,7 +146,7 @@ const LAST_CONNECTION: usize = FIRST_CONNECTION + MAX_CONNECTIONS - 1;
const USER_TIMER: usize = LAST_CONNECTION; const USER_TIMER: usize = LAST_CONNECTION;
const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1;
pub type PacketId = u32; pub type PacketId = u8;
pub type ProtocolId = &'static str; pub type ProtocolId = &'static str;
pub enum HostMessage { pub enum HostMessage {
@ -162,14 +162,9 @@ pub enum HostMessage {
protocol: ProtocolId, protocol: ProtocolId,
data: Vec<u8>, data: Vec<u8>,
}, },
AddTimer {
handler: Box<TimerFun>,
delay: u32,
protocol: ProtocolId,
}
} }
pub type PeerId = u32; pub type PeerId = usize;
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub struct CapabilityInfo { pub struct CapabilityInfo {
@ -189,54 +184,67 @@ impl Encodable for CapabilityInfo {
pub struct HostIo<'s> { pub struct HostIo<'s> {
protocol: ProtocolId, protocol: ProtocolId,
session: Option<&'s mut Session>, connections: &'s mut Slab<ConnectionEntry>,
timers: &'s mut Slab<UserTimer>,
session: Option<Token>,
event_loop: &'s mut EventLoop<Host>, event_loop: &'s mut EventLoop<Host>,
channel: &'s mut Sender<HostMessage>
} }
impl<'s> HostIo<'s> { impl<'s> HostIo<'s> {
fn new(protocol: ProtocolId, session: Option<&'s mut Session>, event_loop: &'s mut EventLoop<Host>, channel: &'s mut Sender<HostMessage>) -> HostIo<'s> { fn new(protocol: ProtocolId, session: Option<Token>, event_loop: &'s mut EventLoop<Host>, connections: &'s mut Slab<ConnectionEntry>, timers: &'s mut Slab<UserTimer>) -> HostIo<'s> {
HostIo { HostIo {
protocol: protocol, protocol: protocol,
session: session, session: session,
event_loop: event_loop, event_loop: event_loop,
channel: channel, connections: connections,
timers: timers,
} }
} }
pub fn send(&mut self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> { pub fn send(&mut self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
try!(self.channel.send(HostMessage::Send { match self.connections.get_mut(Token(peer)) {
peer: peer, Some(&mut ConnectionEntry::Session(ref mut s)) => {
packet_id: packet_id, s.send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| {
protocol: self.protocol, warn!(target: "net", "Send error: {:?}", e);
data: data }); //TODO: don't copy vector data
})); },
_ => {
warn!(target: "net", "Send: Peer does not exist");
}
}
Ok(()) Ok(())
} }
pub fn respond(&mut self, packet_id: PacketId, data: &[u8]) -> Result<(), Error> { pub fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
match self.session.as_mut() { match self.session {
Some(session) => session.send_packet(self.protocol, packet_id as u8, data), Some(session) => self.send(session.as_usize(), packet_id, data),
None => { None => {
panic!("Respond: Session does not exist") panic!("Respond: Session does not exist")
} }
} }
} }
fn register_timer(&mut self, ms: u32, handler: Box<TimerFun>) -> Result<(), Error>{ pub fn register_timer(&mut self, ms: u64) -> Result<TimerToken, Error>{
try!(self.channel.send(HostMessage::AddTimer { match self.timers.insert(UserTimer {
delay: ms, delay: ms,
handler: handler,
protocol: self.protocol, protocol: self.protocol,
})); }) {
Ok(()) Ok(token) => {
self.event_loop.timeout_ms(token, ms).expect("Error registering user timer");
Ok(token.as_usize())
},
_ => { panic!("Max timers reached") }
}
}
pub fn disable_peer(&mut self, _peer: PeerId) {
//TODO: remove capability, disconnect if no capabilities left
} }
} }
struct UserTimer { struct UserTimer {
handler: Box<TimerFun>,
protocol: ProtocolId, protocol: ProtocolId,
delay: u32, delay: u64,
} }
pub struct HostInfo { pub struct HostInfo {
@ -492,6 +500,8 @@ impl Host {
fn connection_readable(&mut self, token: Token, event_loop: &mut EventLoop<Host>) { fn connection_readable(&mut self, token: Token, event_loop: &mut EventLoop<Host>) {
let mut kill = false; let mut kill = false;
let mut create_session = false; let mut create_session = false;
let mut ready_data: Vec<ProtocolId> = Vec::new();
let mut packet_data: Option<(ProtocolId, PacketId, Vec<u8>)> = None;
{ {
match self.connections.get_mut(token) { match self.connections.get_mut(token) {
Some(&mut ConnectionEntry::Handshake(ref mut h)) => { Some(&mut ConnectionEntry::Handshake(ref mut h)) => {
@ -509,9 +519,9 @@ impl Host {
}) }; }) };
match sd { match sd {
SessionData::Ready => { SessionData::Ready => {
for (p, h) in self.handlers.iter_mut() { for (p, _) in self.handlers.iter_mut() {
if s.have_capability(p) { if s.have_capability(p) {
h.connected(&mut HostIo::new(p, Some(s), event_loop, &mut self.channel), &(token.as_usize() as u32)); ready_data.push(p);
} }
} }
}, },
@ -522,7 +532,7 @@ impl Host {
} => { } => {
match self.handlers.get_mut(protocol) { match self.handlers.get_mut(protocol) {
None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) }, None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) },
Some(h) => h.read(&mut HostIo::new(protocol, Some(s), event_loop, &mut self.channel), packet_id, &data[1..]) Some(_) => packet_data = Some((protocol, packet_id, data)),
} }
}, },
SessionData::None => {}, SessionData::None => {},
@ -539,6 +549,15 @@ impl Host {
if create_session { if create_session {
self.start_session(token, event_loop); self.start_session(token, event_loop);
} }
for p in ready_data {
let mut h = self.handlers.get_mut(p).unwrap();
h.connected(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize());
}
if let Some((p, packet_id, data)) = packet_data {
let mut h = self.handlers.get_mut(p).unwrap();
h.read(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize(), packet_id, &data[1..]);
}
} }
fn start_session(&mut self, token: Token, event_loop: &mut EventLoop<Host>) { fn start_session(&mut self, token: Token, event_loop: &mut EventLoop<Host>) {
@ -593,9 +612,10 @@ impl Handler for Host {
NODETABLE_DISCOVERY => {}, NODETABLE_DISCOVERY => {},
NODETABLE_MAINTAIN => {}, NODETABLE_MAINTAIN => {},
USER_TIMER ... LAST_USER_TIMER => { USER_TIMER ... LAST_USER_TIMER => {
let timer = self.timers.get_mut(token).expect("Unknown user timer token"); let protocol = self.timers.get_mut(token).expect("Unknown user timer token").protocol;
if (*timer.handler)(&mut HostIo::new(timer.protocol, None, event_loop, &mut self.channel)) { match self.handlers.get_mut(protocol) {
event_loop.timeout_ms(token, timer.delay as u64).expect("Unable to reregister user timer"); None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) },
Some(h) => h.timeout(&mut HostIo::new(protocol, None, event_loop, &mut self.connections, &mut self.timers), token.as_usize()),
} }
} }
_ => panic!("Unknown timer token"), _ => panic!("Unknown timer token"),
@ -632,22 +652,6 @@ impl Handler for Host {
} }
} }
}, },
HostMessage::AddTimer {
handler,
delay,
protocol,
} => {
match self.timers.insert(UserTimer {
handler: handler,
delay: delay,
protocol: protocol,
}) {
Ok(token) => {
event_loop.timeout_ms(token, delay as u64).expect("Error registering user timer");
},
_ => { panic!("Max timers reached") }
}
}
} }
} }
} }

View File

@ -47,6 +47,7 @@ impl From<::crypto::CryptoError> for Error {
Error::Crypto(err) Error::Crypto(err)
} }
} }
impl From<::std::net::AddrParseError> for Error { impl From<::std::net::AddrParseError> for Error {
fn from(err: ::std::net::AddrParseError) -> Error { fn from(err: ::std::net::AddrParseError) -> Error {
Error::AddressParse(err) Error::AddressParse(err)
@ -70,15 +71,19 @@ impl From<::mio::NotifyError<host::HostMessage>> for Error {
} }
pub type PeerId = host::PeerId; pub type PeerId = host::PeerId;
pub type PacketId = host::PacketId;
pub type TimerToken = host::TimerToken;
pub type HandlerIo<'s> = host::HostIo<'s>; pub type HandlerIo<'s> = host::HostIo<'s>;
pub trait ProtocolHandler: Send { pub trait ProtocolHandler: Send {
fn initialize(&mut self, io: &mut HandlerIo); fn initialize(&mut self, io: &mut HandlerIo);
fn read(&mut self, io: &mut HandlerIo, packet_id: u8, data: &[u8]); fn read(&mut self, io: &mut HandlerIo, peer: &PeerId, packet_id: u8, data: &[u8]);
fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId); fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId);
fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId); fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId);
fn timeout(&mut self, io: &mut HandlerIo, timer: TimerToken);
} }
pub struct NetworkClient; pub struct NetworkClient;
pub type NetworkService = service::NetworkService;