Multithreaded event loop

This commit is contained in:
arkpar
2016-01-21 16:48:37 +01:00
parent 77d2303b55
commit e514d3d80f
17 changed files with 661 additions and 507 deletions

View File

@@ -29,7 +29,7 @@ fn main() {
setup_log();
let spec = ethereum::new_frontier();
let mut service = ClientService::start(spec).unwrap();
let io_handler = Box::new(ClientIoHandler { client: service.client(), timer: 0 });
let io_handler = Arc::new(ClientIoHandler { client: service.client() });
service.io().register_handler(io_handler).expect("Error registering IO handler");
loop {
let mut cmd = String::new();
@@ -43,16 +43,15 @@ fn main() {
struct ClientIoHandler {
client: Arc<RwLock<Client>>,
timer: TimerToken,
}
impl IoHandler<NetSyncMessage> for ClientIoHandler {
fn initialize<'s>(&'s mut self, io: &mut IoContext<'s, NetSyncMessage>) {
self.timer = io.register_timer(5000).expect("Error registering timer");
fn initialize(&self, io: &IoContext<NetSyncMessage>) {
io.register_timer(0, 5000).expect("Error registering timer");
}
fn timeout<'s>(&'s mut self, _io: &mut IoContext<'s, NetSyncMessage>, timer: TimerToken) {
if self.timer == timer {
fn timeout(&self, _io: &IoContext<NetSyncMessage>, timer: TimerToken) {
if timer == 0 {
println!("Chain info: {:?}", self.client.read().unwrap().deref().chain_info());
}
}

View File

@@ -1,6 +1,5 @@
use util::*;
use rocksdb::{Options, DB};
use rocksdb::DBCompactionStyle::DBUniversalCompaction;
use blockchain::{BlockChain, BlockProvider};
use views::BlockView;
use error::*;
@@ -113,7 +112,9 @@ impl Client {
pub fn new(spec: Spec, path: &Path, message_channel: IoChannel<NetSyncMessage> ) -> Result<Client, Error> {
let chain = Arc::new(RwLock::new(BlockChain::new(&spec.genesis_block(), path)));
let mut opts = Options::new();
opts.set_max_open_files(256);
opts.create_if_missing(true);
/*
opts.set_max_open_files(256);
opts.set_use_fsync(false);
opts.set_bytes_per_sync(8388608);
@@ -131,6 +132,7 @@ impl Client {
opts.set_max_background_flushes(4);
opts.set_filter_deletes(false);
opts.set_disable_auto_compactions(true);
*/
let mut state_path = path.to_path_buf();
state_path.push("state");
@@ -219,7 +221,7 @@ impl Client {
return;
}
}
info!(target: "client", "Imported #{} ({})", header.number(), header.hash());
//info!(target: "client", "Imported #{} ({})", header.number(), header.hash());
}
}
}

View File

@@ -22,7 +22,7 @@ impl ClientService {
dir.push(H64::from(spec.genesis_header().hash()).hex());
let client = Arc::new(RwLock::new(try!(Client::new(spec, &dir, net_service.io().channel()))));
EthSync::register(&mut net_service, client.clone());
let client_io = Box::new(ClientIoHandler {
let client_io = Arc::new(ClientIoHandler {
client: client.clone()
});
try!(net_service.io().register_handler(client_io));
@@ -48,14 +48,14 @@ struct ClientIoHandler {
}
impl IoHandler<NetSyncMessage> for ClientIoHandler {
fn initialize<'s>(&'s mut self, _io: &mut IoContext<'s, NetSyncMessage>) {
fn initialize(&self, _io: &IoContext<NetSyncMessage>) {
}
fn message<'s>(&'s mut self, _io: &mut IoContext<'s, NetSyncMessage>, net_message: &'s mut NetSyncMessage) {
fn message(&self, _io: &IoContext<NetSyncMessage>, net_message: &NetSyncMessage) {
match net_message {
&mut UserMessage(ref mut message) => {
&UserMessage(ref message) => {
match message {
&mut SyncMessage::BlockVerified => {
&SyncMessage::BlockVerified => {
self.client.write().unwrap().import_verified_blocks();
},
_ => {}, // ignore other messages

View File

@@ -424,6 +424,10 @@ impl ChainSync {
let peer_difficulty = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").difficulty;
if difficulty > peer_difficulty {
trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h);
{
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer");
peer.latest = header_view.sha3();
}
self.sync_peer(io, peer_id, true);
}
}

View File

@@ -22,14 +22,14 @@ pub trait SyncIo {
}
/// Wraps `NetworkContext` and the blockchain client
pub struct NetSyncIo<'s, 'h, 'io> where 'h: 's, 'io: 'h {
network: &'s mut NetworkContext<'h, 'io, SyncMessage>,
pub struct NetSyncIo<'s, 'h> where 'h: 's {
network: &'s NetworkContext<'h, SyncMessage>,
chain: &'s mut BlockChainClient
}
impl<'s, 'h, 'io> NetSyncIo<'s, 'h, 'io> {
impl<'s, 'h> NetSyncIo<'s, 'h> {
/// Creates a new instance from the `NetworkContext` and the blockchain client reference.
pub fn new(network: &'s mut NetworkContext<'h, 'io, SyncMessage>, chain: &'s mut BlockChainClient) -> NetSyncIo<'s,'h,'io> {
pub fn new(network: &'s NetworkContext<'h, SyncMessage>, chain: &'s mut BlockChainClient) -> NetSyncIo<'s, 'h> {
NetSyncIo {
network: network,
chain: chain,
@@ -37,7 +37,7 @@ impl<'s, 'h, 'io> NetSyncIo<'s, 'h, 'io> {
}
}
impl<'s, 'h, 'op> SyncIo for NetSyncIo<'s, 'h, 'op> {
impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
fn disable_peer(&mut self, peer_id: PeerId) {
self.network.disable_peer(peer_id);
}

View File

@@ -26,9 +26,8 @@ use std::ops::*;
use std::sync::*;
use client::Client;
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, NetworkIoMessage};
use util::TimerToken;
use util::Bytes;
use sync::chain::ChainSync;
use util::{Bytes, TimerToken};
use sync::io::NetSyncIo;
mod chain;
@@ -38,10 +37,13 @@ mod range_collection;
#[cfg(test)]
mod tests;
const SYNC_TIMER: usize = 0;
/// Message type for external events
#[derive(Clone)]
pub enum SyncMessage {
/// New block has been imported into the blockchain
NewChainBlock(Bytes),
NewChainBlock(Bytes), //TODO: use Cow
/// A block is ready
BlockVerified,
}
@@ -53,7 +55,7 @@ pub struct EthSync {
/// Shared blockchain client. TODO: this should evetually become an IPC endpoint
chain: Arc<RwLock<Client>>,
/// Sync strategy
sync: ChainSync
sync: RwLock<ChainSync>
}
pub use self::chain::SyncStatus;
@@ -61,52 +63,50 @@ pub use self::chain::SyncStatus;
impl EthSync {
/// Creates and register protocol with the network service
pub fn register(service: &mut NetworkService<SyncMessage>, chain: Arc<RwLock<Client>>) {
let sync = Box::new(EthSync {
let sync = Arc::new(EthSync {
chain: chain,
sync: ChainSync::new(),
sync: RwLock::new(ChainSync::new()),
});
service.register_protocol(sync, "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler");
service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler");
}
/// Get sync status
pub fn status(&self) -> SyncStatus {
self.sync.status()
self.sync.read().unwrap().status()
}
/// Stop sync
pub fn stop(&mut self, io: &mut NetworkContext<SyncMessage>) {
self.sync.abort(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()));
self.sync.write().unwrap().abort(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()));
}
/// Restart sync
pub fn restart(&mut self, io: &mut NetworkContext<SyncMessage>) {
self.sync.restart(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()));
self.sync.write().unwrap().restart(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()));
}
}
impl NetworkProtocolHandler<SyncMessage> for EthSync {
fn initialize(&mut self, io: &mut NetworkContext<SyncMessage>) {
self.sync.restart(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()));
io.register_timer(1000).unwrap();
fn initialize(&self, io: &NetworkContext<SyncMessage>) {
io.register_timer(SYNC_TIMER, 1000).unwrap();
}
fn read(&mut self, io: &mut NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
self.sync.on_packet(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()) , *peer, packet_id, data);
fn read(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
self.sync.write().unwrap().on_packet(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()) , *peer, packet_id, data);
}
fn connected(&mut self, io: &mut NetworkContext<SyncMessage>, peer: &PeerId) {
self.sync.on_peer_connected(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()), *peer);
fn connected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) {
self.sync.write().unwrap().on_peer_connected(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()), *peer);
}
fn disconnected(&mut self, io: &mut NetworkContext<SyncMessage>, peer: &PeerId) {
self.sync.on_peer_aborting(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()), *peer);
fn disconnected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) {
self.sync.write().unwrap().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()), *peer);
}
fn timeout(&mut self, io: &mut NetworkContext<SyncMessage>, _timer: TimerToken) {
self.sync.maintain_sync(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()));
}
fn message(&mut self, _io: &mut NetworkContext<SyncMessage>, _message: &SyncMessage) {
fn timeout(&self, io: &NetworkContext<SyncMessage>, timer: TimerToken) {
if timer == SYNC_TIMER {
self.sync.write().unwrap().maintain_sync(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()));
}
}
}