Moved syncing log out of the client (#1670)
This commit is contained in:
committed by
Gav Wood
parent
0cba70fba3
commit
b007770ba8
@@ -28,7 +28,8 @@ pub trait ChainNotify : Send + Sync {
|
||||
_invalid: Vec<H256>,
|
||||
_enacted: Vec<H256>,
|
||||
_retracted: Vec<H256>,
|
||||
_sealed: Vec<H256>) {
|
||||
_sealed: Vec<H256>,
|
||||
_duration: u64) {
|
||||
// does nothing by default
|
||||
}
|
||||
|
||||
|
||||
@@ -20,11 +20,11 @@ use std::sync::{Arc, Weak};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::fmt;
|
||||
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
|
||||
use std::time::{Instant, Duration};
|
||||
use std::time::{Instant};
|
||||
use time::precise_time_ns;
|
||||
|
||||
// util
|
||||
use util::{journaldb, rlp, Bytes, Stream, View, PerfTimer, Itertools, Mutex, RwLock, Colour};
|
||||
use util::{journaldb, rlp, Bytes, Stream, View, PerfTimer, Itertools, Mutex, RwLock};
|
||||
use util::journaldb::JournalDB;
|
||||
use util::rlp::{RlpStream, Rlp, UntrustedRlp};
|
||||
use util::numbers::*;
|
||||
@@ -135,10 +135,8 @@ pub struct Client {
|
||||
sleep_state: Mutex<SleepState>,
|
||||
liveness: AtomicBool,
|
||||
io_channel: IoChannel<ClientIoMessage>,
|
||||
notify: RwLock<Option<Weak<ChainNotify>>>,
|
||||
notify: RwLock<Vec<Weak<ChainNotify>>>,
|
||||
queue_transactions: AtomicUsize,
|
||||
skipped: AtomicUsize,
|
||||
last_import: Mutex<Instant>,
|
||||
last_hashes: RwLock<VecDeque<H256>>,
|
||||
}
|
||||
|
||||
@@ -229,24 +227,24 @@ impl Client {
|
||||
trie_factory: TrieFactory::new(config.trie_spec),
|
||||
miner: miner,
|
||||
io_channel: message_channel,
|
||||
notify: RwLock::new(None),
|
||||
notify: RwLock::new(Vec::new()),
|
||||
queue_transactions: AtomicUsize::new(0),
|
||||
skipped: AtomicUsize::new(0),
|
||||
last_import: Mutex::new(Instant::now()),
|
||||
last_hashes: RwLock::new(VecDeque::new()),
|
||||
};
|
||||
Ok(Arc::new(client))
|
||||
}
|
||||
|
||||
/// Sets the actor to be notified on certain events
|
||||
pub fn set_notify(&self, target: &Arc<ChainNotify>) {
|
||||
let mut write_lock = self.notify.write();
|
||||
*write_lock = Some(Arc::downgrade(target));
|
||||
/// Adds an actor to be notified on certain events
|
||||
pub fn add_notify(&self, target: &Arc<ChainNotify>) {
|
||||
self.notify.write().push(Arc::downgrade(target));
|
||||
}
|
||||
|
||||
fn notify(&self) -> Option<Arc<ChainNotify>> {
|
||||
let read_lock = self.notify.read();
|
||||
read_lock.as_ref().and_then(|weak| weak.upgrade())
|
||||
fn notify<F>(&self, f: F) where F: Fn(&ChainNotify) {
|
||||
for np in self.notify.read().iter() {
|
||||
if let Some(n) = np.upgrade() {
|
||||
f(&*n);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Flush the block import queue.
|
||||
@@ -357,28 +355,24 @@ impl Client {
|
||||
/// This is triggered by a message coming from a block queue when the block is ready for insertion
|
||||
pub fn import_verified_blocks(&self) -> usize {
|
||||
let max_blocks_to_import = 64;
|
||||
let (imported_blocks, import_results, invalid_blocks, original_best, imported) = {
|
||||
let (imported_blocks, import_results, invalid_blocks, original_best, imported, duration) = {
|
||||
let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
|
||||
let mut invalid_blocks = HashSet::new();
|
||||
let mut import_results = Vec::with_capacity(max_blocks_to_import);
|
||||
|
||||
let _import_lock = self.import_lock.lock();
|
||||
let _timer = PerfTimer::new("import_verified_blocks");
|
||||
let start = precise_time_ns();
|
||||
let blocks = self.block_queue.drain(max_blocks_to_import);
|
||||
|
||||
let original_best = self.chain_info().best_block_hash;
|
||||
|
||||
for block in blocks {
|
||||
let header = &block.header;
|
||||
let start = precise_time_ns();
|
||||
|
||||
if invalid_blocks.contains(&header.parent_hash) {
|
||||
invalid_blocks.insert(header.hash());
|
||||
continue;
|
||||
}
|
||||
let tx_count = block.transactions.len();
|
||||
let size = block.bytes.len();
|
||||
|
||||
let closed_block = self.check_and_close_block(&block);
|
||||
if let Err(_) = closed_block {
|
||||
invalid_blocks.insert(header.hash());
|
||||
@@ -392,30 +386,6 @@ impl Client {
|
||||
import_results.push(route);
|
||||
|
||||
self.report.write().accrue_block(&block);
|
||||
|
||||
let duration_ns = precise_time_ns() - start;
|
||||
|
||||
let mut last_import = self.last_import.lock();
|
||||
if Instant::now() > *last_import + Duration::from_secs(1) {
|
||||
let queue_info = self.queue_info();
|
||||
let importing = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3;
|
||||
if !importing {
|
||||
let skipped = self.skipped.load(AtomicOrdering::Relaxed);
|
||||
info!(target: "import", "Imported {} {} ({} txs, {} Mgas, {} ms, {} KiB){}",
|
||||
Colour::White.bold().paint(format!("#{}", header.number())),
|
||||
Colour::White.bold().paint(format!("{}", header.hash())),
|
||||
Colour::Yellow.bold().paint(format!("{}", tx_count)),
|
||||
Colour::Yellow.bold().paint(format!("{:.2}", header.gas_used.low_u64() as f32 / 1000000f32)),
|
||||
Colour::Purple.bold().paint(format!("{:.2}", duration_ns as f32 / 1000000f32)),
|
||||
Colour::Blue.bold().paint(format!("{:.2}", size as f32 / 1024f32)),
|
||||
if skipped > 0 { format!(" + another {} block(s)", Colour::Red.bold().paint(format!("{}", skipped))) } else { String::new() }
|
||||
);
|
||||
*last_import = Instant::now();
|
||||
}
|
||||
self.skipped.store(0, AtomicOrdering::Relaxed);
|
||||
} else {
|
||||
self.skipped.fetch_add(1, AtomicOrdering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
let imported = imported_blocks.len();
|
||||
@@ -429,7 +399,8 @@ impl Client {
|
||||
self.block_queue.mark_as_good(&imported_blocks);
|
||||
}
|
||||
}
|
||||
(imported_blocks, import_results, invalid_blocks, original_best, imported)
|
||||
let duration_ns = precise_time_ns() - start;
|
||||
(imported_blocks, import_results, invalid_blocks, original_best, imported, duration_ns)
|
||||
};
|
||||
|
||||
{
|
||||
@@ -440,15 +411,16 @@ impl Client {
|
||||
self.miner.chain_new_blocks(self, &imported_blocks, &invalid_blocks, &enacted, &retracted);
|
||||
}
|
||||
|
||||
if let Some(notify) = self.notify() {
|
||||
self.notify(|notify| {
|
||||
notify.new_blocks(
|
||||
imported_blocks,
|
||||
invalid_blocks,
|
||||
enacted,
|
||||
retracted,
|
||||
imported_blocks.clone(),
|
||||
invalid_blocks.clone(),
|
||||
enacted.clone(),
|
||||
retracted.clone(),
|
||||
Vec::new(),
|
||||
duration,
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -640,9 +612,7 @@ impl Client {
|
||||
fn wake_up(&self) {
|
||||
if !self.liveness.load(AtomicOrdering::Relaxed) {
|
||||
self.liveness.store(true, AtomicOrdering::Relaxed);
|
||||
if let Some(notify) = self.notify() {
|
||||
notify.start();
|
||||
}
|
||||
self.notify(|n| n.start());
|
||||
trace!(target: "mode", "wake_up: Waking.");
|
||||
}
|
||||
}
|
||||
@@ -652,9 +622,7 @@ impl Client {
|
||||
// only sleep if the import queue is mostly empty.
|
||||
if self.queue_info().total_queue_size() <= MAX_QUEUE_SIZE_TO_SLEEP_ON {
|
||||
self.liveness.store(false, AtomicOrdering::Relaxed);
|
||||
if let Some(notify) = self.notify() {
|
||||
notify.stop();
|
||||
}
|
||||
self.notify(|n| n.stop());
|
||||
trace!(target: "mode", "sleep: Sleeping.");
|
||||
} else {
|
||||
trace!(target: "mode", "sleep: Cannot sleep - syncing ongoing.");
|
||||
@@ -1029,6 +997,7 @@ impl MiningBlockChainClient for Client {
|
||||
fn import_sealed_block(&self, block: SealedBlock) -> ImportResult {
|
||||
let _import_lock = self.import_lock.lock();
|
||||
let _timer = PerfTimer::new("import_sealed_block");
|
||||
let start = precise_time_ns();
|
||||
|
||||
let original_best = self.chain_info().best_block_hash;
|
||||
|
||||
@@ -1043,15 +1012,16 @@ impl MiningBlockChainClient for Client {
|
||||
let (enacted, retracted) = self.calculate_enacted_retracted(&[route]);
|
||||
self.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted);
|
||||
|
||||
if let Some(notify) = self.notify() {
|
||||
self.notify(|notify| {
|
||||
notify.new_blocks(
|
||||
vec![h.clone()],
|
||||
vec![],
|
||||
enacted,
|
||||
retracted,
|
||||
enacted.clone(),
|
||||
retracted.clone(),
|
||||
vec![h.clone()],
|
||||
precise_time_ns() - start,
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if self.chain_info().best_block_hash != original_best {
|
||||
|
||||
Reference in New Issue
Block a user