Merge github.com:paritytech/parity into block_header_rpc
This commit is contained in:
@@ -23,9 +23,9 @@
|
||||
//! This is separate from the `BlockChain` for two reasons:
|
||||
//! - It stores only headers (and a pruned subset of them)
|
||||
//! - To allow for flexibility in the database layout once that's incorporated.
|
||||
// TODO: use DB instead of memory. DB Layout: just the contents of `candidates`/`headers`
|
||||
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use cht;
|
||||
|
||||
@@ -34,7 +34,10 @@ use ethcore::error::BlockError;
|
||||
use ethcore::encoded;
|
||||
use ethcore::header::Header;
|
||||
use ethcore::ids::BlockId;
|
||||
use util::{H256, U256, HeapSizeOf, Mutex, RwLock};
|
||||
|
||||
use rlp::{Encodable, Decodable, DecoderError, RlpStream, Rlp, UntrustedRlp};
|
||||
use util::{H256, U256, HeapSizeOf, RwLock};
|
||||
use util::kvdb::{DBTransaction, KeyValueDB};
|
||||
|
||||
use smallvec::SmallVec;
|
||||
|
||||
@@ -43,6 +46,9 @@ use smallvec::SmallVec;
|
||||
/// relevant to any blocks we've got in memory.
|
||||
const HISTORY: u64 = 2048;
|
||||
|
||||
/// The best block key. Maps to an RLP list: [best_era, last_era]
|
||||
const CURRENT_KEY: &'static [u8] = &*b"best_and_latest";
|
||||
|
||||
/// Information about a block.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BlockDescriptor {
|
||||
@@ -75,42 +81,142 @@ impl HeapSizeOf for Entry {
|
||||
}
|
||||
}
|
||||
|
||||
impl Encodable for Entry {
|
||||
fn rlp_append(&self, s: &mut RlpStream) {
|
||||
s.begin_list(self.candidates.len());
|
||||
|
||||
for candidate in &self.candidates {
|
||||
s.begin_list(3)
|
||||
.append(&candidate.hash)
|
||||
.append(&candidate.parent_hash)
|
||||
.append(&candidate.total_difficulty);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Decodable for Entry {
|
||||
fn decode(rlp: &UntrustedRlp) -> Result<Self, DecoderError> {
|
||||
|
||||
let mut candidates = SmallVec::<[Candidate; 3]>::new();
|
||||
|
||||
for item in rlp.iter() {
|
||||
candidates.push(Candidate {
|
||||
hash: item.val_at(0)?,
|
||||
parent_hash: item.val_at(1)?,
|
||||
total_difficulty: item.val_at(2)?,
|
||||
})
|
||||
}
|
||||
|
||||
if candidates.is_empty() { return Err(DecoderError::Custom("Empty candidates vector submitted.")) }
|
||||
|
||||
// rely on the invariant that the canonical entry is always first.
|
||||
let canon_hash = candidates[0].hash;
|
||||
Ok(Entry {
|
||||
candidates: candidates,
|
||||
canonical_hash: canon_hash,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn cht_key(number: u64) -> String {
|
||||
format!("{:08x}_canonical", number)
|
||||
}
|
||||
|
||||
fn era_key(number: u64) -> String {
|
||||
format!("candidates_{}", number)
|
||||
}
|
||||
|
||||
/// Pending changes from `insert` to be applied after the database write has finished.
|
||||
pub struct PendingChanges {
|
||||
best_block: Option<BlockDescriptor>, // new best block.
|
||||
}
|
||||
|
||||
/// Header chain. See module docs for more details.
|
||||
pub struct HeaderChain {
|
||||
genesis_header: encoded::Header, // special-case the genesis.
|
||||
candidates: RwLock<BTreeMap<u64, Entry>>,
|
||||
headers: RwLock<HashMap<H256, encoded::Header>>,
|
||||
best_block: RwLock<BlockDescriptor>,
|
||||
cht_roots: Mutex<Vec<H256>>,
|
||||
db: Arc<KeyValueDB>,
|
||||
col: Option<u32>,
|
||||
}
|
||||
|
||||
impl HeaderChain {
|
||||
/// Create a new header chain given this genesis block.
|
||||
pub fn new(genesis: &[u8]) -> Self {
|
||||
/// Create a new header chain given this genesis block and database to read from.
|
||||
pub fn new(db: Arc<KeyValueDB>, col: Option<u32>, genesis: &[u8]) -> Result<Self, String> {
|
||||
use ethcore::views::HeaderView;
|
||||
|
||||
let g_view = HeaderView::new(genesis);
|
||||
let chain = if let Some(current) = db.get(col, CURRENT_KEY)? {
|
||||
let (best_number, highest_number) = {
|
||||
let rlp = Rlp::new(¤t);
|
||||
(rlp.val_at(0), rlp.val_at(1))
|
||||
};
|
||||
|
||||
HeaderChain {
|
||||
genesis_header: encoded::Header::new(genesis.to_owned()),
|
||||
best_block: RwLock::new(BlockDescriptor {
|
||||
hash: g_view.hash(),
|
||||
number: 0,
|
||||
total_difficulty: g_view.difficulty(),
|
||||
}),
|
||||
candidates: RwLock::new(BTreeMap::new()),
|
||||
headers: RwLock::new(HashMap::new()),
|
||||
cht_roots: Mutex::new(Vec::new()),
|
||||
}
|
||||
let mut cur_number = highest_number;
|
||||
let mut candidates = BTreeMap::new();
|
||||
|
||||
// load all era entries and referenced headers within them.
|
||||
while let Some(entry) = db.get(col, era_key(cur_number).as_bytes())? {
|
||||
let entry: Entry = ::rlp::decode(&entry);
|
||||
trace!(target: "chain", "loaded header chain entry for era {} with {} candidates",
|
||||
cur_number, entry.candidates.len());
|
||||
|
||||
candidates.insert(cur_number, entry);
|
||||
|
||||
cur_number -= 1;
|
||||
}
|
||||
|
||||
// fill best block block descriptor.
|
||||
let best_block = {
|
||||
let era = match candidates.get(&best_number) {
|
||||
Some(era) => era,
|
||||
None => return Err(format!("Database corrupt: highest block referenced but no data.")),
|
||||
};
|
||||
|
||||
let best = &era.candidates[0];
|
||||
BlockDescriptor {
|
||||
hash: best.hash,
|
||||
number: best_number,
|
||||
total_difficulty: best.total_difficulty,
|
||||
}
|
||||
};
|
||||
|
||||
HeaderChain {
|
||||
genesis_header: encoded::Header::new(genesis.to_owned()),
|
||||
best_block: RwLock::new(best_block),
|
||||
candidates: RwLock::new(candidates),
|
||||
db: db,
|
||||
col: col,
|
||||
}
|
||||
} else {
|
||||
let g_view = HeaderView::new(genesis);
|
||||
HeaderChain {
|
||||
genesis_header: encoded::Header::new(genesis.to_owned()),
|
||||
best_block: RwLock::new(BlockDescriptor {
|
||||
hash: g_view.hash(),
|
||||
number: 0,
|
||||
total_difficulty: g_view.difficulty(),
|
||||
}),
|
||||
candidates: RwLock::new(BTreeMap::new()),
|
||||
db: db,
|
||||
col: col,
|
||||
}
|
||||
};
|
||||
|
||||
Ok(chain)
|
||||
}
|
||||
|
||||
/// Insert a pre-verified header.
|
||||
///
|
||||
/// This blindly trusts that the data given to it is sensible.
|
||||
pub fn insert(&self, header: Header) -> Result<(), BlockError> {
|
||||
/// Returns a set of pending changes to be applied with `apply_pending`
|
||||
/// before the next call to insert and after the transaction has been written.
|
||||
pub fn insert(&self, transaction: &mut DBTransaction, header: Header) -> Result<PendingChanges, BlockError> {
|
||||
let hash = header.hash();
|
||||
let number = header.number();
|
||||
let parent_hash = *header.parent_hash();
|
||||
let mut pending = PendingChanges {
|
||||
best_block: None,
|
||||
};
|
||||
|
||||
// hold candidates the whole time to guard import order.
|
||||
let mut candidates = self.candidates.write();
|
||||
@@ -128,20 +234,41 @@ impl HeaderChain {
|
||||
|
||||
let total_difficulty = parent_td + *header.difficulty();
|
||||
|
||||
// insert headers and candidates entries.
|
||||
candidates.entry(number).or_insert_with(|| Entry { candidates: SmallVec::new(), canonical_hash: hash })
|
||||
.candidates.push(Candidate {
|
||||
// insert headers and candidates entries and write era to disk.
|
||||
{
|
||||
let cur_era = candidates.entry(number)
|
||||
.or_insert_with(|| Entry { candidates: SmallVec::new(), canonical_hash: hash });
|
||||
cur_era.candidates.push(Candidate {
|
||||
hash: hash,
|
||||
parent_hash: parent_hash,
|
||||
total_difficulty: total_difficulty,
|
||||
});
|
||||
});
|
||||
|
||||
let raw = ::rlp::encode(&header).to_vec();
|
||||
self.headers.write().insert(hash, encoded::Header::new(raw));
|
||||
// fix ordering of era before writing.
|
||||
if total_difficulty > cur_era.candidates[0].total_difficulty {
|
||||
let cur_pos = cur_era.candidates.len() - 1;
|
||||
cur_era.candidates.swap(cur_pos, 0);
|
||||
cur_era.canonical_hash = hash;
|
||||
}
|
||||
|
||||
transaction.put(self.col, era_key(number).as_bytes(), &::rlp::encode(&*cur_era))
|
||||
}
|
||||
|
||||
let raw = ::rlp::encode(&header);
|
||||
transaction.put(self.col, &hash[..], &*raw);
|
||||
|
||||
let (best_num, is_new_best) = {
|
||||
let cur_best = self.best_block.read();
|
||||
if cur_best.total_difficulty < total_difficulty {
|
||||
(number, true)
|
||||
} else {
|
||||
(cur_best.number, false)
|
||||
}
|
||||
};
|
||||
|
||||
// reorganize ancestors so canonical entries are first in their
|
||||
// respective candidates vectors.
|
||||
if self.best_block.read().total_difficulty < total_difficulty {
|
||||
if is_new_best {
|
||||
let mut canon_hash = hash;
|
||||
for (&height, entry) in candidates.iter_mut().rev().skip_while(|&(height, _)| *height > number) {
|
||||
if height != number && entry.canonical_hash == canon_hash { break; }
|
||||
@@ -160,23 +287,26 @@ impl HeaderChain {
|
||||
// what about reorgs > cht::SIZE + HISTORY?
|
||||
// resetting to the last block of a given CHT should be possible.
|
||||
canon_hash = entry.candidates[0].parent_hash;
|
||||
|
||||
// write altered era to disk
|
||||
if height != number {
|
||||
let rlp_era = ::rlp::encode(&*entry);
|
||||
transaction.put(self.col, era_key(height).as_bytes(), &rlp_era);
|
||||
}
|
||||
}
|
||||
|
||||
trace!(target: "chain", "New best block: ({}, {}), TD {}", number, hash, total_difficulty);
|
||||
*self.best_block.write() = BlockDescriptor {
|
||||
pending.best_block = Some(BlockDescriptor {
|
||||
hash: hash,
|
||||
number: number,
|
||||
total_difficulty: total_difficulty,
|
||||
};
|
||||
});
|
||||
|
||||
// produce next CHT root if it's time.
|
||||
let earliest_era = *candidates.keys().next().expect("at least one era just created; qed");
|
||||
if earliest_era + HISTORY + cht::SIZE <= number {
|
||||
let cht_num = cht::block_to_cht_number(earliest_era)
|
||||
.expect("fails only for number == 0; genesis never imported; qed");
|
||||
debug_assert_eq!(cht_num as usize, self.cht_roots.lock().len());
|
||||
|
||||
let mut headers = self.headers.write();
|
||||
|
||||
let cht_root = {
|
||||
let mut i = earliest_era;
|
||||
@@ -186,10 +316,12 @@ impl HeaderChain {
|
||||
let iter = || {
|
||||
let era_entry = candidates.remove(&i)
|
||||
.expect("all eras are sequential with no gaps; qed");
|
||||
transaction.delete(self.col, era_key(i).as_bytes());
|
||||
|
||||
i += 1;
|
||||
|
||||
for ancient in &era_entry.candidates {
|
||||
headers.remove(&ancient.hash);
|
||||
transaction.delete(self.col, &ancient.hash);
|
||||
}
|
||||
|
||||
let canon = &era_entry.candidates[0];
|
||||
@@ -199,28 +331,56 @@ impl HeaderChain {
|
||||
.expect("fails only when too few items; this is checked; qed")
|
||||
};
|
||||
|
||||
// write the CHT root to the database.
|
||||
debug!(target: "chain", "Produced CHT {} root: {:?}", cht_num, cht_root);
|
||||
|
||||
self.cht_roots.lock().push(cht_root);
|
||||
transaction.put(self.col, cht_key(cht_num).as_bytes(), &::rlp::encode(&cht_root));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
// write the best and latest eras to the database.
|
||||
{
|
||||
let latest_num = *candidates.iter().rev().next().expect("at least one era just inserted; qed").0;
|
||||
let mut stream = RlpStream::new_list(2);
|
||||
stream.append(&best_num).append(&latest_num);
|
||||
transaction.put(self.col, CURRENT_KEY, &stream.out())
|
||||
}
|
||||
Ok(pending)
|
||||
}
|
||||
|
||||
/// Apply pending changes from a previous `insert` operation.
|
||||
/// Must be done before the next `insert` call.
|
||||
pub fn apply_pending(&self, pending: PendingChanges) {
|
||||
if let Some(best_block) = pending.best_block {
|
||||
*self.best_block.write() = best_block;
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a block header. In the case of query by number, only canonical blocks
|
||||
/// will be returned.
|
||||
pub fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
|
||||
let load_from_db = |hash: H256| {
|
||||
match self.db.get(self.col, &hash) {
|
||||
Ok(val) => val.map(|x| x.to_vec()).map(encoded::Header::new),
|
||||
Err(e) => {
|
||||
warn!(target: "chain", "Failed to read from database: {}", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
match id {
|
||||
BlockId::Earliest | BlockId::Number(0) => Some(self.genesis_header.clone()),
|
||||
BlockId::Hash(hash) => self.headers.read().get(&hash).cloned(),
|
||||
BlockId::Hash(hash) => load_from_db(hash),
|
||||
BlockId::Number(num) => {
|
||||
if self.best_block.read().number < num { return None }
|
||||
|
||||
self.candidates.read().get(&num).map(|entry| entry.canonical_hash)
|
||||
.and_then(|hash| self.headers.read().get(&hash).cloned())
|
||||
.and_then(load_from_db)
|
||||
}
|
||||
BlockId::Latest | BlockId::Pending => {
|
||||
// hold candidates hear to prevent deletion of the header
|
||||
// as we read it.
|
||||
let _candidates = self.candidates.read();
|
||||
let hash = {
|
||||
let best = self.best_block.read();
|
||||
if best.number == 0 {
|
||||
@@ -230,7 +390,7 @@ impl HeaderChain {
|
||||
best.hash
|
||||
};
|
||||
|
||||
self.headers.read().get(&hash).cloned()
|
||||
load_from_db(hash)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -257,7 +417,13 @@ impl HeaderChain {
|
||||
/// This is because it's assumed that the genesis hash is known,
|
||||
/// so including it within a CHT would be redundant.
|
||||
pub fn cht_root(&self, n: usize) -> Option<H256> {
|
||||
self.cht_roots.lock().get(n).map(|h| h.clone())
|
||||
match self.db.get(self.col, cht_key(n as u64).as_bytes()) {
|
||||
Ok(val) => val.map(|x| ::rlp::decode(&x)),
|
||||
Err(e) => {
|
||||
warn!(target: "chain", "Error reading from database: {}", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the genesis hash.
|
||||
@@ -287,7 +453,7 @@ impl HeaderChain {
|
||||
|
||||
/// Get block status.
|
||||
pub fn status(&self, hash: &H256) -> BlockStatus {
|
||||
match self.headers.read().contains_key(hash) {
|
||||
match self.db.get(self.col, &*hash).ok().map_or(false, |x| x.is_some()) {
|
||||
true => BlockStatus::InChain,
|
||||
false => BlockStatus::Unknown,
|
||||
}
|
||||
@@ -296,9 +462,7 @@ impl HeaderChain {
|
||||
|
||||
impl HeapSizeOf for HeaderChain {
|
||||
fn heap_size_of_children(&self) -> usize {
|
||||
self.candidates.read().heap_size_of_children() +
|
||||
self.headers.read().heap_size_of_children() +
|
||||
self.cht_roots.lock().heap_size_of_children()
|
||||
self.candidates.read().heap_size_of_children()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -324,16 +488,23 @@ impl<'a> Iterator for AncestryIter<'a> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::HeaderChain;
|
||||
use std::sync::Arc;
|
||||
|
||||
use ethcore::ids::BlockId;
|
||||
use ethcore::header::Header;
|
||||
use ethcore::spec::Spec;
|
||||
|
||||
fn make_db() -> Arc<::util::KeyValueDB> {
|
||||
Arc::new(::util::kvdb::in_memory(0))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn basic_chain() {
|
||||
let spec = Spec::new_test();
|
||||
let genesis_header = spec.genesis_header();
|
||||
let db = make_db();
|
||||
|
||||
let chain = HeaderChain::new(&::rlp::encode(&genesis_header));
|
||||
let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap();
|
||||
|
||||
let mut parent_hash = genesis_header.hash();
|
||||
let mut rolling_timestamp = genesis_header.timestamp();
|
||||
@@ -345,7 +516,10 @@ mod tests {
|
||||
header.set_difficulty(*genesis_header.difficulty() * i.into());
|
||||
parent_hash = header.hash();
|
||||
|
||||
chain.insert(header).unwrap();
|
||||
let mut tx = db.transaction();
|
||||
let pending = chain.insert(&mut tx, header).unwrap();
|
||||
db.write(tx).unwrap();
|
||||
chain.apply_pending(pending);
|
||||
|
||||
rolling_timestamp += 10;
|
||||
}
|
||||
@@ -361,7 +535,8 @@ mod tests {
|
||||
let spec = Spec::new_test();
|
||||
let genesis_header = spec.genesis_header();
|
||||
|
||||
let chain = HeaderChain::new(&::rlp::encode(&genesis_header));
|
||||
let db = make_db();
|
||||
let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap();
|
||||
|
||||
let mut parent_hash = genesis_header.hash();
|
||||
let mut rolling_timestamp = genesis_header.timestamp();
|
||||
@@ -373,7 +548,10 @@ mod tests {
|
||||
header.set_difficulty(*genesis_header.difficulty() * i.into());
|
||||
parent_hash = header.hash();
|
||||
|
||||
chain.insert(header).unwrap();
|
||||
let mut tx = db.transaction();
|
||||
let pending = chain.insert(&mut tx, header).unwrap();
|
||||
db.write(tx).unwrap();
|
||||
chain.apply_pending(pending);
|
||||
|
||||
rolling_timestamp += 10;
|
||||
}
|
||||
@@ -389,7 +567,10 @@ mod tests {
|
||||
header.set_difficulty(*genesis_header.difficulty() * i.into());
|
||||
parent_hash = header.hash();
|
||||
|
||||
chain.insert(header).unwrap();
|
||||
let mut tx = db.transaction();
|
||||
let pending = chain.insert(&mut tx, header).unwrap();
|
||||
db.write(tx).unwrap();
|
||||
chain.apply_pending(pending);
|
||||
|
||||
rolling_timestamp += 10;
|
||||
}
|
||||
@@ -410,7 +591,10 @@ mod tests {
|
||||
header.set_difficulty(*genesis_header.difficulty() * (i * i).into());
|
||||
parent_hash = header.hash();
|
||||
|
||||
chain.insert(header).unwrap();
|
||||
let mut tx = db.transaction();
|
||||
let pending = chain.insert(&mut tx, header).unwrap();
|
||||
db.write(tx).unwrap();
|
||||
chain.apply_pending(pending);
|
||||
|
||||
rolling_timestamp += 11;
|
||||
}
|
||||
@@ -432,11 +616,101 @@ mod tests {
|
||||
fn earliest_is_latest() {
|
||||
let spec = Spec::new_test();
|
||||
let genesis_header = spec.genesis_header();
|
||||
let db = make_db();
|
||||
|
||||
let chain = HeaderChain::new(&::rlp::encode(&genesis_header));
|
||||
let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap();
|
||||
|
||||
assert!(chain.block_header(BlockId::Earliest).is_some());
|
||||
assert!(chain.block_header(BlockId::Latest).is_some());
|
||||
assert!(chain.block_header(BlockId::Pending).is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn restore_from_db() {
|
||||
let spec = Spec::new_test();
|
||||
let genesis_header = spec.genesis_header();
|
||||
let db = make_db();
|
||||
|
||||
{
|
||||
let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap();
|
||||
let mut parent_hash = genesis_header.hash();
|
||||
let mut rolling_timestamp = genesis_header.timestamp();
|
||||
for i in 1..10000 {
|
||||
let mut header = Header::new();
|
||||
header.set_parent_hash(parent_hash);
|
||||
header.set_number(i);
|
||||
header.set_timestamp(rolling_timestamp);
|
||||
header.set_difficulty(*genesis_header.difficulty() * i.into());
|
||||
parent_hash = header.hash();
|
||||
|
||||
let mut tx = db.transaction();
|
||||
let pending = chain.insert(&mut tx, header).unwrap();
|
||||
db.write(tx).unwrap();
|
||||
chain.apply_pending(pending);
|
||||
|
||||
rolling_timestamp += 10;
|
||||
}
|
||||
}
|
||||
|
||||
let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap();
|
||||
assert!(chain.block_header(BlockId::Number(10)).is_none());
|
||||
assert!(chain.block_header(BlockId::Number(9000)).is_some());
|
||||
assert!(chain.cht_root(2).is_some());
|
||||
assert!(chain.cht_root(3).is_none());
|
||||
assert_eq!(chain.block_header(BlockId::Latest).unwrap().number(), 9999);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn restore_higher_non_canonical() {
|
||||
let spec = Spec::new_test();
|
||||
let genesis_header = spec.genesis_header();
|
||||
let db = make_db();
|
||||
|
||||
{
|
||||
let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap();
|
||||
let mut parent_hash = genesis_header.hash();
|
||||
let mut rolling_timestamp = genesis_header.timestamp();
|
||||
|
||||
// push 100 low-difficulty blocks.
|
||||
for i in 1..101 {
|
||||
let mut header = Header::new();
|
||||
header.set_parent_hash(parent_hash);
|
||||
header.set_number(i);
|
||||
header.set_timestamp(rolling_timestamp);
|
||||
header.set_difficulty(*genesis_header.difficulty() * i.into());
|
||||
parent_hash = header.hash();
|
||||
|
||||
let mut tx = db.transaction();
|
||||
let pending = chain.insert(&mut tx, header).unwrap();
|
||||
db.write(tx).unwrap();
|
||||
chain.apply_pending(pending);
|
||||
|
||||
rolling_timestamp += 10;
|
||||
}
|
||||
|
||||
// push fewer high-difficulty blocks.
|
||||
for i in 1..11 {
|
||||
let mut header = Header::new();
|
||||
header.set_parent_hash(parent_hash);
|
||||
header.set_number(i);
|
||||
header.set_timestamp(rolling_timestamp);
|
||||
header.set_difficulty(*genesis_header.difficulty() * i.into() * 1000.into());
|
||||
parent_hash = header.hash();
|
||||
|
||||
let mut tx = db.transaction();
|
||||
let pending = chain.insert(&mut tx, header).unwrap();
|
||||
db.write(tx).unwrap();
|
||||
chain.apply_pending(pending);
|
||||
|
||||
rolling_timestamp += 10;
|
||||
}
|
||||
|
||||
assert_eq!(chain.block_header(BlockId::Latest).unwrap().number(), 10);
|
||||
}
|
||||
|
||||
// after restoration, non-canonical eras should still be loaded.
|
||||
let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap();
|
||||
assert_eq!(chain.block_header(BlockId::Latest).unwrap().number(), 10);
|
||||
assert!(chain.candidates.read().get(&100).is_some())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@ use ethcore::encoded;
|
||||
use io::IoChannel;
|
||||
|
||||
use util::{H256, Mutex, RwLock};
|
||||
use util::kvdb::{KeyValueDB, CompactionProfile};
|
||||
|
||||
use self::header_chain::{AncestryIter, HeaderChain};
|
||||
|
||||
@@ -45,6 +46,14 @@ mod service;
|
||||
pub struct Config {
|
||||
/// Verification queue config.
|
||||
pub queue: queue::Config,
|
||||
/// Chain column in database.
|
||||
pub chain_column: Option<u32>,
|
||||
/// Database cache size. `None` => rocksdb default.
|
||||
pub db_cache_size: Option<usize>,
|
||||
/// State db compaction profile
|
||||
pub db_compaction: CompactionProfile,
|
||||
/// Should db have WAL enabled?
|
||||
pub db_wal: bool,
|
||||
}
|
||||
|
||||
/// Trait for interacting with the header chain abstractly.
|
||||
@@ -113,18 +122,30 @@ pub struct Client {
|
||||
chain: HeaderChain,
|
||||
report: RwLock<ClientReport>,
|
||||
import_lock: Mutex<()>,
|
||||
db: Arc<KeyValueDB>,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
/// Create a new `Client`.
|
||||
pub fn new(config: Config, spec: &Spec, io_channel: IoChannel<ClientIoMessage>) -> Self {
|
||||
Client {
|
||||
pub fn new(config: Config, db: Arc<KeyValueDB>, chain_col: Option<u32>, spec: &Spec, io_channel: IoChannel<ClientIoMessage>) -> Result<Self, String> {
|
||||
let gh = ::rlp::encode(&spec.genesis_header());
|
||||
|
||||
Ok(Client {
|
||||
queue: HeaderQueue::new(config.queue, spec.engine.clone(), io_channel, true),
|
||||
engine: spec.engine.clone(),
|
||||
chain: HeaderChain::new(&::rlp::encode(&spec.genesis_header())),
|
||||
chain: HeaderChain::new(db.clone(), chain_col, &gh)?,
|
||||
report: RwLock::new(ClientReport::default()),
|
||||
import_lock: Mutex::new(()),
|
||||
}
|
||||
db: db,
|
||||
})
|
||||
}
|
||||
|
||||
/// Create a new `Client` backed purely in-memory.
|
||||
/// This will ignore all database options in the configuration.
|
||||
pub fn in_memory(config: Config, spec: &Spec, io_channel: IoChannel<ClientIoMessage>) -> Self {
|
||||
let db = ::util::kvdb::in_memory(0);
|
||||
|
||||
Client::new(config, Arc::new(db), None, spec, io_channel).expect("New DB creation infallible; qed")
|
||||
}
|
||||
|
||||
/// Import a header to the queue for additional verification.
|
||||
@@ -208,15 +229,23 @@ impl Client {
|
||||
for verified_header in self.queue.drain(MAX) {
|
||||
let (num, hash) = (verified_header.number(), verified_header.hash());
|
||||
|
||||
match self.chain.insert(verified_header) {
|
||||
Ok(()) => {
|
||||
let mut tx = self.db.transaction();
|
||||
let pending = match self.chain.insert(&mut tx, verified_header) {
|
||||
Ok(pending) => {
|
||||
good.push(hash);
|
||||
self.report.write().blocks_imported += 1;
|
||||
pending
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(target: "client", "Error importing header {:?}: {}", (num, hash), e);
|
||||
bad.push(hash);
|
||||
break;
|
||||
}
|
||||
};
|
||||
self.db.write_buffered(tx);
|
||||
self.chain.apply_pending(pending);
|
||||
if let Err(e) = self.db.flush() {
|
||||
panic!("Database flush failed: {}. Check disk health and space.", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,33 +17,80 @@
|
||||
//! Minimal IO service for light client.
|
||||
//! Just handles block import messages and passes them to the client.
|
||||
|
||||
use std::fmt;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use ethcore::db;
|
||||
use ethcore::service::ClientIoMessage;
|
||||
use ethcore::spec::Spec;
|
||||
use io::{IoContext, IoError, IoHandler, IoService};
|
||||
use util::kvdb::{Database, DatabaseConfig};
|
||||
|
||||
use super::{Client, Config as ClientConfig};
|
||||
|
||||
/// Errors on service initialization.
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
/// Database error.
|
||||
Database(String),
|
||||
/// I/O service error.
|
||||
Io(IoError),
|
||||
}
|
||||
|
||||
impl fmt::Display for Error {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match *self {
|
||||
Error::Database(ref msg) => write!(f, "Database error: {}", msg),
|
||||
Error::Io(ref err) => write!(f, "I/O service error: {}", err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Light client service.
|
||||
pub struct Service {
|
||||
client: Arc<Client>,
|
||||
_io_service: IoService<ClientIoMessage>,
|
||||
io_service: IoService<ClientIoMessage>,
|
||||
}
|
||||
|
||||
impl Service {
|
||||
/// Start the service: initialize I/O workers and client itself.
|
||||
pub fn start(config: ClientConfig, spec: &Spec) -> Result<Self, IoError> {
|
||||
let io_service = try!(IoService::<ClientIoMessage>::start());
|
||||
let client = Arc::new(Client::new(config, spec, io_service.channel()));
|
||||
try!(io_service.register_handler(Arc::new(ImportBlocks(client.clone()))));
|
||||
pub fn start(config: ClientConfig, spec: &Spec, path: &Path) -> Result<Self, Error> {
|
||||
// initialize database.
|
||||
let mut db_config = DatabaseConfig::with_columns(db::NUM_COLUMNS);
|
||||
|
||||
// give all rocksdb cache to the header chain column.
|
||||
if let Some(size) = config.db_cache_size {
|
||||
db_config.set_cache(db::COL_LIGHT_CHAIN, size);
|
||||
}
|
||||
|
||||
db_config.compaction = config.db_compaction;
|
||||
db_config.wal = config.db_wal;
|
||||
|
||||
let db = Arc::new(Database::open(
|
||||
&db_config,
|
||||
&path.to_str().expect("DB path could not be converted to string.")
|
||||
).map_err(Error::Database)?);
|
||||
|
||||
let io_service = IoService::<ClientIoMessage>::start().map_err(Error::Io)?;
|
||||
let client = Arc::new(Client::new(config,
|
||||
db,
|
||||
db::COL_LIGHT_CHAIN,
|
||||
spec,
|
||||
io_service.channel(),
|
||||
).map_err(Error::Database)?);
|
||||
io_service.register_handler(Arc::new(ImportBlocks(client.clone()))).map_err(Error::Io)?;
|
||||
Ok(Service {
|
||||
client: client,
|
||||
_io_service: io_service,
|
||||
io_service: io_service,
|
||||
})
|
||||
}
|
||||
|
||||
/// Register an I/O handler on the service.
|
||||
pub fn register_handler(&self, handler: Arc<IoHandler<ClientIoMessage> + Send>) -> Result<(), IoError> {
|
||||
self.io_service.register_handler(handler)
|
||||
}
|
||||
|
||||
/// Get a handle to the client.
|
||||
pub fn client(&self) -> &Arc<Client> {
|
||||
&self.client
|
||||
@@ -63,11 +110,13 @@ impl IoHandler<ClientIoMessage> for ImportBlocks {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::Service;
|
||||
use devtools::RandomTempPath;
|
||||
use ethcore::spec::Spec;
|
||||
|
||||
#[test]
|
||||
fn it_works() {
|
||||
let spec = Spec::new_test();
|
||||
Service::start(Default::default(), &spec).unwrap();
|
||||
let temp_path = RandomTempPath::new();
|
||||
Service::start(Default::default(), &spec, temp_path.as_path()).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,6 +55,7 @@ pub mod remote {
|
||||
|
||||
mod types;
|
||||
|
||||
pub use self::cache::Cache;
|
||||
pub use self::provider::Provider;
|
||||
pub use self::transaction_queue::TransactionQueue;
|
||||
pub use types::request as request;
|
||||
@@ -76,3 +77,6 @@ extern crate stats;
|
||||
|
||||
#[cfg(feature = "ipc")]
|
||||
extern crate ethcore_ipc as ipc;
|
||||
|
||||
#[cfg(test)]
|
||||
extern crate ethcore_devtools as devtools;
|
||||
|
||||
@@ -61,10 +61,12 @@ impl<'a> IoContext for NetworkContext<'a> {
|
||||
}
|
||||
|
||||
fn disconnect_peer(&self, peer: PeerId) {
|
||||
trace!(target: "pip", "Initiating disconnect of peer {}", peer);
|
||||
NetworkContext::disconnect_peer(self, peer);
|
||||
}
|
||||
|
||||
fn disable_peer(&self, peer: PeerId) {
|
||||
trace!(target: "pip", "Initiating disable of peer {}", peer);
|
||||
NetworkContext::disable_peer(self, peer);
|
||||
}
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ use util::hash::H256;
|
||||
use util::{DBValue, Mutex, RwLock, U256};
|
||||
use time::{Duration, SteadyTime};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
@@ -61,6 +61,9 @@ const TIMEOUT_INTERVAL_MS: u64 = 1000;
|
||||
const TICK_TIMEOUT: TimerToken = 1;
|
||||
const TICK_TIMEOUT_INTERVAL_MS: u64 = 5000;
|
||||
|
||||
const PROPAGATE_TIMEOUT: TimerToken = 2;
|
||||
const PROPAGATE_TIMEOUT_INTERVAL_MS: u64 = 5000;
|
||||
|
||||
// minimum interval between updates.
|
||||
const UPDATE_INTERVAL_MS: i64 = 5000;
|
||||
|
||||
@@ -132,6 +135,7 @@ pub struct Peer {
|
||||
last_update: SteadyTime,
|
||||
pending_requests: RequestSet,
|
||||
failed_requests: Vec<ReqId>,
|
||||
propagated_transactions: HashSet<H256>,
|
||||
}
|
||||
|
||||
/// A light protocol event handler.
|
||||
@@ -303,12 +307,18 @@ impl LightProtocol {
|
||||
match peer.remote_flow {
|
||||
None => Err(Error::NotServer),
|
||||
Some((ref mut creds, ref params)) => {
|
||||
// check that enough credits are available.
|
||||
let mut temp_creds: Credits = creds.clone();
|
||||
for request in requests.requests() {
|
||||
temp_creds.deduct_cost(params.compute_cost(request))?;
|
||||
// apply recharge to credits if there's no pending requests.
|
||||
if peer.pending_requests.is_empty() {
|
||||
params.recharge(creds);
|
||||
}
|
||||
*creds = temp_creds;
|
||||
|
||||
// compute and deduct cost.
|
||||
let pre_creds = creds.current();
|
||||
let cost = params.compute_cost_multi(requests.requests());
|
||||
creds.deduct_cost(cost)?;
|
||||
|
||||
trace!(target: "pip", "requesting from peer {}. Cost: {}; Available: {}",
|
||||
peer_id, cost, pre_creds);
|
||||
|
||||
let req_id = ReqId(self.req_id.fetch_add(1, Ordering::SeqCst));
|
||||
io.send(*peer_id, packet::REQUEST, {
|
||||
@@ -318,7 +328,7 @@ impl LightProtocol {
|
||||
});
|
||||
|
||||
// begin timeout.
|
||||
peer.pending_requests.insert(req_id, requests, SteadyTime::now());
|
||||
peer.pending_requests.insert(req_id, requests, cost, SteadyTime::now());
|
||||
Ok(req_id)
|
||||
}
|
||||
}
|
||||
@@ -401,20 +411,25 @@ impl LightProtocol {
|
||||
let req_id = ReqId(raw.val_at(0)?);
|
||||
let cur_credits: U256 = raw.val_at(1)?;
|
||||
|
||||
trace!(target: "pip", "pre-verifying response from peer {}", peer);
|
||||
trace!(target: "pip", "pre-verifying response for {} from peer {}", req_id, peer);
|
||||
|
||||
let peers = self.peers.read();
|
||||
let res = match peers.get(peer) {
|
||||
Some(peer_info) => {
|
||||
let mut peer_info = peer_info.lock();
|
||||
let req_info = peer_info.pending_requests.remove(&req_id, SteadyTime::now());
|
||||
let cumulative_cost = peer_info.pending_requests.cumulative_cost();
|
||||
let flow_info = peer_info.remote_flow.as_mut();
|
||||
|
||||
match (req_info, flow_info) {
|
||||
(Some(_), Some(flow_info)) => {
|
||||
let &mut (ref mut c, ref mut flow) = flow_info;
|
||||
let actual_credits = ::std::cmp::min(cur_credits, *flow.limit());
|
||||
c.update_to(actual_credits);
|
||||
|
||||
// only update if the cumulative cost of the request set is zero.
|
||||
if cumulative_cost == 0.into() {
|
||||
let actual_credits = ::std::cmp::min(cur_credits, *flow.limit());
|
||||
c.update_to(actual_credits);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -488,6 +503,47 @@ impl LightProtocol {
|
||||
}
|
||||
}
|
||||
|
||||
// propagate transactions to relay peers.
|
||||
// if we aren't on the mainnet, we just propagate to all relay peers
|
||||
fn propagate_transactions(&self, io: &IoContext) {
|
||||
if self.capabilities.read().tx_relay { return }
|
||||
|
||||
let ready_transactions = self.provider.ready_transactions();
|
||||
if ready_transactions.is_empty() { return }
|
||||
|
||||
trace!(target: "pip", "propagate transactions: {} ready", ready_transactions.len());
|
||||
|
||||
let all_transaction_hashes: HashSet<_> = ready_transactions.iter().map(|tx| tx.hash()).collect();
|
||||
let mut buf = Vec::new();
|
||||
|
||||
let peers = self.peers.read();
|
||||
for (peer_id, peer_info) in peers.iter() {
|
||||
let mut peer_info = peer_info.lock();
|
||||
if !peer_info.capabilities.tx_relay { continue }
|
||||
|
||||
let prop_filter = &mut peer_info.propagated_transactions;
|
||||
*prop_filter = &*prop_filter & &all_transaction_hashes;
|
||||
|
||||
// fill the buffer with all non-propagated transactions.
|
||||
let to_propagate = ready_transactions.iter()
|
||||
.filter(|tx| prop_filter.insert(tx.hash()))
|
||||
.map(|tx| &tx.transaction);
|
||||
|
||||
buf.extend(to_propagate);
|
||||
|
||||
// propagate to the given peer.
|
||||
if buf.is_empty() { continue }
|
||||
io.send(*peer_id, packet::SEND_TRANSACTIONS, {
|
||||
let mut stream = RlpStream::new_list(buf.len());
|
||||
for pending_tx in buf.drain(..) {
|
||||
stream.append(pending_tx);
|
||||
}
|
||||
|
||||
stream.out()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// called when a peer connects.
|
||||
pub fn on_connect(&self, peer: &PeerId, io: &IoContext) {
|
||||
let proto_version = match io.protocol_version(*peer).ok_or(Error::WrongNetwork) {
|
||||
@@ -520,6 +576,7 @@ impl LightProtocol {
|
||||
last_update: SteadyTime::now(),
|
||||
});
|
||||
|
||||
trace!(target: "pip", "Sending status to peer {}", peer);
|
||||
io.send(*peer, packet::STATUS, status_packet);
|
||||
}
|
||||
|
||||
@@ -601,6 +658,7 @@ impl LightProtocol {
|
||||
last_update: pending.last_update,
|
||||
pending_requests: RequestSet::default(),
|
||||
failed_requests: Vec::new(),
|
||||
propagated_transactions: HashSet::new(),
|
||||
}));
|
||||
|
||||
for handler in &self.handlers {
|
||||
@@ -683,6 +741,8 @@ impl LightProtocol {
|
||||
trace!(target: "pip", "Received requests (id: {}) from peer {}", req_id, peer_id);
|
||||
|
||||
// deserialize requests, check costs and request validity.
|
||||
self.flow_params.recharge(&mut peer.local_credits);
|
||||
|
||||
peer.local_credits.deduct_cost(self.flow_params.base_cost())?;
|
||||
for request_rlp in raw.at(1)?.iter().take(MAX_REQUESTS) {
|
||||
let request: Request = request_rlp.as_val()?;
|
||||
@@ -709,6 +769,7 @@ impl LightProtocol {
|
||||
});
|
||||
|
||||
trace!(target: "pip", "Responded to {}/{} requests in packet {}", responses.len(), num_requests, req_id);
|
||||
trace!(target: "pip", "Peer {} has {} credits remaining.", peer_id, peer.local_credits.current());
|
||||
|
||||
io.respond(packet::RESPONSE, {
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
@@ -782,6 +843,8 @@ impl NetworkProtocolHandler for LightProtocol {
|
||||
.expect("Error registering sync timer.");
|
||||
io.register_timer(TICK_TIMEOUT, TICK_TIMEOUT_INTERVAL_MS)
|
||||
.expect("Error registering sync timer.");
|
||||
io.register_timer(PROPAGATE_TIMEOUT, PROPAGATE_TIMEOUT_INTERVAL_MS)
|
||||
.expect("Error registering sync timer.");
|
||||
}
|
||||
|
||||
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
|
||||
@@ -800,6 +863,7 @@ impl NetworkProtocolHandler for LightProtocol {
|
||||
match timer {
|
||||
TIMEOUT => self.timeout_check(io),
|
||||
TICK_TIMEOUT => self.tick_handlers(io),
|
||||
PROPAGATE_TIMEOUT => self.propagate_transactions(io),
|
||||
_ => warn!(target: "pip", "received timeout on unknown token {}", timer),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,22 +27,29 @@ use std::iter::FromIterator;
|
||||
use request::Request;
|
||||
use request::Requests;
|
||||
use net::{timeout, ReqId};
|
||||
use util::U256;
|
||||
|
||||
use time::{Duration, SteadyTime};
|
||||
|
||||
// Request set entry: requests + cost.
|
||||
#[derive(Debug)]
|
||||
struct Entry(Requests, U256);
|
||||
|
||||
/// Request set.
|
||||
#[derive(Debug)]
|
||||
pub struct RequestSet {
|
||||
counter: u64,
|
||||
cumulative_cost: U256,
|
||||
base: Option<SteadyTime>,
|
||||
ids: HashMap<ReqId, u64>,
|
||||
reqs: BTreeMap<u64, Requests>,
|
||||
reqs: BTreeMap<u64, Entry>,
|
||||
}
|
||||
|
||||
impl Default for RequestSet {
|
||||
fn default() -> Self {
|
||||
RequestSet {
|
||||
counter: 0,
|
||||
cumulative_cost: 0.into(),
|
||||
base: None,
|
||||
ids: HashMap::new(),
|
||||
reqs: BTreeMap::new(),
|
||||
@@ -52,10 +59,12 @@ impl Default for RequestSet {
|
||||
|
||||
impl RequestSet {
|
||||
/// Push requests onto the stack.
|
||||
pub fn insert(&mut self, req_id: ReqId, req: Requests, now: SteadyTime) {
|
||||
pub fn insert(&mut self, req_id: ReqId, req: Requests, cost: U256, now: SteadyTime) {
|
||||
let counter = self.counter;
|
||||
self.cumulative_cost = self.cumulative_cost + cost;
|
||||
|
||||
self.ids.insert(req_id, counter);
|
||||
self.reqs.insert(counter, req);
|
||||
self.reqs.insert(counter, Entry(req, cost));
|
||||
|
||||
if self.reqs.keys().next().map_or(true, |x| *x == counter) {
|
||||
self.base = Some(now);
|
||||
@@ -71,7 +80,7 @@ impl RequestSet {
|
||||
None => return None,
|
||||
};
|
||||
|
||||
let req = self.reqs.remove(&id).expect("entry in `ids` implies entry in `reqs`; qed");
|
||||
let Entry(req, cost) = self.reqs.remove(&id).expect("entry in `ids` implies entry in `reqs`; qed");
|
||||
|
||||
match self.reqs.keys().next() {
|
||||
Some(k) if *k > id => self.base = Some(now),
|
||||
@@ -79,6 +88,7 @@ impl RequestSet {
|
||||
_ => {}
|
||||
}
|
||||
|
||||
self.cumulative_cost = self.cumulative_cost - cost;
|
||||
Some(req)
|
||||
}
|
||||
|
||||
@@ -93,7 +103,7 @@ impl RequestSet {
|
||||
let first_req = self.reqs.values().next()
|
||||
.expect("base existing implies `reqs` non-empty; qed");
|
||||
|
||||
base + compute_timeout(&first_req) <= now
|
||||
base + compute_timeout(&first_req.0) <= now
|
||||
}
|
||||
|
||||
/// Collect all pending request ids.
|
||||
@@ -108,6 +118,9 @@ impl RequestSet {
|
||||
|
||||
/// Whether the set is empty.
|
||||
pub fn is_empty(&self) -> bool { self.len() == 0 }
|
||||
|
||||
/// The cumulative cost of all requests in the set.
|
||||
pub fn cumulative_cost(&self) -> U256 { self.cumulative_cost }
|
||||
}
|
||||
|
||||
// helper to calculate timeout for a specific set of requests.
|
||||
@@ -141,8 +154,8 @@ mod tests {
|
||||
|
||||
let the_req = RequestBuilder::default().build();
|
||||
let req_time = compute_timeout(&the_req);
|
||||
req_set.insert(ReqId(0), the_req.clone(), test_begin);
|
||||
req_set.insert(ReqId(1), the_req, test_begin + Duration::seconds(1));
|
||||
req_set.insert(ReqId(0), the_req.clone(), 0.into(), test_begin);
|
||||
req_set.insert(ReqId(1), the_req, 0.into(), test_begin + Duration::seconds(1));
|
||||
|
||||
assert_eq!(req_set.base, Some(test_begin));
|
||||
|
||||
@@ -153,4 +166,22 @@ mod tests {
|
||||
assert!(!req_set.check_timeout(test_end));
|
||||
assert!(req_set.check_timeout(test_end + Duration::seconds(1)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cumulative_cost() {
|
||||
let the_req = RequestBuilder::default().build();
|
||||
let test_begin = SteadyTime::now();
|
||||
let test_end = test_begin + Duration::seconds(1);
|
||||
let mut req_set = RequestSet::default();
|
||||
|
||||
for i in 0..5 {
|
||||
req_set.insert(ReqId(i), the_req.clone(), 1.into(), test_begin);
|
||||
assert_eq!(req_set.cumulative_cost, (i + 1).into());
|
||||
}
|
||||
|
||||
for i in (0..5).rev() {
|
||||
assert!(req_set.remove(&ReqId(i), test_end).is_some());
|
||||
assert_eq!(req_set.cumulative_cost, i.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -600,8 +600,8 @@ fn id_guard() {
|
||||
|
||||
let mut pending_requests = RequestSet::default();
|
||||
|
||||
pending_requests.insert(req_id_1, req.clone(), ::time::SteadyTime::now());
|
||||
pending_requests.insert(req_id_2, req, ::time::SteadyTime::now());
|
||||
pending_requests.insert(req_id_1, req.clone(), 0.into(), ::time::SteadyTime::now());
|
||||
pending_requests.insert(req_id_2, req, 1.into(), ::time::SteadyTime::now());
|
||||
|
||||
proto.peers.write().insert(peer_id, ::util::Mutex::new(Peer {
|
||||
local_credits: flow_params.create_credits(),
|
||||
@@ -612,6 +612,7 @@ fn id_guard() {
|
||||
last_update: ::time::SteadyTime::now(),
|
||||
pending_requests: pending_requests,
|
||||
failed_requests: Vec::new(),
|
||||
propagated_transactions: Default::default(),
|
||||
}));
|
||||
|
||||
// first, malformed responses.
|
||||
|
||||
@@ -37,7 +37,7 @@ use rlp::RlpStream;
|
||||
use util::{Bytes, RwLock, Mutex, U256, H256};
|
||||
use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP};
|
||||
|
||||
use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId};
|
||||
use net::{self, Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId};
|
||||
use cache::Cache;
|
||||
use request::{self as basic_request, Request as NetworkRequest, Response as NetworkResponse};
|
||||
|
||||
@@ -57,15 +57,15 @@ impl Peer {
|
||||
self.capabilities.serve_headers && self.status.head_num > req.num(),
|
||||
Pending::HeaderByHash(_, _) => self.capabilities.serve_headers,
|
||||
Pending::Block(ref req, _) =>
|
||||
self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= req.header.number()),
|
||||
self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x <= req.header.number()),
|
||||
Pending::BlockReceipts(ref req, _) =>
|
||||
self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= req.0.number()),
|
||||
self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x <= req.0.number()),
|
||||
Pending::Account(ref req, _) =>
|
||||
self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= req.header.number()),
|
||||
self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x <= req.header.number()),
|
||||
Pending::Code(ref req, _) =>
|
||||
self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= req.block_id.1),
|
||||
self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x <= req.block_id.1),
|
||||
Pending::TxProof(ref req, _) =>
|
||||
self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= req.header.number()),
|
||||
self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x <= req.header.number()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -210,7 +210,7 @@ impl OnDemand {
|
||||
/// it as easily.
|
||||
pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Receiver<encoded::Header> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
match self.cache.lock().block_header(&req.0) {
|
||||
match { self.cache.lock().block_header(&req.0) } {
|
||||
Some(hdr) => sender.send(hdr).expect(RECEIVER_IN_SCOPE),
|
||||
None => self.dispatch(ctx, Pending::HeaderByHash(req, sender)),
|
||||
}
|
||||
@@ -232,11 +232,13 @@ impl OnDemand {
|
||||
|
||||
sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE);
|
||||
} else {
|
||||
match self.cache.lock().block_body(&req.hash) {
|
||||
match { self.cache.lock().block_body(&req.hash) } {
|
||||
Some(body) => {
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
let body = body.rlp();
|
||||
stream.append_raw(&req.header.into_inner(), 1);
|
||||
stream.append_raw(&body.into_inner(), 2);
|
||||
stream.append_raw(&body.at(0).as_raw(), 1);
|
||||
stream.append_raw(&body.at(1).as_raw(), 1);
|
||||
|
||||
sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE);
|
||||
}
|
||||
@@ -255,7 +257,7 @@ impl OnDemand {
|
||||
if req.0.receipts_root() == SHA3_NULL_RLP {
|
||||
sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE);
|
||||
} else {
|
||||
match self.cache.lock().block_receipts(&req.0.hash()) {
|
||||
match { self.cache.lock().block_receipts(&req.0.hash()) } {
|
||||
Some(receipts) => sender.send(receipts).expect(RECEIVER_IN_SCOPE),
|
||||
None => self.dispatch(ctx, Pending::BlockReceipts(req, sender)),
|
||||
}
|
||||
@@ -303,23 +305,26 @@ impl OnDemand {
|
||||
|
||||
let complete = builder.build();
|
||||
|
||||
let kind = complete.requests()[0].kind();
|
||||
for (id, peer) in self.peers.read().iter() {
|
||||
if !peer.can_handle(&pending) { continue }
|
||||
match ctx.request_from(*id, complete.clone()) {
|
||||
Ok(req_id) => {
|
||||
trace!(target: "on_demand", "Assigning request to peer {}", id);
|
||||
trace!(target: "on_demand", "{}: Assigned {:?} to peer {}",
|
||||
req_id, kind, id);
|
||||
|
||||
self.pending_requests.write().insert(
|
||||
req_id,
|
||||
pending,
|
||||
);
|
||||
return
|
||||
}
|
||||
Err(net::Error::NoCredits) => {}
|
||||
Err(e) =>
|
||||
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
|
||||
}
|
||||
}
|
||||
|
||||
trace!(target: "on_demand", "No suitable peer for request");
|
||||
self.orphaned_requests.write().push(pending);
|
||||
}
|
||||
|
||||
@@ -353,6 +358,7 @@ impl OnDemand {
|
||||
|
||||
let to_dispatch = ::std::mem::replace(&mut *self.orphaned_requests.write(), Vec::new());
|
||||
|
||||
trace!(target: "on_demand", "Attempting to dispatch {} orphaned requests.", to_dispatch.len());
|
||||
for mut orphaned in to_dispatch {
|
||||
let hung_up = match orphaned {
|
||||
Pending::HeaderProof(_, ref mut sender) => match *sender {
|
||||
@@ -397,10 +403,12 @@ impl Handler for OnDemand {
|
||||
}
|
||||
|
||||
fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) {
|
||||
let mut peers = self.peers.write();
|
||||
if let Some(ref mut peer) = peers.get_mut(&ctx.peer()) {
|
||||
peer.status.update_from(&announcement);
|
||||
peer.capabilities.update_from(&announcement);
|
||||
{
|
||||
let mut peers = self.peers.write();
|
||||
if let Some(ref mut peer) = peers.get_mut(&ctx.peer()) {
|
||||
peer.status.update_from(&announcement);
|
||||
peer.capabilities.update_from(&announcement);
|
||||
}
|
||||
}
|
||||
|
||||
self.dispatch_orphaned(ctx.as_basic());
|
||||
@@ -422,6 +430,8 @@ impl Handler for OnDemand {
|
||||
}
|
||||
};
|
||||
|
||||
trace!(target: "on_demand", "Handling response for request {}, kind={:?}", req_id, response.kind());
|
||||
|
||||
// handle the response appropriately for the request.
|
||||
// all branches which do not return early lead to disabling of the peer
|
||||
// due to misbehavior.
|
||||
@@ -441,7 +451,7 @@ impl Handler for OnDemand {
|
||||
}
|
||||
return
|
||||
}
|
||||
Err(e) => warn!("Error handling response for header request: {:?}", e),
|
||||
Err(e) => warn!(target: "on_demand", "Error handling response for header request: {:?}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -454,7 +464,7 @@ impl Handler for OnDemand {
|
||||
let _ = sender.send(header);
|
||||
return
|
||||
}
|
||||
Err(e) => warn!("Error handling response for header request: {:?}", e),
|
||||
Err(e) => warn!(target: "on_demand", "Error handling response for header request: {:?}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -467,7 +477,7 @@ impl Handler for OnDemand {
|
||||
let _ = sender.send(block);
|
||||
return
|
||||
}
|
||||
Err(e) => warn!("Error handling response for block request: {:?}", e),
|
||||
Err(e) => warn!(target: "on_demand", "Error handling response for block request: {:?}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -480,7 +490,7 @@ impl Handler for OnDemand {
|
||||
let _ = sender.send(receipts);
|
||||
return
|
||||
}
|
||||
Err(e) => warn!("Error handling response for receipts request: {:?}", e),
|
||||
Err(e) => warn!(target: "on_demand", "Error handling response for receipts request: {:?}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -493,7 +503,7 @@ impl Handler for OnDemand {
|
||||
let _ = sender.send(maybe_account);
|
||||
return
|
||||
}
|
||||
Err(e) => warn!("Error handling response for state request: {:?}", e),
|
||||
Err(e) => warn!(target: "on_demand", "Error handling response for state request: {:?}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -504,7 +514,7 @@ impl Handler for OnDemand {
|
||||
let _ = sender.send(response.code.clone());
|
||||
return
|
||||
}
|
||||
Err(e) => warn!("Error handling response for code request: {:?}", e),
|
||||
Err(e) => warn!(target: "on_demand", "Error handling response for code request: {:?}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -519,7 +529,7 @@ impl Handler for OnDemand {
|
||||
let _ = sender.send(Err(err));
|
||||
return
|
||||
}
|
||||
ProvedExecution::BadProof => warn!("Error handling response for transaction proof request"),
|
||||
ProvedExecution::BadProof => warn!(target: "on_demand", "Error handling response for transaction proof request"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,7 +151,8 @@ impl Body {
|
||||
// concatenate the header and the body.
|
||||
let mut stream = RlpStream::new_list(3);
|
||||
stream.append_raw(self.header.rlp().as_raw(), 1);
|
||||
stream.append_raw(&body.rlp().as_raw(), 2);
|
||||
stream.append_raw(body.rlp().at(0).as_raw(), 1);
|
||||
stream.append_raw(body.rlp().at(1).as_raw(), 1);
|
||||
|
||||
Ok(encoded::Block::new(stream.out()))
|
||||
}
|
||||
@@ -243,12 +244,14 @@ impl TransactionProof {
|
||||
pub fn check_response(&self, state_items: &[DBValue]) -> ProvedExecution {
|
||||
let root = self.header.state_root();
|
||||
|
||||
let mut env_info = self.env_info.clone();
|
||||
env_info.gas_limit = self.tx.gas.clone();
|
||||
state::check_proof(
|
||||
state_items,
|
||||
root,
|
||||
&self.tx,
|
||||
&*self.engine,
|
||||
&self.env_info,
|
||||
&env_info,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -244,7 +244,8 @@ pub enum CompleteRequest {
|
||||
}
|
||||
|
||||
impl Request {
|
||||
fn kind(&self) -> Kind {
|
||||
/// Get the request kind.
|
||||
pub fn kind(&self) -> Kind {
|
||||
match *self {
|
||||
Request::Headers(_) => Kind::Headers,
|
||||
Request::HeaderProof(_) => Kind::HeaderProof,
|
||||
@@ -435,7 +436,8 @@ impl Response {
|
||||
}
|
||||
}
|
||||
|
||||
fn kind(&self) -> Kind {
|
||||
/// Inspect the kind of this response.
|
||||
pub fn kind(&self) -> Kind {
|
||||
match *self {
|
||||
Response::Headers(_) => Kind::Headers,
|
||||
Response::HeaderProof(_) => Kind::HeaderProof,
|
||||
@@ -726,7 +728,6 @@ pub mod header_proof {
|
||||
|
||||
impl Decodable for Response {
|
||||
fn decode(rlp: &UntrustedRlp) -> Result<Self, DecoderError> {
|
||||
|
||||
Ok(Response {
|
||||
proof: rlp.list_at(0)?,
|
||||
hash: rlp.val_at(1)?,
|
||||
@@ -737,12 +738,10 @@ pub mod header_proof {
|
||||
|
||||
impl Encodable for Response {
|
||||
fn rlp_append(&self, s: &mut RlpStream) {
|
||||
s.begin_list(3).begin_list(self.proof.len());
|
||||
for item in &self.proof {
|
||||
s.append_list(&item);
|
||||
}
|
||||
|
||||
s.append(&self.hash).append(&self.td);
|
||||
s.begin_list(3)
|
||||
.append_list::<Vec<u8>,_>(&self.proof[..])
|
||||
.append(&self.hash)
|
||||
.append(&self.td);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -826,7 +825,6 @@ pub mod block_receipts {
|
||||
|
||||
impl Decodable for Response {
|
||||
fn decode(rlp: &UntrustedRlp) -> Result<Self, DecoderError> {
|
||||
|
||||
Ok(Response {
|
||||
receipts: rlp.as_list()?,
|
||||
})
|
||||
@@ -923,8 +921,8 @@ pub mod block_body {
|
||||
use ethcore::transaction::UnverifiedTransaction;
|
||||
|
||||
// check body validity.
|
||||
let _: Vec<FullHeader> = rlp.list_at(0)?;
|
||||
let _: Vec<UnverifiedTransaction> = rlp.list_at(1)?;
|
||||
let _: Vec<UnverifiedTransaction> = rlp.list_at(0)?;
|
||||
let _: Vec<FullHeader> = rlp.list_at(1)?;
|
||||
|
||||
Ok(Response {
|
||||
body: encoded::Body::new(rlp.as_raw().to_owned()),
|
||||
@@ -1063,12 +1061,9 @@ pub mod account {
|
||||
|
||||
impl Encodable for Response {
|
||||
fn rlp_append(&self, s: &mut RlpStream) {
|
||||
s.begin_list(5).begin_list(self.proof.len());
|
||||
for item in &self.proof {
|
||||
s.append_list(&item);
|
||||
}
|
||||
|
||||
s.append(&self.nonce)
|
||||
s.begin_list(5)
|
||||
.append_list::<Vec<u8>,_>(&self.proof[..])
|
||||
.append(&self.nonce)
|
||||
.append(&self.balance)
|
||||
.append(&self.code_hash)
|
||||
.append(&self.storage_root);
|
||||
@@ -1207,11 +1202,9 @@ pub mod storage {
|
||||
|
||||
impl Encodable for Response {
|
||||
fn rlp_append(&self, s: &mut RlpStream) {
|
||||
s.begin_list(2).begin_list(self.proof.len());
|
||||
for item in &self.proof {
|
||||
s.append_list(&item);
|
||||
}
|
||||
s.append(&self.value);
|
||||
s.begin_list(2)
|
||||
.append_list::<Vec<u8>,_>(&self.proof[..])
|
||||
.append(&self.value);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1486,9 +1479,16 @@ mod tests {
|
||||
fn check_roundtrip<T>(val: T)
|
||||
where T: ::rlp::Encodable + ::rlp::Decodable + PartialEq + ::std::fmt::Debug
|
||||
{
|
||||
// check as single value.
|
||||
let bytes = ::rlp::encode(&val);
|
||||
let new_val: T = ::rlp::decode(&bytes);
|
||||
assert_eq!(val, new_val);
|
||||
|
||||
// check as list containing single value.
|
||||
let list = [val];
|
||||
let bytes = ::rlp::encode_list(&list);
|
||||
let new_list: Vec<T> = ::rlp::decode_list(&bytes);
|
||||
assert_eq!(&list, &new_list[..]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1540,7 +1540,7 @@ mod tests {
|
||||
|
||||
let full_req = Request::HeaderProof(req.clone());
|
||||
let res = HeaderProofResponse {
|
||||
proof: Vec::new(),
|
||||
proof: vec![vec![1, 2, 3], vec![4, 5, 6]],
|
||||
hash: Default::default(),
|
||||
td: 100.into(),
|
||||
};
|
||||
@@ -1572,6 +1572,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn body_roundtrip() {
|
||||
use ethcore::transaction::{Transaction, UnverifiedTransaction};
|
||||
let req = IncompleteBodyRequest {
|
||||
hash: Field::Scalar(Default::default()),
|
||||
};
|
||||
@@ -1579,8 +1580,12 @@ mod tests {
|
||||
let full_req = Request::Body(req.clone());
|
||||
let res = BodyResponse {
|
||||
body: {
|
||||
let header = ::ethcore::header::Header::default();
|
||||
let tx = UnverifiedTransaction::from(Transaction::default().fake_sign(Default::default()));
|
||||
let mut stream = RlpStream::new_list(2);
|
||||
stream.begin_list(0).begin_list(0);
|
||||
stream.begin_list(2).append(&tx).append(&tx)
|
||||
.begin_list(1).append(&header);
|
||||
|
||||
::ethcore::encoded::Body::new(stream.out())
|
||||
},
|
||||
};
|
||||
@@ -1601,7 +1606,7 @@ mod tests {
|
||||
|
||||
let full_req = Request::Account(req.clone());
|
||||
let res = AccountResponse {
|
||||
proof: Vec::new(),
|
||||
proof: vec![vec![1, 2, 3], vec![4, 5, 6]],
|
||||
nonce: 100.into(),
|
||||
balance: 123456.into(),
|
||||
code_hash: Default::default(),
|
||||
@@ -1625,7 +1630,7 @@ mod tests {
|
||||
|
||||
let full_req = Request::Storage(req.clone());
|
||||
let res = StorageResponse {
|
||||
proof: Vec::new(),
|
||||
proof: vec![vec![1, 2, 3], vec![4, 5, 6]],
|
||||
value: H256::default(),
|
||||
};
|
||||
let full_res = Response::Storage(res.clone());
|
||||
@@ -1707,4 +1712,31 @@ mod tests {
|
||||
assert_eq!(rlp.val_at::<usize>(0).unwrap(), 100usize);
|
||||
assert_eq!(rlp.list_at::<Request>(1).unwrap(), reqs);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn responses_vec() {
|
||||
let mut stream = RlpStream::new_list(2);
|
||||
stream.begin_list(0).begin_list(0);
|
||||
|
||||
let body = ::ethcore::encoded::Body::new(stream.out());
|
||||
let reqs = vec![
|
||||
Response::Headers(HeadersResponse { headers: vec![] }),
|
||||
Response::HeaderProof(HeaderProofResponse { proof: vec![], hash: Default::default(), td: 100.into()}),
|
||||
Response::Receipts(ReceiptsResponse { receipts: vec![Default::default()] }),
|
||||
Response::Body(BodyResponse { body: body }),
|
||||
Response::Account(AccountResponse {
|
||||
proof: vec![],
|
||||
nonce: 100.into(),
|
||||
balance: 123.into(),
|
||||
code_hash: Default::default(),
|
||||
storage_root: Default::default()
|
||||
}),
|
||||
Response::Storage(StorageResponse { proof: vec![], value: H256::default() }),
|
||||
Response::Code(CodeResponse { code: vec![1, 2, 3, 4, 5] }),
|
||||
Response::Execution(ExecutionResponse { items: vec![] }),
|
||||
];
|
||||
|
||||
let raw = ::rlp::encode_list(&reqs);
|
||||
assert_eq!(::rlp::decode_list::<Response>(&raw), reqs);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user