Minor optimizations
This commit is contained in:
parent
81bb86d0ed
commit
9bcb720f1f
@ -14,6 +14,7 @@ use ethcore::client::*;
|
|||||||
use ethcore::service::{ClientService, NetSyncMessage};
|
use ethcore::service::{ClientService, NetSyncMessage};
|
||||||
use ethcore::ethereum;
|
use ethcore::ethereum;
|
||||||
use ethcore::blockchain::CacheSize;
|
use ethcore::blockchain::CacheSize;
|
||||||
|
use ethcore::sync::EthSync;
|
||||||
|
|
||||||
fn setup_log() {
|
fn setup_log() {
|
||||||
let mut builder = LogBuilder::new();
|
let mut builder = LogBuilder::new();
|
||||||
@ -30,7 +31,7 @@ fn main() {
|
|||||||
setup_log();
|
setup_log();
|
||||||
let spec = ethereum::new_frontier();
|
let spec = ethereum::new_frontier();
|
||||||
let mut service = ClientService::start(spec).unwrap();
|
let mut service = ClientService::start(spec).unwrap();
|
||||||
let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default() });
|
let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default(), sync: service.sync() });
|
||||||
service.io().register_handler(io_handler).expect("Error registering IO handler");
|
service.io().register_handler(io_handler).expect("Error registering IO handler");
|
||||||
|
|
||||||
let exit = Arc::new(Condvar::new());
|
let exit = Arc::new(Condvar::new());
|
||||||
@ -60,22 +61,29 @@ impl Default for Informant {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Informant {
|
impl Informant {
|
||||||
pub fn tick(&self, client: &Client) {
|
pub fn tick(&self, client: &Client, sync: &EthSync) {
|
||||||
// 5 seconds betwen calls. TODO: calculate this properly.
|
// 5 seconds betwen calls. TODO: calculate this properly.
|
||||||
let dur = 5usize;
|
let dur = 5usize;
|
||||||
|
|
||||||
let chain_info = client.chain_info();
|
let chain_info = client.chain_info();
|
||||||
|
let queue_info = client.queue_info();
|
||||||
let cache_info = client.cache_info();
|
let cache_info = client.cache_info();
|
||||||
let report = client.report();
|
let report = client.report();
|
||||||
|
let sync_info = sync.status();
|
||||||
|
|
||||||
if let (_, &Some(ref last_cache_info), &Some(ref last_report)) = (self.chain_info.read().unwrap().deref(), self.cache_info.read().unwrap().deref(), self.report.read().unwrap().deref()) {
|
if let (_, &Some(ref last_cache_info), &Some(ref last_report)) = (self.chain_info.read().unwrap().deref(), self.cache_info.read().unwrap().deref(), self.report.read().unwrap().deref()) {
|
||||||
println!("[ {} {} ]---[ {} blk/s | {} tx/s | {} gas/s //···{}···// {} ({}) bl {} ({}) ex ]",
|
println!("[ {} {} ]---[ {} blk/s | {} tx/s | {} gas/s //··· {}/{} peers, {} downloaded, {} queued ···// {} ({}) bl {} ({}) ex ]",
|
||||||
chain_info.best_block_number,
|
chain_info.best_block_number,
|
||||||
chain_info.best_block_hash,
|
chain_info.best_block_hash,
|
||||||
(report.blocks_imported - last_report.blocks_imported) / dur,
|
(report.blocks_imported - last_report.blocks_imported) / dur,
|
||||||
(report.transactions_applied - last_report.transactions_applied) / dur,
|
(report.transactions_applied - last_report.transactions_applied) / dur,
|
||||||
(report.gas_processed - last_report.gas_processed) / From::from(dur),
|
(report.gas_processed - last_report.gas_processed) / From::from(dur),
|
||||||
0, // TODO: peers
|
|
||||||
|
sync_info.num_active_peers,
|
||||||
|
sync_info.num_peers,
|
||||||
|
sync_info.blocks_received,
|
||||||
|
queue_info.queue_size,
|
||||||
|
|
||||||
cache_info.blocks,
|
cache_info.blocks,
|
||||||
cache_info.blocks as isize - last_cache_info.blocks as isize,
|
cache_info.blocks as isize - last_cache_info.blocks as isize,
|
||||||
cache_info.block_details,
|
cache_info.block_details,
|
||||||
@ -93,6 +101,7 @@ const INFO_TIMER: TimerToken = 0;
|
|||||||
|
|
||||||
struct ClientIoHandler {
|
struct ClientIoHandler {
|
||||||
client: Arc<Client>,
|
client: Arc<Client>,
|
||||||
|
sync: Arc<EthSync>,
|
||||||
info: Informant,
|
info: Informant,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -103,7 +112,7 @@ impl IoHandler<NetSyncMessage> for ClientIoHandler {
|
|||||||
|
|
||||||
fn timeout(&self, _io: &IoContext<NetSyncMessage>, timer: TimerToken) {
|
fn timeout(&self, _io: &IoContext<NetSyncMessage>, timer: TimerToken) {
|
||||||
if INFO_TIMER == timer {
|
if INFO_TIMER == timer {
|
||||||
self.info.tick(&self.client);
|
self.info.tick(&self.client, &self.sync);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -10,6 +10,15 @@ use views::*;
|
|||||||
use header::*;
|
use header::*;
|
||||||
use service::*;
|
use service::*;
|
||||||
|
|
||||||
|
/// Block queue status
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct BlockQueueInfo {
|
||||||
|
/// Indicates that queue is full
|
||||||
|
pub full: bool,
|
||||||
|
/// Number of queued blocks
|
||||||
|
pub queue_size: usize,
|
||||||
|
}
|
||||||
|
|
||||||
/// A queue of blocks. Sits between network or other I/O and the BlockChain.
|
/// A queue of blocks. Sits between network or other I/O and the BlockChain.
|
||||||
/// Sorts them ready for blockchain insertion.
|
/// Sorts them ready for blockchain insertion.
|
||||||
pub struct BlockQueue {
|
pub struct BlockQueue {
|
||||||
@ -65,14 +74,15 @@ impl BlockQueue {
|
|||||||
let deleting = Arc::new(AtomicBool::new(false));
|
let deleting = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
let mut verifiers: Vec<JoinHandle<()>> = Vec::new();
|
let mut verifiers: Vec<JoinHandle<()>> = Vec::new();
|
||||||
let thread_count = max(::num_cpus::get(), 2) - 1;
|
let thread_count = max(::num_cpus::get(), 3) - 2;
|
||||||
for _ in 0..thread_count {
|
for i in 0..thread_count {
|
||||||
let verification = verification.clone();
|
let verification = verification.clone();
|
||||||
let engine = engine.clone();
|
let engine = engine.clone();
|
||||||
let more_to_verify = more_to_verify.clone();
|
let more_to_verify = more_to_verify.clone();
|
||||||
let ready_signal = ready_signal.clone();
|
let ready_signal = ready_signal.clone();
|
||||||
let deleting = deleting.clone();
|
let deleting = deleting.clone();
|
||||||
verifiers.push(thread::spawn(move || BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting)));
|
verifiers.push(thread::Builder::new().name(format!("Verifier #{}", i)).spawn(move || BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting))
|
||||||
|
.expect("Error starting block verification thread"));
|
||||||
}
|
}
|
||||||
BlockQueue {
|
BlockQueue {
|
||||||
engine: engine,
|
engine: engine,
|
||||||
@ -206,7 +216,7 @@ impl BlockQueue {
|
|||||||
verification.verified = new_verified;
|
verification.verified = new_verified;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// TODO [arkpar] Please document me
|
/// Removes up to `max` verified blocks from the queue
|
||||||
pub fn drain(&mut self, max: usize) -> Vec<PreVerifiedBlock> {
|
pub fn drain(&mut self, max: usize) -> Vec<PreVerifiedBlock> {
|
||||||
let mut verification = self.verification.lock().unwrap();
|
let mut verification = self.verification.lock().unwrap();
|
||||||
let count = min(max, verification.verified.len());
|
let count = min(max, verification.verified.len());
|
||||||
@ -217,8 +227,19 @@ impl BlockQueue {
|
|||||||
result.push(block);
|
result.push(block);
|
||||||
}
|
}
|
||||||
self.ready_signal.reset();
|
self.ready_signal.reset();
|
||||||
|
if !verification.verified.is_empty() {
|
||||||
|
self.ready_signal.set();
|
||||||
|
}
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get queue status.
|
||||||
|
pub fn queue_info(&self) -> BlockQueueInfo {
|
||||||
|
BlockQueueInfo {
|
||||||
|
full: false,
|
||||||
|
queue_size: self.verification.lock().unwrap().unverified.len(),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for BlockQueue {
|
impl Drop for BlockQueue {
|
||||||
|
@ -342,19 +342,19 @@ impl BlockChain {
|
|||||||
Some(h) => h,
|
Some(h) => h,
|
||||||
None => return None,
|
None => return None,
|
||||||
};
|
};
|
||||||
Some(self._tree_route((from_details, from), (to_details, to)))
|
Some(self._tree_route((&from_details, &from), (&to_details, &to)))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Similar to `tree_route` function, but can be used to return a route
|
/// Similar to `tree_route` function, but can be used to return a route
|
||||||
/// between blocks which may not be in database yet.
|
/// between blocks which may not be in database yet.
|
||||||
fn _tree_route(&self, from: (BlockDetails, H256), to: (BlockDetails, H256)) -> TreeRoute {
|
fn _tree_route(&self, from: (&BlockDetails, &H256), to: (&BlockDetails, &H256)) -> TreeRoute {
|
||||||
let mut from_branch = vec![];
|
let mut from_branch = vec![];
|
||||||
let mut to_branch = vec![];
|
let mut to_branch = vec![];
|
||||||
|
|
||||||
let mut from_details = from.0;
|
let mut from_details = from.0.clone();
|
||||||
let mut to_details = to.0;
|
let mut to_details = to.0.clone();
|
||||||
let mut current_from = from.1;
|
let mut current_from = from.1.clone();
|
||||||
let mut current_to = to.1;
|
let mut current_to = to.1.clone();
|
||||||
|
|
||||||
// reset from && to to the same level
|
// reset from && to to the same level
|
||||||
while from_details.number > to_details.number {
|
while from_details.number > to_details.number {
|
||||||
@ -409,7 +409,7 @@ impl BlockChain {
|
|||||||
|
|
||||||
// store block in db
|
// store block in db
|
||||||
self.blocks_db.put(&hash, &bytes).unwrap();
|
self.blocks_db.put(&hash, &bytes).unwrap();
|
||||||
let (batch, new_best) = self.block_to_extras_insert_batch(bytes);
|
let (batch, new_best, details) = self.block_to_extras_insert_batch(bytes);
|
||||||
|
|
||||||
// update best block
|
// update best block
|
||||||
let mut best_block = self.best_block.write().unwrap();
|
let mut best_block = self.best_block.write().unwrap();
|
||||||
@ -420,6 +420,8 @@ impl BlockChain {
|
|||||||
// update caches
|
// update caches
|
||||||
let mut write = self.block_details.write().unwrap();
|
let mut write = self.block_details.write().unwrap();
|
||||||
write.remove(&header.parent_hash());
|
write.remove(&header.parent_hash());
|
||||||
|
write.insert(hash.clone(), details);
|
||||||
|
self.note_used(CacheID::Block(hash));
|
||||||
|
|
||||||
// update extras database
|
// update extras database
|
||||||
self.extras_db.write(batch).unwrap();
|
self.extras_db.write(batch).unwrap();
|
||||||
@ -427,7 +429,7 @@ impl BlockChain {
|
|||||||
|
|
||||||
/// Transforms block into WriteBatch that may be written into database
|
/// Transforms block into WriteBatch that may be written into database
|
||||||
/// Additionally, if it's new best block it returns new best block object.
|
/// Additionally, if it's new best block it returns new best block object.
|
||||||
fn block_to_extras_insert_batch(&self, bytes: &[u8]) -> (WriteBatch, Option<BestBlock>) {
|
fn block_to_extras_insert_batch(&self, bytes: &[u8]) -> (WriteBatch, Option<BestBlock>, BlockDetails) {
|
||||||
// create views onto rlp
|
// create views onto rlp
|
||||||
let block = BlockView::new(bytes);
|
let block = BlockView::new(bytes);
|
||||||
let header = block.header_view();
|
let header = block.header_view();
|
||||||
@ -459,7 +461,7 @@ impl BlockChain {
|
|||||||
|
|
||||||
// if it's not new best block, just return
|
// if it's not new best block, just return
|
||||||
if !is_new_best {
|
if !is_new_best {
|
||||||
return (batch, None);
|
return (batch, None, details);
|
||||||
}
|
}
|
||||||
|
|
||||||
// if its new best block we need to make sure that all ancestors
|
// if its new best block we need to make sure that all ancestors
|
||||||
@ -467,7 +469,7 @@ impl BlockChain {
|
|||||||
// find the route between old best block and the new one
|
// find the route between old best block and the new one
|
||||||
let best_hash = self.best_block_hash();
|
let best_hash = self.best_block_hash();
|
||||||
let best_details = self.block_details(&best_hash).expect("best block hash is invalid!");
|
let best_details = self.block_details(&best_hash).expect("best block hash is invalid!");
|
||||||
let route = self._tree_route((best_details, best_hash), (details, hash.clone()));
|
let route = self._tree_route((&best_details, &best_hash), (&details, &hash));
|
||||||
|
|
||||||
match route.blocks.len() {
|
match route.blocks.len() {
|
||||||
// its our parent
|
// its our parent
|
||||||
@ -494,7 +496,7 @@ impl BlockChain {
|
|||||||
total_difficulty: total_difficulty
|
total_difficulty: total_difficulty
|
||||||
};
|
};
|
||||||
|
|
||||||
(batch, Some(best_block))
|
(batch, Some(best_block), details)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns true if transaction is known.
|
/// Returns true if transaction is known.
|
||||||
|
@ -6,8 +6,8 @@ use error::*;
|
|||||||
use header::BlockNumber;
|
use header::BlockNumber;
|
||||||
use spec::Spec;
|
use spec::Spec;
|
||||||
use engine::Engine;
|
use engine::Engine;
|
||||||
use block_queue::BlockQueue;
|
use block_queue::{BlockQueue, BlockQueueInfo};
|
||||||
use db_queue::{DbQueue, StateDBCommit};
|
use db_queue::{DbQueue};
|
||||||
use service::NetSyncMessage;
|
use service::NetSyncMessage;
|
||||||
use env_info::LastHashes;
|
use env_info::LastHashes;
|
||||||
use verification::*;
|
use verification::*;
|
||||||
@ -47,13 +47,6 @@ impl fmt::Display for BlockChainInfo {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Block queue status
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct BlockQueueStatus {
|
|
||||||
/// TODO [arkpar] Please document me
|
|
||||||
pub full: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// TODO [arkpar] Please document me
|
/// TODO [arkpar] Please document me
|
||||||
pub type TreeRoute = ::blockchain::TreeRoute;
|
pub type TreeRoute = ::blockchain::TreeRoute;
|
||||||
|
|
||||||
@ -99,7 +92,7 @@ pub trait BlockChainClient : Sync + Send {
|
|||||||
fn import_block(&self, bytes: Bytes) -> ImportResult;
|
fn import_block(&self, bytes: Bytes) -> ImportResult;
|
||||||
|
|
||||||
/// Get block queue information.
|
/// Get block queue information.
|
||||||
fn queue_status(&self) -> BlockQueueStatus;
|
fn queue_info(&self) -> BlockQueueInfo;
|
||||||
|
|
||||||
/// Clear block queue and abort all import activity.
|
/// Clear block queue and abort all import activity.
|
||||||
fn clear_queue(&self);
|
fn clear_queue(&self);
|
||||||
@ -149,8 +142,6 @@ impl Client {
|
|||||||
let mut opts = Options::new();
|
let mut opts = Options::new();
|
||||||
opts.set_max_open_files(256);
|
opts.set_max_open_files(256);
|
||||||
opts.create_if_missing(true);
|
opts.create_if_missing(true);
|
||||||
opts.set_disable_data_sync(true);
|
|
||||||
opts.set_disable_auto_compactions(true);
|
|
||||||
/*opts.set_use_fsync(false);
|
/*opts.set_use_fsync(false);
|
||||||
opts.set_bytes_per_sync(8388608);
|
opts.set_bytes_per_sync(8388608);
|
||||||
opts.set_disable_data_sync(false);
|
opts.set_disable_data_sync(false);
|
||||||
@ -199,7 +190,6 @@ impl Client {
|
|||||||
|
|
||||||
/// This is triggered by a message coming from a block queue when the block is ready for insertion
|
/// This is triggered by a message coming from a block queue when the block is ready for insertion
|
||||||
pub fn import_verified_blocks(&self, _io: &IoChannel<NetSyncMessage>) {
|
pub fn import_verified_blocks(&self, _io: &IoChannel<NetSyncMessage>) {
|
||||||
|
|
||||||
let mut bad = HashSet::new();
|
let mut bad = HashSet::new();
|
||||||
let _import_lock = self.import_lock.lock();
|
let _import_lock = self.import_lock.lock();
|
||||||
let blocks = self.block_queue.write().unwrap().drain(128);
|
let blocks = self.block_queue.write().unwrap().drain(128);
|
||||||
@ -243,11 +233,7 @@ impl Client {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let db = match self.uncommited_states.read().unwrap().get(&header.parent_hash) {
|
let db = self.state_db.clone();
|
||||||
Some(db) => db.clone(),
|
|
||||||
None => self.state_db.clone(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let result = match enact_verified(&block, self.engine.deref().deref(), db, &parent, &last_hashes) {
|
let result = match enact_verified(&block, self.engine.deref().deref(), db, &parent, &last_hashes) {
|
||||||
Ok(b) => b,
|
Ok(b) => b,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@ -272,15 +258,6 @@ impl Client {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
let db = result.drain();
|
|
||||||
self.uncommited_states.write().unwrap().insert(header.hash(), db.clone());
|
|
||||||
self.db_queue.write().unwrap().queue(StateDBCommit {
|
|
||||||
now: header.number(),
|
|
||||||
hash: header.hash().clone(),
|
|
||||||
end: ancient.map(|n|(n, self.chain.read().unwrap().block_hash(n).unwrap())),
|
|
||||||
db: db,
|
|
||||||
});*/
|
|
||||||
self.report.write().unwrap().accrue_block(&block);
|
self.report.write().unwrap().accrue_block(&block);
|
||||||
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
|
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
|
||||||
}
|
}
|
||||||
@ -369,10 +346,8 @@ impl BlockChainClient for Client {
|
|||||||
self.block_queue.write().unwrap().import_block(bytes)
|
self.block_queue.write().unwrap().import_block(bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn queue_status(&self) -> BlockQueueStatus {
|
fn queue_info(&self) -> BlockQueueInfo {
|
||||||
BlockQueueStatus {
|
self.block_queue.read().unwrap().queue_info()
|
||||||
full: false
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn clear_queue(&self) {
|
fn clear_queue(&self) {
|
||||||
|
@ -21,6 +21,7 @@ pub type NetSyncMessage = NetworkIoMessage<SyncMessage>;
|
|||||||
pub struct ClientService {
|
pub struct ClientService {
|
||||||
net_service: NetworkService<SyncMessage>,
|
net_service: NetworkService<SyncMessage>,
|
||||||
client: Arc<Client>,
|
client: Arc<Client>,
|
||||||
|
sync: Arc<EthSync>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ClientService {
|
impl ClientService {
|
||||||
@ -33,7 +34,7 @@ impl ClientService {
|
|||||||
dir.push(".parity");
|
dir.push(".parity");
|
||||||
dir.push(H64::from(spec.genesis_header().hash()).hex());
|
dir.push(H64::from(spec.genesis_header().hash()).hex());
|
||||||
let client = try!(Client::new(spec, &dir, net_service.io().channel()));
|
let client = try!(Client::new(spec, &dir, net_service.io().channel()));
|
||||||
EthSync::register(&mut net_service, client.clone());
|
let sync = EthSync::register(&mut net_service, client.clone());
|
||||||
let client_io = Arc::new(ClientIoHandler {
|
let client_io = Arc::new(ClientIoHandler {
|
||||||
client: client.clone()
|
client: client.clone()
|
||||||
});
|
});
|
||||||
@ -42,6 +43,7 @@ impl ClientService {
|
|||||||
Ok(ClientService {
|
Ok(ClientService {
|
||||||
net_service: net_service,
|
net_service: net_service,
|
||||||
client: client,
|
client: client,
|
||||||
|
sync: sync,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -53,6 +55,12 @@ impl ClientService {
|
|||||||
/// TODO [arkpar] Please document me
|
/// TODO [arkpar] Please document me
|
||||||
pub fn client(&self) -> Arc<Client> {
|
pub fn client(&self) -> Arc<Client> {
|
||||||
self.client.clone()
|
self.client.clone()
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get shared sync handler
|
||||||
|
pub fn sync(&self) -> Arc<EthSync> {
|
||||||
|
self.sync.clone()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,6 +107,10 @@ pub struct SyncStatus {
|
|||||||
pub blocks_total: usize,
|
pub blocks_total: usize,
|
||||||
/// Number of blocks downloaded so far.
|
/// Number of blocks downloaded so far.
|
||||||
pub blocks_received: usize,
|
pub blocks_received: usize,
|
||||||
|
/// Total number of connected peers
|
||||||
|
pub num_peers: usize,
|
||||||
|
/// Total number of active peers
|
||||||
|
pub num_active_peers: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Eq, Debug)]
|
#[derive(PartialEq, Eq, Debug)]
|
||||||
@ -195,8 +199,10 @@ impl ChainSync {
|
|||||||
start_block_number: self.starting_block,
|
start_block_number: self.starting_block,
|
||||||
last_imported_block_number: self.last_imported_block,
|
last_imported_block_number: self.last_imported_block,
|
||||||
highest_block_number: self.highest_block,
|
highest_block_number: self.highest_block,
|
||||||
blocks_total: (self.last_imported_block - self.starting_block) as usize,
|
blocks_received: (self.last_imported_block - self.starting_block) as usize,
|
||||||
blocks_received: (self.highest_block - self.starting_block) as usize,
|
blocks_total: (self.highest_block - self.starting_block) as usize,
|
||||||
|
num_peers: self.peers.len(),
|
||||||
|
num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -544,7 +550,7 @@ impl ChainSync {
|
|||||||
fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId) {
|
fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId) {
|
||||||
self.clear_peer_download(peer_id);
|
self.clear_peer_download(peer_id);
|
||||||
|
|
||||||
if io.chain().queue_status().full {
|
if io.chain().queue_info().full {
|
||||||
self.pause_sync();
|
self.pause_sync();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -971,7 +977,7 @@ impl ChainSync {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Maintain other peers. Send out any new blocks and transactions
|
/// Maintain other peers. Send out any new blocks and transactions
|
||||||
pub fn maintain_sync(&mut self, _io: &mut SyncIo) {
|
pub fn _maintain_sync(&mut self, _io: &mut SyncIo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,7 +27,6 @@ use std::sync::*;
|
|||||||
use client::Client;
|
use client::Client;
|
||||||
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId};
|
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId};
|
||||||
use sync::chain::ChainSync;
|
use sync::chain::ChainSync;
|
||||||
use util::TimerToken;
|
|
||||||
use service::SyncMessage;
|
use service::SyncMessage;
|
||||||
use sync::io::NetSyncIo;
|
use sync::io::NetSyncIo;
|
||||||
|
|
||||||
@ -38,8 +37,6 @@ mod range_collection;
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
|
|
||||||
const SYNC_TIMER: usize = 0;
|
|
||||||
|
|
||||||
/// Ethereum network protocol handler
|
/// Ethereum network protocol handler
|
||||||
pub struct EthSync {
|
pub struct EthSync {
|
||||||
/// Shared blockchain client. TODO: this should evetually become an IPC endpoint
|
/// Shared blockchain client. TODO: this should evetually become an IPC endpoint
|
||||||
@ -52,12 +49,13 @@ pub use self::chain::SyncStatus;
|
|||||||
|
|
||||||
impl EthSync {
|
impl EthSync {
|
||||||
/// Creates and register protocol with the network service
|
/// Creates and register protocol with the network service
|
||||||
pub fn register(service: &mut NetworkService<SyncMessage>, chain: Arc<Client>) {
|
pub fn register(service: &mut NetworkService<SyncMessage>, chain: Arc<Client>) -> Arc<EthSync> {
|
||||||
let sync = Arc::new(EthSync {
|
let sync = Arc::new(EthSync {
|
||||||
chain: chain,
|
chain: chain,
|
||||||
sync: RwLock::new(ChainSync::new()),
|
sync: RwLock::new(ChainSync::new()),
|
||||||
});
|
});
|
||||||
service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler");
|
service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler");
|
||||||
|
sync
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get sync status
|
/// Get sync status
|
||||||
@ -77,8 +75,7 @@ impl EthSync {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl NetworkProtocolHandler<SyncMessage> for EthSync {
|
impl NetworkProtocolHandler<SyncMessage> for EthSync {
|
||||||
fn initialize(&self, io: &NetworkContext<SyncMessage>) {
|
fn initialize(&self, _io: &NetworkContext<SyncMessage>) {
|
||||||
io.register_timer(SYNC_TIMER, 1000).unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
|
fn read(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
|
||||||
@ -92,12 +89,6 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
|
|||||||
fn disconnected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) {
|
fn disconnected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) {
|
||||||
self.sync.write().unwrap().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer);
|
self.sync.write().unwrap().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn timeout(&self, io: &NetworkContext<SyncMessage>, timer: TimerToken) {
|
|
||||||
if timer == SYNC_TIMER {
|
|
||||||
self.sync.write().unwrap().maintain_sync(&mut NetSyncIo::new(io, self.chain.deref()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user