DB commit queue
This commit is contained in:
parent
0f97edad7c
commit
4bf1c205b4
@ -10,10 +10,9 @@ use log::{LogLevelFilter};
|
|||||||
use env_logger::LogBuilder;
|
use env_logger::LogBuilder;
|
||||||
use util::*;
|
use util::*;
|
||||||
use ethcore::client::*;
|
use ethcore::client::*;
|
||||||
use ethcore::service::ClientService;
|
use ethcore::service::{ClientService, NetSyncMessage};
|
||||||
use ethcore::ethereum;
|
use ethcore::ethereum;
|
||||||
use ethcore::blockchain::CacheSize;
|
use ethcore::blockchain::CacheSize;
|
||||||
use ethcore::sync::*;
|
|
||||||
|
|
||||||
fn setup_log() {
|
fn setup_log() {
|
||||||
let mut builder = LogBuilder::new();
|
let mut builder = LogBuilder::new();
|
||||||
@ -90,7 +89,7 @@ impl Informant {
|
|||||||
const INFO_TIMER: TimerToken = 0;
|
const INFO_TIMER: TimerToken = 0;
|
||||||
|
|
||||||
struct ClientIoHandler {
|
struct ClientIoHandler {
|
||||||
client: Arc<RwLock<Client>>,
|
client: Arc<Client>,
|
||||||
info: Informant,
|
info: Informant,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -101,8 +100,7 @@ impl IoHandler<NetSyncMessage> for ClientIoHandler {
|
|||||||
|
|
||||||
fn timeout(&self, _io: &IoContext<NetSyncMessage>, timer: TimerToken) {
|
fn timeout(&self, _io: &IoContext<NetSyncMessage>, timer: TimerToken) {
|
||||||
if INFO_TIMER == timer {
|
if INFO_TIMER == timer {
|
||||||
let client = self.client.read().unwrap();
|
self.info.tick(&self.client);
|
||||||
self.info.tick(client.deref());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,14 @@
|
|||||||
|
//! A queue of blocks. Sits between network or other I/O and the BlockChain.
|
||||||
|
//! Sorts them ready for blockchain insertion.
|
||||||
use std::thread::{JoinHandle, self};
|
use std::thread::{JoinHandle, self};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
|
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
|
||||||
use util::*;
|
use util::*;
|
||||||
use verification::*;
|
use verification::*;
|
||||||
use error::*;
|
use error::*;
|
||||||
use engine::Engine;
|
use engine::Engine;
|
||||||
use sync::*;
|
|
||||||
use views::*;
|
use views::*;
|
||||||
use header::*;
|
use header::*;
|
||||||
|
use service::*;
|
||||||
|
|
||||||
/// A queue of blocks. Sits between network or other I/O and the BlockChain.
|
/// A queue of blocks. Sits between network or other I/O and the BlockChain.
|
||||||
/// Sorts them ready for blockchain insertion.
|
/// Sorts them ready for blockchain insertion.
|
@ -6,8 +6,9 @@ use error::*;
|
|||||||
use header::BlockNumber;
|
use header::BlockNumber;
|
||||||
use spec::Spec;
|
use spec::Spec;
|
||||||
use engine::Engine;
|
use engine::Engine;
|
||||||
use queue::BlockQueue;
|
use block_queue::BlockQueue;
|
||||||
use sync::NetSyncMessage;
|
use db_queue::{DbQueue, StateDBCommit};
|
||||||
|
use service::NetSyncMessage;
|
||||||
use env_info::LastHashes;
|
use env_info::LastHashes;
|
||||||
use verification::*;
|
use verification::*;
|
||||||
use block::*;
|
use block::*;
|
||||||
@ -95,13 +96,13 @@ pub trait BlockChainClient : Sync + Send {
|
|||||||
fn block_receipts(&self, hash: &H256) -> Option<Bytes>;
|
fn block_receipts(&self, hash: &H256) -> Option<Bytes>;
|
||||||
|
|
||||||
/// Import a block into the blockchain.
|
/// Import a block into the blockchain.
|
||||||
fn import_block(&mut self, bytes: Bytes) -> ImportResult;
|
fn import_block(&self, bytes: Bytes) -> ImportResult;
|
||||||
|
|
||||||
/// Get block queue information.
|
/// Get block queue information.
|
||||||
fn queue_status(&self) -> BlockQueueStatus;
|
fn queue_status(&self) -> BlockQueueStatus;
|
||||||
|
|
||||||
/// Clear block queue and abort all import activity.
|
/// Clear block queue and abort all import activity.
|
||||||
fn clear_queue(&mut self);
|
fn clear_queue(&self);
|
||||||
|
|
||||||
/// Get blockchain information.
|
/// Get blockchain information.
|
||||||
fn chain_info(&self) -> BlockChainInfo;
|
fn chain_info(&self) -> BlockChainInfo;
|
||||||
@ -132,19 +133,24 @@ pub struct Client {
|
|||||||
chain: Arc<RwLock<BlockChain>>,
|
chain: Arc<RwLock<BlockChain>>,
|
||||||
engine: Arc<Box<Engine>>,
|
engine: Arc<Box<Engine>>,
|
||||||
state_db: JournalDB,
|
state_db: JournalDB,
|
||||||
queue: BlockQueue,
|
block_queue: RwLock<BlockQueue>,
|
||||||
report: ClientReport,
|
db_queue: RwLock<DbQueue>,
|
||||||
|
report: RwLock<ClientReport>,
|
||||||
|
uncommited_states: RwLock<HashMap<H256, JournalDB>>,
|
||||||
|
import_lock: Mutex<()>
|
||||||
}
|
}
|
||||||
|
|
||||||
const HISTORY: u64 = 1000;
|
const HISTORY: u64 = 1000;
|
||||||
|
|
||||||
impl Client {
|
impl Client {
|
||||||
/// Create a new client with given spec and DB path.
|
/// Create a new client with given spec and DB path.
|
||||||
pub fn new(spec: Spec, path: &Path, message_channel: IoChannel<NetSyncMessage> ) -> Result<Client, Error> {
|
pub fn new(spec: Spec, path: &Path, message_channel: IoChannel<NetSyncMessage> ) -> Result<Arc<Client>, Error> {
|
||||||
let chain = Arc::new(RwLock::new(BlockChain::new(&spec.genesis_block(), path)));
|
let chain = Arc::new(RwLock::new(BlockChain::new(&spec.genesis_block(), path)));
|
||||||
let mut opts = Options::new();
|
let mut opts = Options::new();
|
||||||
opts.set_max_open_files(256);
|
opts.set_max_open_files(256);
|
||||||
opts.create_if_missing(true);
|
opts.create_if_missing(true);
|
||||||
|
opts.set_disable_data_sync(true);
|
||||||
|
opts.set_disable_auto_compactions(true);
|
||||||
/*opts.set_use_fsync(false);
|
/*opts.set_use_fsync(false);
|
||||||
opts.set_bytes_per_sync(8388608);
|
opts.set_bytes_per_sync(8388608);
|
||||||
opts.set_disable_data_sync(false);
|
opts.set_disable_data_sync(false);
|
||||||
@ -164,37 +170,46 @@ impl Client {
|
|||||||
|
|
||||||
let mut state_path = path.to_path_buf();
|
let mut state_path = path.to_path_buf();
|
||||||
state_path.push("state");
|
state_path.push("state");
|
||||||
let db = DB::open(&opts, state_path.to_str().unwrap()).unwrap();
|
let db = Arc::new(DB::open(&opts, state_path.to_str().unwrap()).unwrap());
|
||||||
let mut state_db = JournalDB::new(db);
|
|
||||||
|
|
||||||
let engine = Arc::new(try!(spec.to_engine()));
|
let engine = Arc::new(try!(spec.to_engine()));
|
||||||
|
{
|
||||||
|
let mut state_db = JournalDB::new_with_arc(db.clone());
|
||||||
if engine.spec().ensure_db_good(&mut state_db) {
|
if engine.spec().ensure_db_good(&mut state_db) {
|
||||||
state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB");
|
state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB");
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
let state_db = JournalDB::new_with_arc(db);
|
||||||
|
|
||||||
// chain.write().unwrap().ensure_good(&state_db);
|
let client = Arc::new(Client {
|
||||||
|
|
||||||
Ok(Client {
|
|
||||||
chain: chain,
|
chain: chain,
|
||||||
engine: engine.clone(),
|
engine: engine.clone(),
|
||||||
state_db: state_db,
|
state_db: state_db,
|
||||||
queue: BlockQueue::new(engine, message_channel),
|
block_queue: RwLock::new(BlockQueue::new(engine, message_channel)),
|
||||||
report: Default::default(),
|
db_queue: RwLock::new(DbQueue::new()),
|
||||||
})
|
report: RwLock::new(Default::default()),
|
||||||
|
uncommited_states: RwLock::new(HashMap::new()),
|
||||||
|
import_lock: Mutex::new(()),
|
||||||
|
});
|
||||||
|
|
||||||
|
let weak = Arc::downgrade(&client);
|
||||||
|
client.db_queue.read().unwrap().start(weak);
|
||||||
|
Ok(client)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This is triggered by a message coming from a block queue when the block is ready for insertion
|
/// This is triggered by a message coming from a block queue when the block is ready for insertion
|
||||||
pub fn import_verified_blocks(&mut self) {
|
pub fn import_verified_blocks(&self, _io: &IoChannel<NetSyncMessage>) {
|
||||||
|
|
||||||
let mut bad = HashSet::new();
|
let mut bad = HashSet::new();
|
||||||
let blocks = self.queue.drain(128);
|
let _import_lock = self.import_lock.lock();
|
||||||
|
let blocks = self.block_queue.write().unwrap().drain(128);
|
||||||
if blocks.is_empty() {
|
if blocks.is_empty() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
for block in blocks {
|
for block in blocks {
|
||||||
if bad.contains(&block.header.parent_hash) {
|
if bad.contains(&block.header.parent_hash) {
|
||||||
self.queue.mark_as_bad(&block.header.hash());
|
self.block_queue.write().unwrap().mark_as_bad(&block.header.hash());
|
||||||
bad.insert(block.header.hash());
|
bad.insert(block.header.hash());
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -202,7 +217,7 @@ impl Client {
|
|||||||
let header = &block.header;
|
let header = &block.header;
|
||||||
if let Err(e) = verify_block_family(&header, &block.bytes, self.engine.deref().deref(), self.chain.read().unwrap().deref()) {
|
if let Err(e) = verify_block_family(&header, &block.bytes, self.engine.deref().deref(), self.chain.read().unwrap().deref()) {
|
||||||
warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
|
warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
|
||||||
self.queue.mark_as_bad(&header.hash());
|
self.block_queue.write().unwrap().mark_as_bad(&header.hash());
|
||||||
bad.insert(block.header.hash());
|
bad.insert(block.header.hash());
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
@ -210,7 +225,7 @@ impl Client {
|
|||||||
Some(p) => p,
|
Some(p) => p,
|
||||||
None => {
|
None => {
|
||||||
warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash);
|
warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash);
|
||||||
self.queue.mark_as_bad(&header.hash());
|
self.block_queue.write().unwrap().mark_as_bad(&header.hash());
|
||||||
bad.insert(block.header.hash());
|
bad.insert(block.header.hash());
|
||||||
return;
|
return;
|
||||||
},
|
},
|
||||||
@ -228,18 +243,23 @@ impl Client {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let result = match enact_verified(&block, self.engine.deref().deref(), self.state_db.clone(), &parent, &last_hashes) {
|
let db = match self.uncommited_states.read().unwrap().get(&header.parent_hash) {
|
||||||
|
Some(db) => db.clone(),
|
||||||
|
None => self.state_db.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let result = match enact_verified(&block, self.engine.deref().deref(), db, &parent, &last_hashes) {
|
||||||
Ok(b) => b,
|
Ok(b) => b,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
|
warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
|
||||||
bad.insert(block.header.hash());
|
bad.insert(block.header.hash());
|
||||||
self.queue.mark_as_bad(&header.hash());
|
self.block_queue.write().unwrap().mark_as_bad(&header.hash());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if let Err(e) = verify_block_final(&header, result.block().header()) {
|
if let Err(e) = verify_block_final(&header, result.block().header()) {
|
||||||
warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
|
warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
|
||||||
self.queue.mark_as_bad(&header.hash());
|
self.block_queue.write().unwrap().mark_as_bad(&header.hash());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -252,11 +272,25 @@ impl Client {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.report.accrue_block(&block);
|
/*
|
||||||
|
let db = result.drain();
|
||||||
|
self.uncommited_states.write().unwrap().insert(header.hash(), db.clone());
|
||||||
|
self.db_queue.write().unwrap().queue(StateDBCommit {
|
||||||
|
now: header.number(),
|
||||||
|
hash: header.hash().clone(),
|
||||||
|
end: ancient.map(|n|(n, self.chain.read().unwrap().block_hash(n).unwrap())),
|
||||||
|
db: db,
|
||||||
|
});*/
|
||||||
|
self.report.write().unwrap().accrue_block(&block);
|
||||||
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
|
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Clear cached state overlay
|
||||||
|
pub fn clear_state(&self, hash: &H256) {
|
||||||
|
self.uncommited_states.write().unwrap().remove(hash);
|
||||||
|
}
|
||||||
|
|
||||||
/// Get info on the cache.
|
/// Get info on the cache.
|
||||||
pub fn cache_info(&self) -> CacheSize {
|
pub fn cache_info(&self) -> CacheSize {
|
||||||
self.chain.read().unwrap().cache_size()
|
self.chain.read().unwrap().cache_size()
|
||||||
@ -264,7 +298,7 @@ impl Client {
|
|||||||
|
|
||||||
/// Get the report.
|
/// Get the report.
|
||||||
pub fn report(&self) -> ClientReport {
|
pub fn report(&self) -> ClientReport {
|
||||||
self.report.clone()
|
self.report.read().unwrap().clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Tick the client.
|
/// Tick the client.
|
||||||
@ -327,12 +361,12 @@ impl BlockChainClient for Client {
|
|||||||
unimplemented!();
|
unimplemented!();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn import_block(&mut self, bytes: Bytes) -> ImportResult {
|
fn import_block(&self, bytes: Bytes) -> ImportResult {
|
||||||
let header = BlockView::new(&bytes).header();
|
let header = BlockView::new(&bytes).header();
|
||||||
if self.chain.read().unwrap().is_known(&header.hash()) {
|
if self.chain.read().unwrap().is_known(&header.hash()) {
|
||||||
return Err(ImportError::AlreadyInChain);
|
return Err(ImportError::AlreadyInChain);
|
||||||
}
|
}
|
||||||
self.queue.import_block(bytes)
|
self.block_queue.write().unwrap().import_block(bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn queue_status(&self) -> BlockQueueStatus {
|
fn queue_status(&self) -> BlockQueueStatus {
|
||||||
@ -341,7 +375,8 @@ impl BlockChainClient for Client {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn clear_queue(&mut self) {
|
fn clear_queue(&self) {
|
||||||
|
self.block_queue.write().unwrap().clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn chain_info(&self) -> BlockChainInfo {
|
fn chain_info(&self) -> BlockChainInfo {
|
||||||
|
@ -150,5 +150,6 @@ pub mod block;
|
|||||||
/// TODO [arkpar] Please document me
|
/// TODO [arkpar] Please document me
|
||||||
pub mod verification;
|
pub mod verification;
|
||||||
/// TODO [debris] Please document me
|
/// TODO [debris] Please document me
|
||||||
pub mod queue;
|
pub mod db_queue;
|
||||||
|
pub mod block_queue;
|
||||||
pub mod ethereum;
|
pub mod ethereum;
|
||||||
|
@ -5,10 +5,22 @@ use error::*;
|
|||||||
use std::env;
|
use std::env;
|
||||||
use client::Client;
|
use client::Client;
|
||||||
|
|
||||||
|
/// Message type for external and internal events
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub enum SyncMessage {
|
||||||
|
/// New block has been imported into the blockchain
|
||||||
|
NewChainBlock(Bytes), //TODO: use Cow
|
||||||
|
/// A block is ready
|
||||||
|
BlockVerified,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// TODO [arkpar] Please document me
|
||||||
|
pub type NetSyncMessage = NetworkIoMessage<SyncMessage>;
|
||||||
|
|
||||||
/// Client service setup. Creates and registers client and network services with the IO subsystem.
|
/// Client service setup. Creates and registers client and network services with the IO subsystem.
|
||||||
pub struct ClientService {
|
pub struct ClientService {
|
||||||
net_service: NetworkService<SyncMessage>,
|
net_service: NetworkService<SyncMessage>,
|
||||||
client: Arc<RwLock<Client>>,
|
client: Arc<Client>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ClientService {
|
impl ClientService {
|
||||||
@ -20,7 +32,7 @@ impl ClientService {
|
|||||||
let mut dir = env::home_dir().unwrap();
|
let mut dir = env::home_dir().unwrap();
|
||||||
dir.push(".parity");
|
dir.push(".parity");
|
||||||
dir.push(H64::from(spec.genesis_header().hash()).hex());
|
dir.push(H64::from(spec.genesis_header().hash()).hex());
|
||||||
let client = Arc::new(RwLock::new(try!(Client::new(spec, &dir, net_service.io().channel()))));
|
let client = try!(Client::new(spec, &dir, net_service.io().channel()));
|
||||||
EthSync::register(&mut net_service, client.clone());
|
EthSync::register(&mut net_service, client.clone());
|
||||||
let client_io = Arc::new(ClientIoHandler {
|
let client_io = Arc::new(ClientIoHandler {
|
||||||
client: client.clone()
|
client: client.clone()
|
||||||
@ -39,14 +51,14 @@ impl ClientService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// TODO [arkpar] Please document me
|
/// TODO [arkpar] Please document me
|
||||||
pub fn client(&self) -> Arc<RwLock<Client>> {
|
pub fn client(&self) -> Arc<Client> {
|
||||||
self.client.clone()
|
self.client.clone()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// IO interface for the Client handler
|
/// IO interface for the Client handler
|
||||||
struct ClientIoHandler {
|
struct ClientIoHandler {
|
||||||
client: Arc<RwLock<Client>>
|
client: Arc<Client>
|
||||||
}
|
}
|
||||||
|
|
||||||
const CLIENT_TICK_TIMER: TimerToken = 0;
|
const CLIENT_TICK_TIMER: TimerToken = 0;
|
||||||
@ -59,16 +71,16 @@ impl IoHandler<NetSyncMessage> for ClientIoHandler {
|
|||||||
|
|
||||||
fn timeout(&self, _io: &IoContext<NetSyncMessage>, timer: TimerToken) {
|
fn timeout(&self, _io: &IoContext<NetSyncMessage>, timer: TimerToken) {
|
||||||
if timer == CLIENT_TICK_TIMER {
|
if timer == CLIENT_TICK_TIMER {
|
||||||
self.client.read().unwrap().tick();
|
self.client.tick();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn message(&self, _io: &IoContext<NetSyncMessage>, net_message: &NetSyncMessage) {
|
fn message(&self, io: &IoContext<NetSyncMessage>, net_message: &NetSyncMessage) {
|
||||||
match net_message {
|
match net_message {
|
||||||
&UserMessage(ref message) => {
|
&UserMessage(ref message) => {
|
||||||
match message {
|
match message {
|
||||||
&SyncMessage::BlockVerified => {
|
&SyncMessage::BlockVerified => {
|
||||||
self.client.write().unwrap().import_verified_blocks();
|
self.client.import_verified_blocks(&io.channel());
|
||||||
},
|
},
|
||||||
_ => {}, // ignore other messages
|
_ => {}, // ignore other messages
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
use client::BlockChainClient;
|
use client::BlockChainClient;
|
||||||
use util::{NetworkContext, PeerId, PacketId,};
|
use util::{NetworkContext, PeerId, PacketId,};
|
||||||
use util::error::UtilError;
|
use util::error::UtilError;
|
||||||
use sync::SyncMessage;
|
use service::SyncMessage;
|
||||||
|
|
||||||
/// IO interface for the syning handler.
|
/// IO interface for the syning handler.
|
||||||
/// Provides peer connection management and an interface to the blockchain client.
|
/// Provides peer connection management and an interface to the blockchain client.
|
||||||
@ -14,7 +14,7 @@ pub trait SyncIo {
|
|||||||
/// Send a packet to a peer.
|
/// Send a packet to a peer.
|
||||||
fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError>;
|
fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError>;
|
||||||
/// Get the blockchain
|
/// Get the blockchain
|
||||||
fn chain<'s>(&'s mut self) -> &'s mut BlockChainClient;
|
fn chain<'s>(&'s self) -> &'s BlockChainClient;
|
||||||
/// Returns peer client identifier string
|
/// Returns peer client identifier string
|
||||||
fn peer_info(&self, peer_id: PeerId) -> String {
|
fn peer_info(&self, peer_id: PeerId) -> String {
|
||||||
peer_id.to_string()
|
peer_id.to_string()
|
||||||
@ -24,12 +24,12 @@ pub trait SyncIo {
|
|||||||
/// Wraps `NetworkContext` and the blockchain client
|
/// Wraps `NetworkContext` and the blockchain client
|
||||||
pub struct NetSyncIo<'s, 'h> where 'h: 's {
|
pub struct NetSyncIo<'s, 'h> where 'h: 's {
|
||||||
network: &'s NetworkContext<'h, SyncMessage>,
|
network: &'s NetworkContext<'h, SyncMessage>,
|
||||||
chain: &'s mut BlockChainClient
|
chain: &'s BlockChainClient
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'s, 'h> NetSyncIo<'s, 'h> {
|
impl<'s, 'h> NetSyncIo<'s, 'h> {
|
||||||
/// Creates a new instance from the `NetworkContext` and the blockchain client reference.
|
/// Creates a new instance from the `NetworkContext` and the blockchain client reference.
|
||||||
pub fn new(network: &'s NetworkContext<'h, SyncMessage>, chain: &'s mut BlockChainClient) -> NetSyncIo<'s, 'h> {
|
pub fn new(network: &'s NetworkContext<'h, SyncMessage>, chain: &'s BlockChainClient) -> NetSyncIo<'s, 'h> {
|
||||||
NetSyncIo {
|
NetSyncIo {
|
||||||
network: network,
|
network: network,
|
||||||
chain: chain,
|
chain: chain,
|
||||||
@ -50,7 +50,7 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
|
|||||||
self.network.send(peer_id, packet_id, data)
|
self.network.send(peer_id, packet_id, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn chain<'a>(&'a mut self) -> &'a mut BlockChainClient {
|
fn chain<'a>(&'a self) -> &'a BlockChainClient {
|
||||||
self.chain
|
self.chain
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -25,9 +25,10 @@
|
|||||||
use std::ops::*;
|
use std::ops::*;
|
||||||
use std::sync::*;
|
use std::sync::*;
|
||||||
use client::Client;
|
use client::Client;
|
||||||
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, NetworkIoMessage};
|
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId};
|
||||||
use sync::chain::ChainSync;
|
use sync::chain::ChainSync;
|
||||||
use util::{Bytes, TimerToken};
|
use util::TimerToken;
|
||||||
|
use service::SyncMessage;
|
||||||
use sync::io::NetSyncIo;
|
use sync::io::NetSyncIo;
|
||||||
|
|
||||||
mod chain;
|
mod chain;
|
||||||
@ -39,22 +40,10 @@ mod tests;
|
|||||||
|
|
||||||
const SYNC_TIMER: usize = 0;
|
const SYNC_TIMER: usize = 0;
|
||||||
|
|
||||||
/// Message type for external events
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub enum SyncMessage {
|
|
||||||
/// New block has been imported into the blockchain
|
|
||||||
NewChainBlock(Bytes), //TODO: use Cow
|
|
||||||
/// A block is ready
|
|
||||||
BlockVerified,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// TODO [arkpar] Please document me
|
|
||||||
pub type NetSyncMessage = NetworkIoMessage<SyncMessage>;
|
|
||||||
|
|
||||||
/// Ethereum network protocol handler
|
/// Ethereum network protocol handler
|
||||||
pub struct EthSync {
|
pub struct EthSync {
|
||||||
/// Shared blockchain client. TODO: this should evetually become an IPC endpoint
|
/// Shared blockchain client. TODO: this should evetually become an IPC endpoint
|
||||||
chain: Arc<RwLock<Client>>,
|
chain: Arc<Client>,
|
||||||
/// Sync strategy
|
/// Sync strategy
|
||||||
sync: RwLock<ChainSync>
|
sync: RwLock<ChainSync>
|
||||||
}
|
}
|
||||||
@ -63,7 +52,7 @@ pub use self::chain::SyncStatus;
|
|||||||
|
|
||||||
impl EthSync {
|
impl EthSync {
|
||||||
/// Creates and register protocol with the network service
|
/// Creates and register protocol with the network service
|
||||||
pub fn register(service: &mut NetworkService<SyncMessage>, chain: Arc<RwLock<Client>>) {
|
pub fn register(service: &mut NetworkService<SyncMessage>, chain: Arc<Client>) {
|
||||||
let sync = Arc::new(EthSync {
|
let sync = Arc::new(EthSync {
|
||||||
chain: chain,
|
chain: chain,
|
||||||
sync: RwLock::new(ChainSync::new()),
|
sync: RwLock::new(ChainSync::new()),
|
||||||
@ -78,12 +67,12 @@ impl EthSync {
|
|||||||
|
|
||||||
/// Stop sync
|
/// Stop sync
|
||||||
pub fn stop(&mut self, io: &mut NetworkContext<SyncMessage>) {
|
pub fn stop(&mut self, io: &mut NetworkContext<SyncMessage>) {
|
||||||
self.sync.write().unwrap().abort(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()));
|
self.sync.write().unwrap().abort(&mut NetSyncIo::new(io, self.chain.deref()));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Restart sync
|
/// Restart sync
|
||||||
pub fn restart(&mut self, io: &mut NetworkContext<SyncMessage>) {
|
pub fn restart(&mut self, io: &mut NetworkContext<SyncMessage>) {
|
||||||
self.sync.write().unwrap().restart(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()));
|
self.sync.write().unwrap().restart(&mut NetSyncIo::new(io, self.chain.deref()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -93,20 +82,20 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn read(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
|
fn read(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
|
||||||
self.sync.write().unwrap().on_packet(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()) , *peer, packet_id, data);
|
self.sync.write().unwrap().on_packet(&mut NetSyncIo::new(io, self.chain.deref()) , *peer, packet_id, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn connected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) {
|
fn connected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) {
|
||||||
self.sync.write().unwrap().on_peer_connected(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()), *peer);
|
self.sync.write().unwrap().on_peer_connected(&mut NetSyncIo::new(io, self.chain.deref()), *peer);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn disconnected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) {
|
fn disconnected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) {
|
||||||
self.sync.write().unwrap().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()), *peer);
|
self.sync.write().unwrap().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn timeout(&self, io: &NetworkContext<SyncMessage>, timer: TimerToken) {
|
fn timeout(&self, io: &NetworkContext<SyncMessage>, timer: TimerToken) {
|
||||||
if timer == SYNC_TIMER {
|
if timer == SYNC_TIMER {
|
||||||
self.sync.write().unwrap().maintain_sync(&mut NetSyncIo::new(io, self.chain.write().unwrap().deref_mut()));
|
self.sync.write().unwrap().maintain_sync(&mut NetSyncIo::new(io, self.chain.deref()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -102,6 +102,11 @@ impl<Message> IoContext<Message> where Message: Send + Clone + 'static {
|
|||||||
pub fn message(&self, message: Message) {
|
pub fn message(&self, message: Message) {
|
||||||
self.channel.send(message).expect("Error seding message");
|
self.channel.send(message).expect("Error seding message");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get message channel
|
||||||
|
pub fn channel(&self) -> IoChannel<Message> {
|
||||||
|
self.channel.clone()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -34,6 +34,16 @@ impl JournalDB {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create a new instance given a shared `backing` database.
|
||||||
|
pub fn new_with_arc(backing: Arc<DB>) -> JournalDB {
|
||||||
|
JournalDB {
|
||||||
|
forward: OverlayDB::new_with_arc(backing.clone()),
|
||||||
|
backing: backing,
|
||||||
|
inserts: vec![],
|
||||||
|
removes: vec![],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Create a new instance with an anonymous temporary database.
|
/// Create a new instance with an anonymous temporary database.
|
||||||
pub fn new_temp() -> JournalDB {
|
pub fn new_temp() -> JournalDB {
|
||||||
let mut dir = env::temp_dir();
|
let mut dir = env::temp_dir();
|
||||||
|
@ -675,13 +675,13 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
if let Some(connection) = self.connections.read().unwrap().get(stream).map(|c| c.clone()) {
|
if let Some(connection) = self.connections.read().unwrap().get(stream).map(|c| c.clone()) {
|
||||||
match connection.lock().unwrap().deref() {
|
match connection.lock().unwrap().deref() {
|
||||||
&ConnectionEntry::Handshake(ref h) => h.register_socket(reg, event_loop).expect("Error registering socket"),
|
&ConnectionEntry::Handshake(ref h) => h.register_socket(reg, event_loop).expect("Error registering socket"),
|
||||||
_ => warn!("Unexpected session stream registration")
|
&ConnectionEntry::Session(_) => warn!("Unexpected session stream registration")
|
||||||
}
|
}
|
||||||
} else { warn!("Unexpected stream registration")}
|
} else {} // expired
|
||||||
}
|
}
|
||||||
NODETABLE_RECEIVE => event_loop.register(self.udp_socket.lock().unwrap().deref(), Token(NODETABLE_RECEIVE), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
|
NODETABLE_RECEIVE => event_loop.register(self.udp_socket.lock().unwrap().deref(), Token(NODETABLE_RECEIVE), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
|
||||||
TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
|
TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
|
||||||
_ => warn!("Unexpected stream regitration")
|
_ => warn!("Unexpected stream registration")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -693,7 +693,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
&ConnectionEntry::Handshake(ref h) => h.update_socket(reg, event_loop).expect("Error updating socket"),
|
&ConnectionEntry::Handshake(ref h) => h.update_socket(reg, event_loop).expect("Error updating socket"),
|
||||||
&ConnectionEntry::Session(ref s) => s.update_socket(reg, event_loop).expect("Error updating socket"),
|
&ConnectionEntry::Session(ref s) => s.update_socket(reg, event_loop).expect("Error updating socket"),
|
||||||
}
|
}
|
||||||
} else { warn!("Unexpected stream update")}
|
} else {} // expired
|
||||||
}
|
}
|
||||||
NODETABLE_RECEIVE => event_loop.reregister(self.udp_socket.lock().unwrap().deref(), Token(NODETABLE_RECEIVE), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
|
NODETABLE_RECEIVE => event_loop.reregister(self.udp_socket.lock().unwrap().deref(), Token(NODETABLE_RECEIVE), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
|
||||||
TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
|
TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
|
||||||
|
Loading…
Reference in New Issue
Block a user