Merge branch 'master' into sync-svc

This commit is contained in:
NikVolf
2016-07-18 15:20:57 +02:00
26 changed files with 380 additions and 258 deletions

View File

@@ -22,21 +22,18 @@ use std::sync::{Arc, Weak};
use std::path::{Path, PathBuf};
use std::fmt;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::time::Instant;
use std::time::{Instant, Duration};
use time::precise_time_ns;
// util
use util::{journaldb, rlp, Bytes, Stream, View, PerfTimer, Itertools, Mutex, RwLock, Colour};
use util::journaldb::JournalDB;
use util::rlp::{RlpStream, Rlp, UntrustedRlp};
use util::numbers::*;
use util::panics::*;
use util::io::*;
use util::rlp;
use util::sha3::*;
use util::Bytes;
use util::rlp::{RlpStream, Rlp, UntrustedRlp};
use util::journaldb;
use util::journaldb::JournalDB;
use util::kvdb::*;
use util::{Stream, View, PerfTimer, Itertools};
use util::{Mutex, RwLock};
// other
use views::BlockView;
@@ -145,6 +142,9 @@ pub struct Client {
notify: RwLock<Option<Weak<ChainNotify>>>,
queue_transactions: AtomicUsize,
previous_enode: Mutex<Option<String>>,
skipped: AtomicUsize,
last_import: Mutex<Instant>,
last_hashes: RwLock<VecDeque<H256>>,
}
const HISTORY: u64 = 1200;
@@ -205,6 +205,11 @@ impl Client {
state_db.commit(0, &spec.genesis_header().hash(), None).expect("Error commiting genesis state to state DB");
}
while !chain.block_header(&chain.best_block_hash()).map_or(true, |h| state_db.contains(h.state_root())) {
warn!("State root not found for block #{} ({}), recovering...", chain.best_block_number(), chain.best_block_hash().hex());
chain.rewind();
}
let engine = Arc::new(spec.engine);
let block_queue = BlockQueue::new(config.queue, engine.clone(), message_channel.clone());
@@ -232,6 +237,9 @@ impl Client {
notify: RwLock::new(None),
queue_transactions: AtomicUsize::new(0),
previous_enode: Mutex::new(None),
skipped: AtomicUsize::new(0),
last_import: Mutex::new(Instant::now()),
last_hashes: RwLock::new(VecDeque::new()),
};
Ok(Arc::new(client))
}
@@ -253,6 +261,14 @@ impl Client {
}
fn build_last_hashes(&self, parent_hash: H256) -> LastHashes {
{
let hashes = self.last_hashes.read();
if hashes.front().map_or(false, |h| h == &parent_hash) {
let mut res = Vec::from(hashes.clone());
res.resize(256, H256::default());
return res;
}
}
let mut last_hashes = LastHashes::new();
last_hashes.resize(256, H256::new());
last_hashes[0] = parent_hash;
@@ -264,6 +280,8 @@ impl Client {
None => break,
}
}
let mut cached_hashes = self.last_hashes.write();
*cached_hashes = VecDeque::from(last_hashes.clone());
last_hashes
}
@@ -355,16 +373,21 @@ impl Client {
for block in blocks {
let header = &block.header;
let start = precise_time_ns();
if invalid_blocks.contains(&header.parent_hash) {
invalid_blocks.insert(header.hash());
continue;
}
let tx_count = block.transactions.len();
let size = block.bytes.len();
let closed_block = self.check_and_close_block(&block);
if let Err(_) = closed_block {
invalid_blocks.insert(header.hash());
continue;
}
let closed_block = closed_block.unwrap();
imported_blocks.push(header.hash());
@@ -372,7 +395,30 @@ impl Client {
import_results.push(route);
self.report.write().accrue_block(&block);
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
let duration_ns = precise_time_ns() - start;
let mut last_import = self.last_import.lock();
if Instant::now() > *last_import + Duration::from_secs(1) {
let queue_info = self.queue_info();
let importing = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3;
if !importing {
let skipped = self.skipped.load(AtomicOrdering::Relaxed);
info!(target: "import", "Imported {} {} ({} txs, {} Mgas, {} ms, {} KiB){}",
Colour::White.bold().paint(format!("#{}", header.number())),
Colour::White.bold().paint(format!("{}", header.hash())),
Colour::Yellow.bold().paint(format!("{}", tx_count)),
Colour::Yellow.bold().paint(format!("{:.2}", header.gas_used.low_u64() as f32 / 1000000f32)),
Colour::Purple.bold().paint(format!("{:.2}", duration_ns as f32 / 1000000f32)),
Colour::Blue.bold().paint(format!("{:.2}", size as f32 / 1024f32)),
if skipped > 0 { format!(" + another {} block(s)", Colour::Red.bold().paint(format!("{}", skipped))) } else { String::new() }
);
*last_import = Instant::now();
}
self.skipped.store(0, AtomicOrdering::Relaxed);
} else {
self.skipped.fetch_add(1, AtomicOrdering::Relaxed);
}
}
let imported = imported_blocks.len();
@@ -418,6 +464,7 @@ impl Client {
fn commit_block<B>(&self, block: B, hash: &H256, block_data: &[u8]) -> ImportRoute where B: IsBlock + Drain {
let number = block.header().number();
let parent = block.header().parent_hash().clone();
// Are we committing an era?
let ancient = if number >= HISTORY {
let n = number - HISTORY;
@@ -445,9 +492,20 @@ impl Client {
enacted: route.enacted.clone(),
retracted: route.retracted.len()
});
self.update_last_hashes(&parent, hash);
route
}
fn update_last_hashes(&self, parent: &H256, hash: &H256) {
let mut hashes = self.last_hashes.write();
if hashes.front().map_or(false, |h| h == parent) {
if hashes.len() > 255 {
hashes.pop_back();
}
hashes.push_front(hash.clone());
}
}
/// Import transactions from the IO queue
pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize {
let _timer = PerfTimer::new("import_queued_transactions");