This commit is contained in:
arkpar 2016-01-09 18:40:13 +01:00
parent 8cd3aa4b43
commit 10fe6b937f
5 changed files with 112 additions and 74 deletions

View File

@ -5,21 +5,20 @@ extern crate rustc_serialize;
use std::io::*;
use std::env;
use std::sync::Arc;
use rustc_serialize::hex::FromHex;
use util::hash::*;
use util::network::{NetworkService};
use ethcore::client::Client;
use ethcore::sync::EthSync;
use ethcore::spec::Spec;
fn main() {
let mut service = NetworkService::start().unwrap();
//TODO: replace with proper genesis and chain params.
let genesis = "f901fcf901f7a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347948888f1f195afa192cfee860698584c030f4c9db1a07dba07d6b448a186e9612e5f737d1c909dce473e53199901a302c00646d523c1a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008302000080832fefd8808454c98c8142a059262c330941f3fe2a34d16d6e3c7b30d2ceb37c6a0e9a994c494ee1a61d2410885aa4c8bf8e56e264c0c0".from_hex().unwrap();
let frontier = Spec::new_frontier();
let mut dir = env::temp_dir();
dir.push(H32::random().hex());
let client = Arc::new(Client::new(&genesis, &dir));
let sync = Box::new(EthSync::new(client));
service.register_protocol(sync, "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler");
let client = Arc::new(Client::new(&frontier.genesis_block(), &dir));
EthSync::register(&mut service, client);
loop {
let mut cmd = String::new();
stdin().read_line(&mut cmd).unwrap();

View File

@ -1,3 +1,18 @@
///
/// BlockChain synchronization strategy.
/// Syncs to peers and keeps up to date.
/// This implementation uses ethereum protocol v63
///
/// Syncing strategy.
///
/// 1. A peer arrives with a total difficulty better than ours
/// 2. Find a common best block between our an peer chain.
/// Start with out best block and request headers from peer backwards until a common block is found
/// 3. Download headers and block bodies from peers in parallel.
/// As soon as a set of the blocks is fully downloaded at the head of the queue it is fed to the blockchain
/// 4. Maintain sync by handling NewBlocks/NewHashes messages
///
use std::collections::{HashSet, HashMap};
use std::cmp::{min, max};
use std::mem::{replace};
@ -13,7 +28,7 @@ use client::{BlockNumber, BlockChainClient, BlockStatus, QueueStatus, ImportResu
use views::{HeaderView};
use header::{Header as BlockHeader};
use sync::range_collection::{RangeCollection, ToUsize, FromUsize};
use sync::{SyncIo};
use sync::io::SyncIo;
impl ToUsize for BlockNumber {
fn to_usize(&self) -> usize {
@ -52,7 +67,7 @@ const GET_RECEIPTS_PACKET: u8 = 0x0f;
const RECEIPTS_PACKET: u8 = 0x10;
struct Header {
///Header data
/// Header data
data: Bytes,
/// Block hash
hash: H256,
@ -81,13 +96,21 @@ pub enum SyncState {
NewBlocks,
}
/// Syncing status and statistics
pub struct SyncStatus {
/// State
pub state: SyncState,
/// Syncing protocol version. That's the maximum protocol version we connect to.
pub protocol_version: u8,
/// BlockChain height for the moment the sync started.
pub start_block_number: BlockNumber,
/// Last fully downloaded and imported block number.
pub last_imported_block_number: BlockNumber,
/// Highest block number in the download queue.
pub highest_block_number: BlockNumber,
/// Total number of blocks for the sync process.
pub blocks_total: usize,
/// Number of blocks downloaded so far.
pub blocks_received: usize,
}
@ -178,11 +201,7 @@ impl ChainSync {
self.peers.clear();
}
/// @returns true is Sync is in progress
pub fn is_syncing(&self) -> bool {
self.state != SyncState::Idle
}
/// Rest sync. Clear all downloaded data but keep the queue
fn reset(&mut self) {
self.downloading_headers.clear();
self.downloading_bodies.clear();
@ -389,6 +408,7 @@ impl ChainSync {
Ok(())
}
/// Handles NewHashes packet. Initiates headers download for any unknown hashes.
fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
if self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").asking != PeerAsking::Nothing {
trace!(target: "sync", "Ignoring new hashes since we're already downloading.");
@ -432,6 +452,7 @@ impl ChainSync {
self.continue_sync(io);
}
/// Called when a new peer is connected
pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: &PeerId) {
trace!(target: "sync", "== Connected {}", peer);
self.send_status(io, peer);
@ -459,6 +480,7 @@ impl ChainSync {
self.state = SyncState::Waiting;
}
/// Find something to do for a peer. Called for a new peer or when a peer is done with it's task.
fn sync_peer(&mut self, io: &mut SyncIo, peer_id: &PeerId, force: bool) {
let (peer_latest, peer_difficulty) = {
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer");
@ -488,6 +510,7 @@ impl ChainSync {
}
}
/// Find some headers or blocks to download for a peer.
fn request_blocks(&mut self, io: &mut SyncIo, peer_id: &PeerId) {
self.clear_peer_download(peer_id);
@ -572,6 +595,7 @@ impl ChainSync {
}
}
/// Clear all blocks/headers marked as being downloaded by a peer.
fn clear_peer_download(&mut self, peer_id: &PeerId) {
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer");
for b in &peer.asking_blocks {
@ -581,6 +605,7 @@ impl ChainSync {
peer.asking_blocks.clear();
}
/// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import.
fn collect_blocks(&mut self, io: &mut SyncIo) {
if !self.have_common_block || self.headers.is_empty() || self.bodies.is_empty() {
return;
@ -647,6 +672,8 @@ impl ChainSync {
}
}
/// Remove downloaded bocks/headers starting from specified number.
/// Used to recover from an error and re-download parts of the chain detected as bad.
fn remove_downloaded_blocks(&mut self, start: BlockNumber) {
for n in self.headers.get_tail(&start) {
match self.headers.find_item(&n) {
@ -667,8 +694,9 @@ impl ChainSync {
self.bodies.remove_tail(&start);
}
/// Request headers from a peer by block hash
fn request_headers_by_hash(&mut self, sync: &mut SyncIo, peer_id: &PeerId, h: &H256, count: usize, skip: usize, reverse: bool) {
trace!(target: "sync", "{}<- GetBlockHeaders: {} entries starting from {}", peer_id, count, h);
trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, h);
let mut rlp = RlpStream::new_list(4);
rlp.append(h);
rlp.append(&count);
@ -677,9 +705,10 @@ impl ChainSync {
self.send_request(sync, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out());
}
/// Request headers from a peer by block number
fn request_headers_by_number(&mut self, sync: &mut SyncIo, peer_id: &PeerId, n: BlockNumber, count: usize, skip: usize, reverse: bool) {
let mut rlp = RlpStream::new_list(4);
trace!(target: "sync", "{}<- GetBlockHeaders: {} entries starting from {}", peer_id, count, n);
trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, n);
rlp.append(&n);
rlp.append(&count);
rlp.append(&skip);
@ -687,15 +716,17 @@ impl ChainSync {
self.send_request(sync, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out());
}
/// Request block bodies from a peer
fn request_bodies(&mut self, sync: &mut SyncIo, peer_id: &PeerId, hashes: Vec<H256>) {
let mut rlp = RlpStream::new_list(hashes.len());
trace!(target: "sync", "{}<- GetBlockBodies: {} entries", peer_id, hashes.len());
trace!(target: "sync", "{} <- GetBlockBodies: {} entries", peer_id, hashes.len());
for h in hashes {
rlp.append(&h);
}
self.send_request(sync, peer_id, PeerAsking::BlockBodies, GET_BLOCK_BODIES_PACKET, rlp.out());
}
/// Reset peer status after request is complete.
fn reset_peer_asking(&mut self, peer_id: &PeerId, asking: PeerAsking) {
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer");
if peer.asking != asking {
@ -706,6 +737,7 @@ impl ChainSync {
}
}
/// Generic request sender
fn send_request(&mut self, sync: &mut SyncIo, peer_id: &PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) {
{
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer");
@ -726,10 +758,12 @@ impl ChainSync {
}
}
/// Called when peer sends us new transactions
fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: &PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
Ok(())
}
/// Send Status message
fn send_status(&mut self, io: &mut SyncIo, peer_id: &PeerId) {
let mut packet = RlpStream::new_list(5);
let chain = io.chain().chain_info();
@ -748,6 +782,7 @@ impl ChainSync {
}
}
/// Respond to GetBlockHeaders request
fn return_block_headers(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
// Packet layout:
// [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ]
@ -804,6 +839,7 @@ impl ChainSync {
Ok(())
}
/// Respond to GetBlockBodies request
fn return_block_bodies(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
let mut count = r.item_count();
if count == 0 {
@ -831,6 +867,7 @@ impl ChainSync {
Ok(())
}
/// Respond to GetNodeData request
fn return_node_data(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
let mut count = r.item_count();
if count == 0 {
@ -856,6 +893,7 @@ impl ChainSync {
Ok(())
}
/// Respond to GetReceipts request
fn return_receipts(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
let mut count = r.item_count();
if count == 0 {
@ -881,6 +919,7 @@ impl ChainSync {
Ok(())
}
/// Dispatch incoming requests and responses
pub fn on_packet(&mut self, io: &mut SyncIo, peer: &PeerId, packet_id: u8, data: &[u8]) {
let rlp = UntrustedRlp::new(data);
let result = match packet_id {
@ -904,6 +943,7 @@ impl ChainSync {
})
}
/// Maintain other peers. Send out any new blocks and transactions
pub fn maintain_sync(&mut self, _io: &mut SyncIo) {
}
}

View File

@ -1,88 +1,73 @@
/// Blockchain sync module
/// Implements ethereum protocol version 63 as specified here:
/// https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol
///
/// Usage example:
///
/// ```rust
/// extern crate ethcore_util as util;
/// extern crate ethcore;
/// use std::env;
/// use std::sync::Arc;
/// use util::network::NetworkService;
/// use ethcore::client::Client;
/// use ethcore::sync::EthSync;
/// use ethcore::spec::Spec;
///
/// fn main() {
/// let mut service = NetworkService::start().unwrap();
/// let frontier = Spec::new_frontier();
/// let dir = env::temp_dir();
/// let client = Arc::new(Client::new(&frontier.genesis_block(), &dir));
/// EthSync::register(&mut service, client);
/// }
/// ```
use std::sync::Arc;
use client::BlockChainClient;
use util::network::{ProtocolHandler, NetworkService, HandlerIo, TimerToken, PeerId, PacketId, Message, Error as NetworkError};
use util::network::{ProtocolHandler, NetworkService, HandlerIo, TimerToken, PeerId, Message};
use sync::chain::ChainSync;
use sync::io::NetSyncIo;
mod chain;
mod io;
mod range_collection;
#[cfg(test)]
mod tests;
pub fn new(_service: &mut NetworkService, eth_client: Arc<BlockChainClient + Send + Sized>) -> EthSync {
EthSync {
chain: eth_client,
sync: ChainSync::new(),
}
}
pub trait SyncIo {
fn disable_peer(&mut self, peer_id: &PeerId);
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>;
fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>;
fn chain<'s>(&'s mut self) -> &'s mut BlockChainClient;
}
pub struct NetSyncIo<'s, 'h> where 'h:'s {
network: &'s mut HandlerIo<'h>,
chain: &'s mut BlockChainClient
}
impl<'s, 'h> NetSyncIo<'s, 'h> {
pub fn new(network: &'s mut HandlerIo<'h>, chain: &'s mut BlockChainClient) -> NetSyncIo<'s,'h> {
NetSyncIo {
network: network,
chain: chain,
}
}
}
impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
fn disable_peer(&mut self, peer_id: &PeerId) {
self.network.disable_peer(*peer_id);
}
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>{
self.network.respond(packet_id, data)
}
fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>{
self.network.send(peer_id, packet_id, data)
}
fn chain<'a>(&'a mut self) -> &'a mut BlockChainClient {
self.chain
}
}
/// Ethereum network protocol handler
pub struct EthSync {
/// Shared blockchain client. TODO: this should evetually become an IPC endpoint
chain: Arc<BlockChainClient + Send + Sized>,
/// Sync strategy
sync: ChainSync
}
pub use self::chain::SyncStatus;
impl EthSync {
pub fn new(chain: Arc<BlockChainClient + Send + Sized>) -> EthSync {
EthSync {
/// Creates and register protocol with the network service
pub fn register(service: &mut NetworkService, chain: Arc<BlockChainClient + Send + Sized>) {
let sync = Box::new(EthSync {
chain: chain,
sync: ChainSync::new(),
}
}
pub fn is_syncing(&self) -> bool {
self.sync.is_syncing()
});
service.register_protocol(sync, "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler");
}
/// Get sync status
pub fn status(&self) -> SyncStatus {
self.sync.status()
}
pub fn stop_network(&mut self, io: &mut HandlerIo) {
/// Stop sync
pub fn stop(&mut self, io: &mut HandlerIo) {
self.sync.abort(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()));
}
pub fn start_network(&mut self, io: &mut HandlerIo) {
/// Restart sync
pub fn restart(&mut self, io: &mut HandlerIo) {
self.sync.restart(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()));
}
}

View File

@ -1,3 +1,6 @@
/// This module defines a trait for a collection of ranged values and an implementation
/// for this trait over sorted vector.
use std::ops::{Add, Sub, Range};
pub trait ToUsize {
@ -8,16 +11,28 @@ pub trait FromUsize {
fn from_usize(s: usize) -> Self;
}
/// A key-value collection orderd by key with sequential key-value pairs grouped together.
/// Such group is called a range.
/// E.g. a set of collection of 5 pairs {1, a}, {2, b}, {10, x}, {11, y}, {12, z} will be grouped into two ranges: {1, [a,b]}, {10, [x,y,z]}
pub trait RangeCollection<K, V> {
/// Check if the given key is present in the collection.
fn have_item(&self, key: &K) -> bool;
/// Get value by key.
fn find_item(&self, key: &K) -> Option<&V>;
/// Get a range of keys from `key` till the end of the range that has `key`
/// Returns an empty range is key does not exist.
fn get_tail(&mut self, key: &K) -> Range<K>;
/// Remove all elements < `start` in the range that contains `start` - 1
fn remove_head(&mut self, start: &K);
/// Remove all elements >= `start` in the range that contains `start`
fn remove_tail(&mut self, start: &K);
/// Remove all elements >= `tail`
fn insert_item(&mut self, key: K, value: V);
/// Get an iterator over ranges
fn range_iter<'c>(&'c self) -> RangeIterator<'c, K, V>;
}
/// Range iterator. For each range yelds a key for the first element of the range and a vector of values.
pub struct RangeIterator<'c, K:'c, V:'c> {
range: usize,
collection: &'c Vec<(K, Vec<V>)>
@ -72,7 +87,6 @@ impl<K, V> RangeCollection<K, V> for Vec<(K, Vec<V>)> where K: Ord + PartialEq +
}
}
/// Get a range of elements from start till the end of the range
fn get_tail(&mut self, key: &K) -> Range<K> {
let kv = *key;
match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) {

View File

@ -7,8 +7,8 @@ use util::rlp::{self, Rlp, RlpStream, View, Stream};
use util::network::{PeerId, PacketId, Error as NetworkError};
use client::{BlockChainClient, BlockStatus, BlockNumber, TreeRoute, BlockQueueStatus, BlockChainInfo, ImportResult, QueueStatus};
use header::Header as BlockHeader;
use sync::{SyncIo};
use sync::chain::{ChainSync};
use sync::io::SyncIo;
use sync::chain::ChainSync;
struct TestBlockChainClient {
blocks: HashMap<H256, Bytes>,