sync tests

This commit is contained in:
arkpar 2015-12-26 15:47:07 +01:00
parent 9087cc798b
commit bf9667a206
7 changed files with 436 additions and 118 deletions

View File

@ -19,7 +19,7 @@ use transaction::*;
use views::*;
/// Represents a tree route between `from` block and `to` block:
///
///
/// - `blocks` - a vector of hashes of all blocks, ordered from `from` to `to`.
///
/// - `ancestor` - best common ancestor of these blocks.
@ -59,7 +59,7 @@ impl BestBlock {
}
/// Structure providing fast access to blockchain data.
///
///
/// **Does not do input data verification.**
pub struct BlockChain {
best_block: RefCell<BestBlock>,
@ -80,7 +80,7 @@ pub struct BlockChain {
impl BlockChain {
/// Create new instance of blockchain from given Genesis
///
///
/// ```rust
/// extern crate ethcore_util as util;
/// extern crate ethcore;
@ -90,7 +90,7 @@ impl BlockChain {
/// use ethcore::blockchain::*;
/// use util::hash::*;
/// use util::uint::*;
///
///
/// fn main() {
/// let genesis = Genesis::new_frontier();
///
@ -152,7 +152,7 @@ impl BlockChain {
batch.put_extras(&header.number(), &hash);
batch.put(b"best", &hash).unwrap();
bc.extras_db.write(batch).unwrap();
hash
}
};
@ -168,44 +168,44 @@ impl BlockChain {
}
/// Returns a tree route between `from` and `to`, which is a tuple of:
///
///
/// - a vector of hashes of all blocks, ordered from `from` to `to`.
///
/// - common ancestor of these blocks.
///
/// - an index where best common ancestor would be
///
///
/// 1.) from newer to older
///
///
/// - bc: `A1 -> A2 -> A3 -> A4 -> A5`
/// - from: A5, to: A4
/// - route:
/// - route:
///
/// ```json
/// { blocks: [A5], ancestor: A4, index: 1 }
/// ```
///
///
/// 2.) from older to newer
///
///
/// - bc: `A1 -> A2 -> A3 -> A4 -> A5`
/// - from: A3, to: A4
/// - route:
///
/// - route:
///
/// ```json
/// { blocks: [A4], ancestor: A3, index: 0 }
/// ```
///
/// 3.) fork:
///
/// - bc:
/// - bc:
///
/// ```text
/// A1 -> A2 -> A3 -> A4
/// -> B3 -> B4
/// ```
/// ```
/// - from: B4, to: A4
/// - route:
///
/// - route:
///
/// ```json
/// { blocks: [B4, B3, A3, A4], ancestor: A2, index: 2 }
/// ```
@ -302,7 +302,7 @@ impl BlockChain {
// create views onto rlp
let block = BlockView::new(bytes);
let header = block.header_view();
// prepare variables
let hash = block.sha3();
let mut parent_details = self.block_details(&header.parent_hash()).expect("Invalid parent hash.");
@ -317,7 +317,7 @@ impl BlockChain {
parent: parent_hash.clone(),
children: vec![]
};
// prepare the batch
let batch = WriteBatch::new();
@ -333,7 +333,7 @@ impl BlockChain {
return (batch, None);
}
// if its new best block we need to make sure that all ancestors
// if its new best block we need to make sure that all ancestors
// are moved to "canon chain"
// find the route between old best block and the new one
let best_hash = self.best_block_hash();
@ -368,7 +368,7 @@ impl BlockChain {
(batch, Some(best_block))
}
/// Returns true if the given block is known
/// Returns true if the given block is known
/// (though not necessarily a part of the canon chain).
pub fn is_known(&self, hash: &H256) -> bool {
self.query_extras_exist(hash, &self.block_details)
@ -471,8 +471,8 @@ impl BlockChain {
}
}
fn query_extras<K, T>(&self, hash: &K, cache: &RefCell<HashMap<K, T>>) -> Option<T> where
T: Clone + Decodable + ExtrasIndexable,
fn query_extras<K, T>(&self, hash: &K, cache: &RefCell<HashMap<K, T>>) -> Option<T> where
T: Clone + Decodable + ExtrasIndexable,
K: ExtrasSliceConvertable + Eq + Hash + Clone {
{
let read = cache.borrow();
@ -489,7 +489,7 @@ impl BlockChain {
})
}
fn query_extras_exist<K, T>(&self, hash: &K, cache: &RefCell<HashMap<K, T>>) -> bool where
fn query_extras_exist<K, T>(&self, hash: &K, cache: &RefCell<HashMap<K, T>>) -> bool where
K: ExtrasSliceConvertable + Eq + Hash + Clone,
T: ExtrasIndexable {
{
@ -541,7 +541,7 @@ mod tests {
dir.push(H32::random().hex());
let bc = BlockChain::new(&genesis, &dir);
let genesis_hash = H256::from_str("3caa2203f3d7c136c0295ed128a7d31cea520b1ca5e27afe17d0853331798942").unwrap();
assert_eq!(bc.genesis_hash(), genesis_hash.clone());
@ -549,7 +549,7 @@ mod tests {
assert_eq!(bc.best_block_hash(), genesis_hash.clone());
assert_eq!(bc.block_hash(&U256::from(0u8)), Some(genesis_hash.clone()));
assert_eq!(bc.block_hash(&U256::from(1u8)), None);
let first = "f90285f90219a03caa2203f3d7c136c0295ed128a7d31cea520b1ca5e27afe17d0853331798942a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347948888f1f195afa192cfee860698584c030f4c9db1a0bac6177a79e910c98d86ec31a09ae37ac2de15b754fd7bed1ba52362c49416bfa0d45893a296c1490a978e0bd321b5f2635d8280365c1fe9f693d65f233e791344a0c7778a7376099ee2e5c455791c1885b5c361b95713fddcbe32d97fd01334d296b90100000000000000000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000200000000000000000008000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000200000000000400000000000000000000000000000000000000000000000000000008302000001832fefd882560b845627cb99a00102030405060708091011121314151617181920212223242526272829303132a08ccb2837fb2923bd97e8f2d08ea32012d6e34be018c73e49a0f98843e8f47d5d88e53be49fec01012ef866f864800a82c35094095e7baea6a6c7c4c2dfeb977efac326af552d8785012a05f200801ba0cb088b8d2ff76a7b2c6616c9d02fb6b7a501afbf8b69d7180b09928a1b80b5e4a06448fe7476c606582039bb72a9f6f4b4fad18507b8dfbd00eebbe151cc573cd2c0".from_hex().unwrap();
@ -610,22 +610,22 @@ mod tests {
assert_eq!(r0_1.blocks, [b1_hash.clone()]);
assert_eq!(r0_1.index, 0);
let r0_2 = bc.tree_route(genesis_hash.clone(), b2_hash.clone());
let r0_2 = bc.tree_route(genesis_hash.clone(), b2_hash.clone());
assert_eq!(r0_2.ancestor, genesis_hash);
assert_eq!(r0_2.blocks, [b1_hash.clone(), b2_hash.clone()]);
assert_eq!(r0_2.index, 0);
let r1_3a = bc.tree_route(b1_hash.clone(), b3a_hash.clone());
let r1_3a = bc.tree_route(b1_hash.clone(), b3a_hash.clone());
assert_eq!(r1_3a.ancestor, b1_hash);
assert_eq!(r1_3a.blocks, [b2_hash.clone(), b3a_hash.clone()]);
assert_eq!(r1_3a.index, 0);
let r1_3b = bc.tree_route(b1_hash.clone(), b3b_hash.clone());
let r1_3b = bc.tree_route(b1_hash.clone(), b3b_hash.clone());
assert_eq!(r1_3b.ancestor, b1_hash);
assert_eq!(r1_3b.blocks, [b2_hash.clone(), b3b_hash.clone()]);
assert_eq!(r1_3b.index, 0);
let r3a_3b = bc.tree_route(b3a_hash.clone(), b3b_hash.clone());
let r3a_3b = bc.tree_route(b3a_hash.clone(), b3b_hash.clone());
assert_eq!(r3a_3b.ancestor, b2_hash);
assert_eq!(r3a_3b.blocks, [b3a_hash.clone(), b3b_hash.clone()]);
assert_eq!(r3a_3b.index, 1);
@ -639,7 +639,7 @@ mod tests {
assert_eq!(r2_0.ancestor, genesis_hash);
assert_eq!(r2_0.blocks, [b2_hash.clone(), b1_hash.clone()]);
assert_eq!(r2_0.index, 2);
let r3a_1 = bc.tree_route(b3a_hash.clone(), b1_hash.clone());
assert_eq!(r3a_1.ancestor, b1_hash);
assert_eq!(r3a_1.blocks, [b3a_hash.clone(), b2_hash.clone()]);

View File

@ -35,7 +35,7 @@ pub struct BlockQueueStatus {
pub full: bool,
}
pub struct TreeRoute;
pub type TreeRoute = ::blockchain::TreeRoute;
pub type BlockNumber = u32;
pub type BlockHeader = ::header::Header;
@ -46,9 +46,9 @@ pub trait BlockChainClient : Sync {
fn block(&self, h: &H256) -> Option<Bytes>;
fn block_status(&self, h: &H256) -> BlockStatus;
fn block_header_at(&self, n: BlockNumber) -> Option<Bytes>;
fn block_body_at(&self, h: BlockNumber) -> Option<Bytes>;
fn block_at(&self, h: BlockNumber) -> Option<Bytes>;
fn block_status_at(&self, h: BlockNumber) -> BlockStatus;
fn block_body_at(&self, n: BlockNumber) -> Option<Bytes>;
fn block_at(&self, n: BlockNumber) -> Option<Bytes>;
fn block_status_at(&self, n: BlockNumber) -> BlockStatus;
fn tree_route(&self, from: &H256, to: &H256) -> TreeRoute;
fn state_data(&self, h: &H256) -> Option<Bytes>;
fn block_receipts(&self, h: &H256) -> Option<Bytes>;

View File

@ -1,5 +1,6 @@
use std::io::Read;
use std::str::FromStr;
use std::cell::Cell;
use std::collections::HashMap;
use rustc_serialize::base64::FromBase64;
use rustc_serialize::json::Json;
@ -48,7 +49,7 @@ impl Genesis {
Genesis {
block: stream.out(),
state: state
state: state
}
}
@ -56,12 +57,12 @@ impl Genesis {
fn load_genesis_json(json: &Json, state_root: &H256) -> (Header, HashMap<Address, Account>) {
// once we commit ourselves to some json parsing library (serde?)
// move it to proper data structure
let empty_list = RlpStream::new_list(0).out();
let empty_list_sha3 = empty_list.sha3();
let empty_data = encode(&"");
let empty_data_sha3 = empty_data.sha3();
let header = Header {
parent_hash: H256::from_str(&json["parentHash"].as_string().unwrap()[2..]).unwrap(),
uncles_hash: empty_list_sha3.clone(),
@ -81,9 +82,10 @@ impl Genesis {
let mixhash = H256::from_str(&json["mixhash"].as_string().unwrap()[2..]).unwrap();
let nonce = H64::from_str(&json["nonce"].as_string().unwrap()[2..]).unwrap();
vec![mixhash.to_vec(), nonce.to_vec()]
}
},
hash: Cell::new(None)
};
let mut state = HashMap::new();
let accounts = json["alloc"].as_object().expect("Missing genesis state");
for (address, acc) in accounts.iter() {

View File

@ -1,4 +1,6 @@
use std::cell::Cell;
use util::hash::*;
use util::sha3::*;
use util::bytes::*;
use util::uint::*;
use util::rlp::*;
@ -28,6 +30,8 @@ pub struct Header {
pub difficulty: U256,
pub seal: Vec<Bytes>,
pub hash: Cell<Option<H256>>, //TODO: make this private
}
impl Header {
@ -50,37 +54,49 @@ impl Header {
difficulty: ZERO_U256.clone(),
seal: vec![],
hash: Cell::new(None),
}
}
pub fn hash(&self) -> H256 {
unimplemented!();
let hash = self.hash.get();
match hash {
Some(h) => h,
None => {
let mut stream = RlpStream::new();
stream.append(self);
let h = stream.raw().sha3();
self.hash.set(Some(h.clone()));
h
}
}
}
}
impl Decodable for Header {
fn decode<D>(decoder: &D) -> Result<Self, DecoderError> where D: Decoder {
let d = try!(decoder.as_list());
let r = decoder.as_rlp();
let mut blockheader = Header {
parent_hash: try!(Decodable::decode(&d[0])),
uncles_hash: try!(Decodable::decode(&d[1])),
author: try!(Decodable::decode(&d[2])),
state_root: try!(Decodable::decode(&d[3])),
transactions_root: try!(Decodable::decode(&d[4])),
receipts_root: try!(Decodable::decode(&d[5])),
log_bloom: try!(Decodable::decode(&d[6])),
difficulty: try!(Decodable::decode(&d[7])),
number: try!(Decodable::decode(&d[8])),
gas_limit: try!(Decodable::decode(&d[9])),
gas_used: try!(Decodable::decode(&d[10])),
timestamp: try!(Decodable::decode(&d[11])),
extra_data: try!(Decodable::decode(&d[12])),
parent_hash: try!(r.val_at(0)),
uncles_hash: try!(r.val_at(1)),
author: try!(r.val_at(2)),
state_root: try!(r.val_at(3)),
transactions_root: try!(r.val_at(4)),
receipts_root: try!(r.val_at(5)),
log_bloom: try!(r.val_at(6)),
difficulty: try!(r.val_at(7)),
number: try!(r.val_at(8)),
gas_limit: try!(r.val_at(9)),
gas_used: try!(r.val_at(10)),
timestamp: try!(r.val_at(11)),
extra_data: try!(r.val_at(12)),
seal: vec![],
hash: Cell::new(Some(r.raw().sha3()))
};
for i in 13..d.len() {
blockheader.seal.push(try!(Decodable::decode(&d[i])));
for i in 13..r.item_count() {
blockheader.seal.push(try!(r.val_at(i)))
}
Ok(blockheader)

View File

@ -1,7 +1,7 @@
use std::collections::{HashSet, HashMap};
use std::cmp::{min, max};
use std::mem::{replace};
use util::network::{PeerId, HandlerIo, PacketId};
use util::network::{PeerId, PacketId};
use util::hash::{H256};
use util::bytes::{Bytes};
use util::uint::{U256};
@ -10,23 +10,7 @@ use util::rlp::rlptraits::{Stream, View};
use util::sha3::Hashable;
use eth::{BlockNumber, BlockChainClient, BlockHeader, BlockStatus, QueueStatus, ImportResult};
use sync::range_collection::{RangeCollection, ToUsize, FromUsize};
pub struct SyncIo<'s, 'h> where 'h:'s {
network: &'s mut HandlerIo<'h>,
chain: &'s mut BlockChainClient
}
impl<'s, 'h> SyncIo<'s, 'h> {
pub fn new(network: &'s mut HandlerIo<'h>, chain: &'s mut BlockChainClient) -> SyncIo<'s,'h> {
SyncIo {
network: network,
chain: chain,
}
}
fn disable_peer(&mut self, peer_id: &PeerId) {
self.network.disable_peer(*peer_id);
}
}
use sync::{SyncIo};
impl ToUsize for BlockNumber {
fn to_usize(&self) -> usize {
@ -106,7 +90,6 @@ pub struct SyncStatus {
enum PeerAsking
{
Nothing,
State,
BlockHeaders,
BlockBodies,
}
@ -213,8 +196,8 @@ impl ChainSync {
self.starting_block = 0;
self.highest_block = 0;
self.have_common_block = false;
io.chain.clear_queue();
self.starting_block = io.chain.info().last_block_number;
io.chain().clear_queue();
self.starting_block = io.chain().info().last_block_number;
self.state = SyncState::NotSynced;
}
@ -263,7 +246,7 @@ impl ChainSync {
if number > self.highest_block {
self.highest_block = number;
}
match io.chain.block_status(&info.hash()) {
match io.chain().block_status(&info.hash()) {
BlockStatus::InChain => {
self.have_common_block = true;
self.last_imported_block = number;
@ -285,7 +268,7 @@ impl ChainSync {
}
}
let hdr = Header {
data: r.at(i).data().to_vec(),
data: r.at(i).raw().to_vec(),
hash: info.hash(),
parent: info.parent_hash,
};
@ -298,7 +281,7 @@ impl ChainSync {
//empty body, just mark as downloaded
let mut body_stream = RlpStream::new_list(2);
body_stream.append_raw(&rlp::EMPTY_LIST_RLP, 1);
body_stream.append_raw(&rlp::EMPTY_LIST_RLP, 1);
body_stream.append_raw(&rlp::NULL_RLP, 1);
self.bodies.insert_item(number, body_stream.out());
}
else {
@ -327,8 +310,8 @@ impl ChainSync {
for i in 0..item_count {
let body: Rlp = r.at(i);
let tx = body.at(0);
let tx_root = ::util::triehash::ordered_trie_root(tx.iter().map(|r| r.data().to_vec()).collect()); //TODO: get rid of vectors here
let uncles = body.at(1).data().sha3();
let tx_root = ::util::triehash::ordered_trie_root(tx.iter().map(|r| r.raw().to_vec()).collect()); //TODO: get rid of vectors here
let uncles = body.at(1).raw().sha3();
let header_id = HeaderId {
transactions_root: tx_root,
uncles: uncles
@ -336,7 +319,7 @@ impl ChainSync {
match self.header_ids.get(&header_id).map(|n| *n) {
Some(n) => {
self.header_ids.remove(&header_id);
self.bodies.insert_item(n, body.data().to_vec());
self.bodies.insert_item(n, body.raw().to_vec());
}
None => {
debug!(target: "sync", "Ignored unknown block body");
@ -351,9 +334,9 @@ impl ChainSync {
fn on_peer_new_block(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) {
let block_rlp = r.at(0);
let header_rlp = block_rlp.at(0);
let h = header_rlp.data().sha3();
let h = header_rlp.raw().sha3();
match io.chain.import_block(block_rlp.data()) {
match io.chain().import_block(block_rlp.raw()) {
ImportResult::AlreadyInChain => {
trace!(target: "sync", "New block already in chain {:?}", h);
},
@ -388,7 +371,7 @@ impl ChainSync {
let hashes = r.iter().map(|item| (item.val_at::<H256>(0), item.val_at::<U256>(1)));
let mut max_height: U256 = From::from(0);
for (h, d) in hashes {
match io.chain.block_status(&h) {
match io.chain().block_status(&h) {
BlockStatus::InChain => {
trace!(target: "sync", "New block hash already in chain {:?}", h);
},
@ -458,7 +441,7 @@ impl ChainSync {
(peer.latest.clone(), peer.difficulty.clone())
};
let td = io.chain.info().pending_total_difficulty;
let td = io.chain().info().pending_total_difficulty;
let syncing_difficulty = max(self.syncing_difficulty, td);
if force || peer_difficulty > syncing_difficulty {
// start sync
@ -476,7 +459,7 @@ impl ChainSync {
fn request_blocks(&mut self, io: &mut SyncIo, peer_id: &PeerId) {
self.clear_peer_download(peer_id);
if io.chain.queue_status().full {
if io.chain().queue_status().full {
self.pause_sync();
return;
}
@ -511,7 +494,7 @@ impl ChainSync {
let mut start = 0usize;
if !self.have_common_block {
// download backwards until common block is found 1 header at a time
start = io.chain.info().last_block_number as usize;
start = io.chain().info().last_block_number as usize;
if !self.headers.is_empty() {
start = min(start, self.headers.range_iter().next().unwrap().0 as usize - 1);
}
@ -585,7 +568,7 @@ impl ChainSync {
block_rlp.append_raw(&headers.1[i].data, 1);
block_rlp.append_raw(&bodies.1[i], 2);
let h = &headers.1[i].hash;
match io.chain.import_block(&block_rlp.out()) {
match io.chain().import_block(&block_rlp.out()) {
ImportResult::AlreadyInChain => {
trace!(target: "sync", "Block already in chain {:?}", h);
self.last_imported_block = headers.0 + i as BlockNumber;
@ -676,7 +659,7 @@ impl ChainSync {
warn!(target:"sync", "Asking {:?} while requesting {:?}", asking, peer.asking);
}
}
match sync.network.send(*peer_id, packet_id, packet) {
match sync.send(*peer_id, packet_id, packet) {
Err(e) => {
warn!(target:"sync", "Error sending request: {:?}", e);
sync.disable_peer(peer_id);
@ -694,13 +677,20 @@ impl ChainSync {
fn send_status(&mut self, io: &mut SyncIo, peer_id: &PeerId) {
let mut packet = RlpStream::new_list(5);
let chain = io.chain.info();
let chain = io.chain().info();
packet.append(&(PROTOCOL_VERSION as u32));
packet.append(&0u32); //TODO: network id
packet.append(&chain.total_difficulty);
packet.append(&chain.last_block_hash);
packet.append(&chain.genesis_hash);
self.send_request(io, peer_id, PeerAsking::State, STATUS_PACKET, packet.out());
//TODO: handle timeout for status request
match io.send(*peer_id, STATUS_PACKET, packet.out()) {
Err(e) => {
warn!(target:"sync", "Error sending status request: {:?}", e);
io.disable_peer(peer_id);
}
Ok(_) => { }
}
}
fn return_block_headers(&self, io: &mut SyncIo, r: &Rlp) {
@ -709,12 +699,12 @@ impl ChainSync {
let max_headers: usize = r.val_at(1);
let skip: usize = r.val_at(2);
let reverse: bool = r.val_at(3);
let last = io.chain.info().last_block_number;
let last = io.chain().info().last_block_number;
let mut number = if r.at(0).size() == 32 {
// id is a hash
let hash: H256 = r.val_at(0);
trace!(target: "sync", "GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", hash, max_headers, skip, reverse);
match io.chain.block_header(&hash) {
match io.chain().block_header(&hash) {
Some(hdr) => From::from(rlp::decode::<BlockHeader>(&hdr).number),
None => last
}
@ -729,7 +719,7 @@ impl ChainSync {
let mut count = 0;
let mut data = Bytes::new();
while number < last && number > 1 && count < max_count {
match io.chain.block_header_at(number) {
match io.chain().block_header_at(number) {
Some(mut hdr) => {
data.append(&mut hdr);
count += 1;
@ -745,7 +735,7 @@ impl ChainSync {
}
let mut rlp = RlpStream::new_list(count as usize);
rlp.append_raw(&data, count as usize);
io.network.respond(BLOCK_HEADERS_PACKET, rlp.out()).unwrap_or_else(|e|
io.respond(BLOCK_HEADERS_PACKET, rlp.out()).unwrap_or_else(|e|
debug!(target: "sync", "Error sending headers: {:?}", e));
}
@ -759,7 +749,7 @@ impl ChainSync {
let mut added = 0usize;
let mut data = Bytes::new();
for i in 0..count {
match io.chain.block_body(&r.val_at::<H256>(i)) {
match io.chain().block_body(&r.val_at::<H256>(i)) {
Some(mut hdr) => {
data.append(&mut hdr);
added += 1;
@ -769,7 +759,7 @@ impl ChainSync {
}
let mut rlp = RlpStream::new_list(added);
rlp.append_raw(&data, added);
io.network.respond(BLOCK_BODIES_PACKET, rlp.out()).unwrap_or_else(|e|
io.respond(BLOCK_BODIES_PACKET, rlp.out()).unwrap_or_else(|e|
debug!(target: "sync", "Error sending headers: {:?}", e));
}
@ -783,7 +773,7 @@ impl ChainSync {
let mut added = 0usize;
let mut data = Bytes::new();
for i in 0..count {
match io.chain.state_data(&r.val_at::<H256>(i)) {
match io.chain().state_data(&r.val_at::<H256>(i)) {
Some(mut hdr) => {
data.append(&mut hdr);
added += 1;
@ -793,7 +783,7 @@ impl ChainSync {
}
let mut rlp = RlpStream::new_list(added);
rlp.append_raw(&data, added);
io.network.respond(NODE_DATA_PACKET, rlp.out()).unwrap_or_else(|e|
io.respond(NODE_DATA_PACKET, rlp.out()).unwrap_or_else(|e|
debug!(target: "sync", "Error sending headers: {:?}", e));
}
@ -807,7 +797,7 @@ impl ChainSync {
let mut added = 0usize;
let mut data = Bytes::new();
for i in 0..count {
match io.chain.block_receipts(&r.val_at::<H256>(i)) {
match io.chain().block_receipts(&r.val_at::<H256>(i)) {
Some(mut hdr) => {
data.append(&mut hdr);
added += 1;
@ -817,7 +807,7 @@ impl ChainSync {
}
let mut rlp = RlpStream::new_list(added);
rlp.append_raw(&data, added);
io.network.respond(RECEIPTS_PACKET, rlp.out()).unwrap_or_else(|e|
io.respond(RECEIPTS_PACKET, rlp.out()).unwrap_or_else(|e|
debug!(target: "sync", "Error sending headers: {:?}", e));
}
@ -834,7 +824,7 @@ impl ChainSync {
NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp),
GET_NODE_DATA_PACKET => self.return_node_data(io, &rlp),
GET_RECEIPTS_PACKET => self.return_receipts(io, &rlp),
_ => debug!(target: "sync", "Unkown packet {}", packet_id)
_ => debug!(target: "sync", "Unknown packet {}", packet_id)
}
}

View File

@ -1,7 +1,7 @@
use std::sync::{Arc};
use eth::{BlockChainClient};
use util::network::{ProtocolHandler, NetworkService, HandlerIo, TimerToken, PeerId};
use sync::chain::{ChainSync, SyncIo};
use util::network::{ProtocolHandler, NetworkService, HandlerIo, TimerToken, PeerId, PacketId, Error as NetworkError};
use sync::chain::{ChainSync};
mod chain;
mod range_collection;
@ -9,7 +9,6 @@ mod range_collection;
#[cfg(test)]
mod tests;
pub fn new(_service: &mut NetworkService, eth_client: Arc<BlockChainClient+Send+Sized>) -> EthSync {
EthSync {
chain: eth_client,
@ -17,6 +16,45 @@ pub fn new(_service: &mut NetworkService, eth_client: Arc<BlockChainClient+Send+
}
}
pub trait SyncIo {
fn disable_peer(&mut self, peer_id: &PeerId);
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>;
fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>;
fn chain<'s>(&'s mut self) -> &'s mut BlockChainClient;
}
pub struct NetSyncIo<'s, 'h> where 'h:'s {
network: &'s mut HandlerIo<'h>,
chain: &'s mut BlockChainClient
}
impl<'s, 'h> NetSyncIo<'s, 'h> {
pub fn new(network: &'s mut HandlerIo<'h>, chain: &'s mut BlockChainClient) -> NetSyncIo<'s,'h> {
NetSyncIo {
network: network,
chain: chain,
}
}
}
impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
fn disable_peer(&mut self, peer_id: &PeerId) {
self.network.disable_peer(*peer_id);
}
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>{
self.network.respond(packet_id, data)
}
fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>{
self.network.send(peer_id, packet_id, data)
}
fn chain<'a>(&'a mut self) -> &'a mut BlockChainClient {
self.chain
}
}
pub struct EthSync {
chain: Arc<BlockChainClient+Send+Sized>,
sync: ChainSync
@ -34,34 +72,34 @@ impl EthSync {
}
pub fn stop_network(&mut self, io: &mut HandlerIo) {
self.sync.abort(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()));
self.sync.abort(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()));
}
pub fn start_network(&mut self, io: &mut HandlerIo) {
self.sync.restart(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()));
self.sync.restart(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()));
}
}
impl ProtocolHandler for EthSync {
fn initialize(&mut self, io: &mut HandlerIo) {
self.sync.restart(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()));
self.sync.restart(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()));
io.register_timer(1000).unwrap();
}
fn read(&mut self, io: &mut HandlerIo, peer: &PeerId, packet_id: u8, data: &[u8]) {
self.sync.on_packet(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer, packet_id, data);
self.sync.on_packet(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer, packet_id, data);
}
fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId) {
self.sync.on_peer_connected(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer);
self.sync.on_peer_connected(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer);
}
fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId) {
self.sync.on_peer_aborting(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer);
self.sync.on_peer_aborting(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer);
}
fn timeout(&mut self, io: &mut HandlerIo, _timer: TimerToken) {
self.sync.maintain_sync(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()));
self.sync.maintain_sync(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()));
}
}

View File

@ -1,13 +1,285 @@
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use util::bytes::Bytes;
use util::hash::H256;
use util::hash::{H256, FixedHash};
use util::uint::{U256};
use util::sha3::Hashable;
use util::rlp::{self, Rlp, RlpStream, View, Stream};
use util::network::{PeerId, PacketId, Error as NetworkError};
use eth::{BlockChainClient, BlockStatus, BlockNumber, TreeRoute, BlockQueueStatus, BlockChainInfo, ImportResult, BlockHeader, QueueStatus};
use sync::{SyncIo};
use sync::chain::{ChainSync};
struct TestBlockChainClient {
blocks: Vec<Bytes>,
hashes: HashMap<H256, usize>,
genesis_hash: H256,
last_hash: H256,
difficulty: U256
}
impl TestBlockChainClient {
fn new() -> TestBlockChainClient {
let mut client = TestBlockChainClient {
blocks: Vec::new(),
hashes: HashMap::new(),
genesis_hash: H256::new(),
last_hash: H256::new(),
difficulty: From::from(0),
};
client.add_blocks(1, true); // add genesis block
client.genesis_hash = client.last_hash;
client
}
pub fn add_blocks(&mut self, count: usize, empty: bool) {
for n in self.blocks.len()..(self.blocks.len() + count) {
let mut header = BlockHeader::new();
header.difficulty = From::from(n);
header.parent_hash = self.last_hash;
header.number = From::from(n);
let mut uncles = RlpStream::new_list(if empty {0} else {1});
if !empty {
uncles.append(&H256::random());
header.uncles_hash = uncles.raw().sha3();
}
let mut rlp = RlpStream::new_list(3);
rlp.append(&header);
rlp.append_raw(uncles.raw(), 1);
rlp.append_raw(&rlp::NULL_RLP, 1);
self.import_block(rlp.raw());
}
}
}
impl BlockChainClient for TestBlockChainClient {
fn block_header(&self, h: &H256) -> Option<Bytes> {
self.hashes.get(h).and_then(|i| self.block_header_at(*i as BlockNumber))
}
fn block_body(&self, h: &H256) -> Option<Bytes> {
self.hashes.get(h).and_then(|i| self.block_body_at(*i as BlockNumber))
}
fn block(&self, h: &H256) -> Option<Bytes> {
self.hashes.get(h).map(|i| self.blocks[*i].clone())
}
fn block_status(&self, h: &H256) -> BlockStatus {
self.hashes.get(h).map(|i| self.block_status_at(*i as BlockNumber)).unwrap_or(BlockStatus::Unknown)
}
fn block_header_at(&self, n: BlockNumber) -> Option<Bytes> {
self.blocks.get(n as usize).map(|r| Rlp::new(r).at(0).raw().to_vec())
}
fn block_body_at(&self, n: BlockNumber) -> Option<Bytes> {
self.blocks.get(n as usize).map(|r| {
let mut stream = RlpStream::new_list(2);
stream.append_raw(Rlp::new(&r).at(1).raw(), 1);
stream.append_raw(Rlp::new(&r).at(2).raw(), 1);
stream.out()
})
}
fn block_at(&self, n: BlockNumber) -> Option<Bytes> {
self.blocks.get(n as usize).map(|b| b.clone())
}
fn block_status_at(&self, n: BlockNumber) -> BlockStatus {
if (n as usize) < self.blocks.len() {
BlockStatus::InChain
} else {
BlockStatus::Unknown
}
}
fn tree_route(&self, _from: &H256, _to: &H256) -> TreeRoute {
TreeRoute {
blocks: Vec::new(),
ancestor: H256::new(),
index: 0
}
}
fn state_data(&self, _h: &H256) -> Option<Bytes> {
None
}
fn block_receipts(&self, _h: &H256) -> Option<Bytes> {
None
}
fn import_block(&mut self, b: &[u8]) -> ImportResult {
let header = Rlp::new(&b).val_at::<BlockHeader>(0);
if header.number != From::from(self.blocks.len()) {
panic!("Unexpected block number");
}
if !self.blocks.is_empty() {
let parent = Rlp::new(self.blocks.last().unwrap()).val_at::<BlockHeader>(0);
if header.parent_hash != parent.hash() {
panic!("Unexpected block header");
}
}
self.difficulty = self.difficulty + header.difficulty;
self.last_hash = header.hash();
self.hashes.insert(header.hash(), self.blocks.len());
self.blocks.push(b.to_vec());
ImportResult::Queued(QueueStatus::Known)
}
fn queue_status(&self) -> BlockQueueStatus {
BlockQueueStatus {
full: false,
}
}
fn clear_queue(&mut self) {
}
fn info(&self) -> BlockChainInfo {
BlockChainInfo {
total_difficulty: self.difficulty,
pending_total_difficulty: self.difficulty,
genesis_hash: self.genesis_hash,
last_block_hash: self.last_hash,
last_block_number: self.blocks.len() as BlockNumber - 1,
}
}
}
struct TestIo<'p> {
chain: &'p mut TestBlockChainClient,
queue: &'p mut VecDeque<TestPacket>,
sender: Option<PeerId>,
}
impl<'p> TestIo<'p> {
fn new(chain: &'p mut TestBlockChainClient, queue: &'p mut VecDeque<TestPacket>, sender: Option<PeerId>) -> TestIo<'p> {
TestIo {
chain: chain,
queue: queue,
sender: sender
}
}
}
impl<'p> SyncIo for TestIo<'p> {
fn disable_peer(&mut self, _peer_id: &PeerId) {
}
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> {
self.queue.push_back(TestPacket {
data: data,
packet_id: packet_id,
recipient: self.sender.unwrap()
});
Ok(())
}
fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> {
self.queue.push_back(TestPacket {
data: data,
packet_id: packet_id,
recipient: peer_id,
});
Ok(())
}
fn chain<'a>(&'a mut self) -> &'a mut BlockChainClient {
self.chain
}
}
struct TestPacket {
data: Bytes,
packet_id: PacketId,
recipient: PeerId,
}
struct TestPeer {
chain: TestBlockChainClient,
sync: ChainSync,
queue: VecDeque<TestPacket>,
}
struct TestNet {
peers: Vec<TestPeer>
}
impl TestNet {
pub fn new(n: usize) -> TestNet {
let mut net = TestNet {
peers: Vec::new(),
};
for _ in 0..n {
net.peers.push(TestPeer {
chain: TestBlockChainClient::new(),
sync: ChainSync::new(),
queue: VecDeque::new(),
});
}
net
}
pub fn peer(&mut self, i: usize) -> &mut TestPeer {
self.peers.get_mut(i).unwrap()
}
pub fn start(&mut self) {
for peer in 0..self.peers.len() {
for client in 0..self.peers.len() {
if peer != client {
let mut p = self.peers.get_mut(peer).unwrap();
p.sync.on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), &(client as PeerId));
}
}
}
}
pub fn sync_step(&mut self) {
for peer in 0..self.peers.len() {
match self.peers[peer].queue.pop_front() {
Some(packet) => {
let mut p = self.peers.get_mut(packet.recipient).unwrap();
p.sync.on_packet(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), &(peer as PeerId), packet.packet_id, &packet.data);
},
None => {}
}
let mut p = self.peers.get_mut(peer).unwrap();
p.sync.maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None));
}
}
pub fn sync(&mut self) {
self.start();
while !self.done() {
self.sync_step()
}
}
pub fn done(&self) -> bool {
self.peers.iter().all(|p| p.queue.is_empty())
}
}
#[test]
fn full_sync() {
fn full_sync_two_peers() {
let mut net = TestNet::new(3);
net.peer(1).chain.add_blocks(1000, false);
net.peer(2).chain.add_blocks(1000, false);
net.sync();
assert_eq!(net.peer(0).chain.block_at(50000), net.peer(1).chain.block_at(50000));
}
#[test]
fn full_sync_empty_blocks() {
let mut net = TestNet::new(3);
for n in 0..200 {
net.peer(1).chain.add_blocks(5, n % 2 == 0);
net.peer(2).chain.add_blocks(5, n % 2 == 0);
}
net.sync();
assert_eq!(net.peer(0).chain.block_at(50000), net.peer(1).chain.block_at(50000));
}