2016-02-05 13:40:41 +01:00
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
2016-05-16 14:41:41 +02:00
2016-02-15 00:51:50 +01:00
///
2016-04-06 10:07:24 +02:00
/// `BlockChain` synchronization strategy.
2016-02-15 00:51:50 +01:00
/// Syncs to peers and keeps up to date.
2016-01-09 18:40:13 +01:00
/// This implementation uses ethereum protocol v63
///
2016-05-16 14:41:41 +02:00
/// Syncing strategy summary.
/// Split the chain into ranges of N blocks each. Download ranges sequentially. Split each range into subchains of M blocks. Download subchains in parallel.
/// State.
/// Sync state consists of the following data:
2016-05-25 17:03:58 +02:00
/// - s: State enum which can be one of the following values: `ChainHead`, `Blocks`, `Idle`
2016-05-16 14:41:41 +02:00
/// - H: A set of downloaded block headers
/// - B: A set of downloaded block bodies
/// - S: Set of block subchain start block hashes to download.
/// - l: Last imported / common block hash
/// - P: A set of connected peers. For each peer we maintain its last known total difficulty and starting block hash being requested if any.
/// General behaviour.
2016-05-25 17:03:58 +02:00
/// We start with all sets empty, l is set to the best block in the block chain, s is set to `ChainHead`.
/// If at any moment a bad block is reported by the block queue, we set s to `ChainHead`, reset l to the best block in the block chain and clear H, B and S.
/// If at any moment P becomes empty, we set s to `ChainHead`, and clear H, B and S.
2016-05-16 14:41:41 +02:00
///
2016-05-25 17:03:58 +02:00
/// Workflow for `ChainHead` state.
/// In this state we try to get subchain headers with a single `GetBlockHeaders` request.
/// On `NewPeer` / On `Restart`:
2016-05-16 14:41:41 +02:00
/// If peer's total difficulty is higher, request N/M headers with interval M+1 starting from l
2016-05-25 17:03:58 +02:00
/// On `BlockHeaders(R)`:
2016-05-16 14:41:41 +02:00
/// If R is empty:
/// If l is equal to genesis block hash or l is more than 1000 blocks behind our best hash:
/// Remove current peer from P. set l to the best block in the block chain. Select peer with maximum total difficulty from P and restart.
/// Else
/// Set l to l’ s parent and restart.
/// Else if we already have all the headers in the block chain or the block queue:
2016-05-25 17:03:58 +02:00
/// Set s to `Idle`,
2016-05-16 14:41:41 +02:00
/// Else
2016-05-25 17:03:58 +02:00
/// Set S to R, set s to `Blocks`.
2016-05-16 14:41:41 +02:00
///
///
/// All other messages are ignored.
2016-05-25 17:03:58 +02:00
/// Workflow for `Blocks` state.
2016-05-16 14:41:41 +02:00
/// In this state we download block headers and bodies from multiple peers.
2016-05-25 17:03:58 +02:00
/// On `NewPeer` / On `Restart`:
2016-05-16 14:41:41 +02:00
/// For all idle peers:
/// Find a set of 256 or less block hashes in H which are not in B and not being downloaded by other peers. If the set is not empty:
/// Request block bodies for the hashes in the set.
/// Else
/// Find an element in S which is not being downloaded by other peers. If found: Request M headers starting from the element.
///
2016-05-25 17:03:58 +02:00
/// On `BlockHeaders(R)`:
2016-05-16 14:41:41 +02:00
/// If R is empty remove current peer from P and restart.
/// Validate received headers. For each header find a parent in H or R or the blockchain. Restart if there is a block with unknown parent.
2016-05-25 17:03:58 +02:00
/// Go to `CollectBlocks`.
2016-05-16 14:41:41 +02:00
///
2016-05-25 17:03:58 +02:00
/// On `BlockBodies(R)`:
2016-05-16 14:41:41 +02:00
/// If R is empty remove current peer from P and restart.
/// Add bodies with a matching header in H to B.
2016-05-25 17:03:58 +02:00
/// Go to `CollectBlocks`.
2016-01-09 18:40:13 +01:00
///
2016-05-25 17:03:58 +02:00
/// `CollectBlocks`:
2016-05-16 14:41:41 +02:00
/// Find a chain of blocks C in H starting from h where h’ s parent equals to l. The chain ends with the first block which does not have a body in B.
/// Add all blocks from the chain to the block queue. Remove them from H and B. Set l to the hash of the last block from C.
/// Update and merge subchain heads in S. For each h in S find a chain of blocks in B starting from h. Remove h from S. if the chain does not include an element from S add the end of the chain to S.
2016-05-25 17:03:58 +02:00
/// If H is empty and S contains a single element set s to `ChainHead`.
2016-05-16 14:41:41 +02:00
/// Restart.
///
/// All other messages are ignored.
/// Workflow for Idle state.
2016-05-25 17:03:58 +02:00
/// On `NewBlock`:
/// Import the block. If the block is unknown set s to `ChainHead` and restart.
/// On `NewHashes`:
/// Set s to `ChainHead` and restart.
2016-05-16 14:41:41 +02:00
///
/// All other messages are ignored.
2016-01-09 18:40:13 +01:00
///
2016-01-10 14:11:23 +01:00
use util ::* ;
2015-12-22 22:19:50 +01:00
use std ::mem ::{ replace } ;
2016-05-16 14:41:41 +02:00
use ethcore ::views ::{ HeaderView , BlockView } ;
2016-02-03 21:42:30 +01:00
use ethcore ::header ::{ BlockNumber , Header as BlockHeader } ;
2016-05-19 11:00:32 +02:00
use ethcore ::client ::{ BlockChainClient , BlockStatus , BlockID , BlockChainInfo } ;
2016-02-03 21:42:30 +01:00
use ethcore ::error ::* ;
2016-03-05 16:46:04 +01:00
use ethcore ::transaction ::SignedTransaction ;
2016-03-08 15:46:44 +01:00
use ethcore ::block ::Block ;
2016-02-03 21:42:30 +01:00
use io ::SyncIo ;
use time ;
2016-02-24 21:23:58 +01:00
use super ::SyncConfig ;
2016-05-16 14:41:41 +02:00
use blocks ::BlockCollection ;
2016-05-31 20:54:02 +02:00
use ethcore ::miner ::{ AccountDetails , TransactionImportResult } ;
2015-12-22 22:19:50 +01:00
2016-05-16 14:41:41 +02:00
known_heap_size! ( 0 , PeerInfo ) ;
2015-12-25 14:55:55 +01:00
2016-01-08 17:52:25 +01:00
type PacketDecodeError = DecoderError ;
2015-12-24 17:18:47 +01:00
const PROTOCOL_VERSION : u8 = 63 u8 ;
const MAX_BODIES_TO_SEND : usize = 256 ;
2015-12-27 00:48:03 +01:00
const MAX_HEADERS_TO_SEND : usize = 512 ;
2015-12-24 17:18:47 +01:00
const MAX_NODE_DATA_TO_SEND : usize = 1024 ;
const MAX_RECEIPTS_TO_SEND : usize = 1024 ;
2016-03-13 15:59:25 +01:00
const MAX_RECEIPTS_HEADERS_TO_SEND : usize = 256 ;
2016-05-27 18:57:12 +02:00
const MAX_HEADERS_TO_REQUEST : usize = 128 ;
2016-04-26 14:04:00 +02:00
const MAX_BODIES_TO_REQUEST : usize = 64 ;
2016-02-05 18:34:08 +01:00
const MIN_PEERS_PROPAGATION : usize = 4 ;
const MAX_PEERS_PROPAGATION : usize = 128 ;
2016-02-06 18:56:21 +01:00
const MAX_PEER_LAG_PROPAGATION : BlockNumber = 20 ;
2016-05-27 18:57:12 +02:00
const SUBCHAIN_SIZE : usize = 64 ;
2015-12-24 17:18:47 +01:00
2015-12-22 22:19:50 +01:00
const STATUS_PACKET : u8 = 0x00 ;
const NEW_BLOCK_HASHES_PACKET : u8 = 0x01 ;
const TRANSACTIONS_PACKET : u8 = 0x02 ;
const GET_BLOCK_HEADERS_PACKET : u8 = 0x03 ;
const BLOCK_HEADERS_PACKET : u8 = 0x04 ;
const GET_BLOCK_BODIES_PACKET : u8 = 0x05 ;
const BLOCK_BODIES_PACKET : u8 = 0x06 ;
const NEW_BLOCK_PACKET : u8 = 0x07 ;
const GET_NODE_DATA_PACKET : u8 = 0x0d ;
const NODE_DATA_PACKET : u8 = 0x0e ;
const GET_RECEIPTS_PACKET : u8 = 0x0f ;
const RECEIPTS_PACKET : u8 = 0x10 ;
2016-05-27 16:49:29 +02:00
const CONNECTION_TIMEOUT_SEC : f64 = 15 f64 ;
2016-02-03 21:42:30 +01:00
2015-12-22 22:19:50 +01:00
#[ derive(Copy, Clone, Eq, PartialEq, Debug) ]
2016-02-03 21:05:04 +01:00
/// Sync state
2015-12-22 22:19:50 +01:00
pub enum SyncState {
2016-05-16 14:41:41 +02:00
/// Downloading subchain heads
ChainHead ,
2015-12-22 22:19:50 +01:00
/// Initial chain sync complete. Waiting for new packets
Idle ,
2016-01-10 23:37:09 +01:00
/// Block downloading paused. Waiting for block queue to process blocks and free some space
2015-12-22 22:19:50 +01:00
Waiting ,
/// Downloading blocks
Blocks ,
2016-05-02 13:13:12 +02:00
/// Downloading blocks learned from `NewHashes` packet
2015-12-22 22:19:50 +01:00
NewBlocks ,
}
2016-01-09 18:40:13 +01:00
/// Syncing status and statistics
2016-03-11 10:17:20 +01:00
#[ derive(Clone) ]
2015-12-22 22:19:50 +01:00
pub struct SyncStatus {
2016-01-09 18:40:13 +01:00
/// State
2015-12-25 14:55:55 +01:00
pub state : SyncState ,
2016-01-09 18:40:13 +01:00
/// Syncing protocol version. That's the maximum protocol version we connect to.
2015-12-25 14:55:55 +01:00
pub protocol_version : u8 ,
2016-03-26 00:22:09 +01:00
/// The underlying p2p network version.
pub network_id : U256 ,
2016-04-06 10:07:24 +02:00
/// `BlockChain` height for the moment the sync started.
2015-12-25 14:55:55 +01:00
pub start_block_number : BlockNumber ,
2016-02-03 21:05:04 +01:00
/// Last fully downloaded and imported block number (if any).
pub last_imported_block_number : Option < BlockNumber > ,
/// Highest block number in the download queue (if any).
pub highest_block_number : Option < BlockNumber > ,
2016-01-09 18:40:13 +01:00
/// Total number of blocks for the sync process.
2016-02-03 21:05:04 +01:00
pub blocks_total : BlockNumber ,
2016-01-09 18:40:13 +01:00
/// Number of blocks downloaded so far.
2016-02-03 21:05:04 +01:00
pub blocks_received : BlockNumber ,
2016-01-22 04:54:38 +01:00
/// Total number of connected peers
pub num_peers : usize ,
/// Total number of active peers
pub num_active_peers : usize ,
2016-02-24 22:37:28 +01:00
/// Heap memory used in bytes
pub mem_used : usize ,
2015-12-22 22:19:50 +01:00
}
2016-02-06 18:56:21 +01:00
#[ derive(PartialEq, Eq, Debug, Clone) ]
2016-01-10 23:37:09 +01:00
/// Peer data type requested
2016-01-08 16:26:00 +01:00
enum PeerAsking {
2015-12-22 22:19:50 +01:00
Nothing ,
BlockHeaders ,
BlockBodies ,
2016-05-16 14:41:41 +02:00
Heads ,
2015-12-22 22:19:50 +01:00
}
2016-02-06 18:56:21 +01:00
#[ derive(Clone) ]
2016-01-10 23:37:09 +01:00
/// Syncing peer information
2015-12-22 22:19:50 +01:00
struct PeerInfo {
2016-01-10 23:37:09 +01:00
/// eth protocol version
2015-12-22 22:19:50 +01:00
protocol_version : u32 ,
2016-01-10 23:37:09 +01:00
/// Peer chain genesis hash
2015-12-22 22:19:50 +01:00
genesis : H256 ,
2016-02-03 21:05:04 +01:00
/// Peer network id
2015-12-22 22:19:50 +01:00
network_id : U256 ,
2016-01-10 23:37:09 +01:00
/// Peer best block hash
2016-02-14 17:10:55 +01:00
latest_hash : H256 ,
/// Peer best block number if known
latest_number : Option < BlockNumber > ,
2016-05-16 14:41:41 +02:00
/// Peer total difficulty if known
difficulty : Option < U256 > ,
2016-01-10 23:37:09 +01:00
/// Type of data currenty being requested from peer.
2015-12-22 22:19:50 +01:00
asking : PeerAsking ,
2016-01-10 23:37:09 +01:00
/// A set of block numbers being requested
2016-05-16 14:41:41 +02:00
asking_blocks : Vec < H256 > ,
2016-02-12 13:07:02 +01:00
/// Holds requested header hash if currently requesting block header by hash
asking_hash : Option < H256 > ,
2016-02-03 21:42:30 +01:00
/// Request timestamp
ask_time : f64 ,
2015-12-22 22:19:50 +01:00
}
2016-01-10 23:37:09 +01:00
/// Blockchain sync handler.
/// See module documentation for more details.
2015-12-22 22:19:50 +01:00
pub struct ChainSync {
/// Sync state
state : SyncState ,
/// Last block number for the start of sync
starting_block : BlockNumber ,
/// Highest block number seen
2016-02-03 21:05:04 +01:00
highest_block : Option < BlockNumber > ,
2016-05-16 14:41:41 +02:00
/// All connected peers
2015-12-22 22:19:50 +01:00
peers : HashMap < PeerId , PeerInfo > ,
2016-05-16 14:41:41 +02:00
/// Peers active for current sync round
active_peers : HashSet < PeerId > ,
/// Downloaded blocks, holds `H`, `B` and `S`
blocks : BlockCollection ,
2015-12-22 22:19:50 +01:00
/// Last impoted block number
2016-05-16 14:41:41 +02:00
last_imported_block : BlockNumber ,
2015-12-27 00:48:03 +01:00
/// Last impoted block hash
2016-05-16 14:41:41 +02:00
last_imported_hash : H256 ,
2015-12-22 22:19:50 +01:00
/// Syncing total difficulty
syncing_difficulty : U256 ,
2016-02-12 13:07:02 +01:00
/// Last propagated block number
2016-03-07 12:16:37 +01:00
last_sent_block_number : BlockNumber ,
2016-02-24 21:23:58 +01:00
/// Max blocks to download ahead
2016-05-16 14:41:41 +02:00
_max_download_ahead_blocks : usize ,
/// Number of blocks imported this round
imported_this_round : Option < usize > ,
2016-02-24 21:23:58 +01:00
/// Network ID
network_id : U256 ,
2015-12-22 22:19:50 +01:00
}
2016-02-04 23:24:36 +01:00
type RlpResponseResult = Result < Option < ( PacketId , RlpStream ) > , PacketDecodeError > ;
2015-12-22 22:19:50 +01:00
impl ChainSync {
2016-01-09 19:13:58 +01:00
/// Create a new instance of syncing strategy.
2016-05-31 20:54:02 +02:00
pub fn new ( config : SyncConfig , chain : & BlockChainClient ) -> ChainSync {
2016-05-16 14:41:41 +02:00
let chain = chain . chain_info ( ) ;
let mut sync = ChainSync {
state : SyncState ::ChainHead ,
2016-05-24 21:56:17 +02:00
starting_block : chain . best_block_number ,
2016-02-03 21:05:04 +01:00
highest_block : None ,
2016-05-16 14:41:41 +02:00
last_imported_block : chain . best_block_number ,
last_imported_hash : chain . best_block_hash ,
2015-12-22 22:19:50 +01:00
peers : HashMap ::new ( ) ,
2016-05-16 14:41:41 +02:00
active_peers : HashSet ::new ( ) ,
blocks : BlockCollection ::new ( ) ,
2015-12-22 22:19:50 +01:00
syncing_difficulty : U256 ::from ( 0 u64 ) ,
2016-03-07 12:16:37 +01:00
last_sent_block_number : 0 ,
2016-05-16 14:41:41 +02:00
imported_this_round : None ,
_max_download_ahead_blocks : max ( MAX_HEADERS_TO_REQUEST , config . max_download_ahead_blocks ) ,
2016-02-24 21:23:58 +01:00
network_id : config . network_id ,
2016-05-16 14:41:41 +02:00
} ;
sync . reset ( ) ;
sync
2015-12-22 22:19:50 +01:00
}
/// @returns Synchonization status
pub fn status ( & self ) -> SyncStatus {
SyncStatus {
state : self . state . clone ( ) ,
protocol_version : 63 ,
2016-03-26 00:22:09 +01:00
network_id : self . network_id ,
2015-12-22 22:19:50 +01:00
start_block_number : self . starting_block ,
2016-05-16 14:41:41 +02:00
last_imported_block_number : Some ( self . last_imported_block ) ,
2016-05-24 21:56:17 +02:00
highest_block_number : self . highest_block . map ( | n | max ( n , self . last_imported_block ) ) ,
2016-05-16 14:41:41 +02:00
blocks_received : if self . last_imported_block > self . starting_block { self . last_imported_block - self . starting_block } else { 0 } ,
2016-02-29 14:59:10 +01:00
blocks_total : match self . highest_block { Some ( x ) if x > self . starting_block = > x - self . starting_block , _ = > 0 } ,
2016-01-22 04:54:38 +01:00
num_peers : self . peers . len ( ) ,
num_active_peers : self . peers . values ( ) . filter ( | p | p . asking ! = PeerAsking ::Nothing ) . count ( ) ,
2016-03-01 22:30:23 +01:00
mem_used :
2016-02-24 22:37:28 +01:00
// TODO: https://github.com/servo/heapsize/pull/50
2016-03-01 22:30:23 +01:00
//+ self.downloading_bodies.heap_size_of_children()
2016-05-16 14:41:41 +02:00
//+ self.downloading_headers.heap_size_of_children()
self . blocks . heap_size ( )
+ self . peers . heap_size_of_children ( ) ,
2015-12-22 22:19:50 +01:00
}
}
/// Abort all sync activity
pub fn abort ( & mut self , io : & mut SyncIo ) {
self . restart ( io ) ;
self . peers . clear ( ) ;
}
2016-03-11 11:16:49 +01:00
#[ cfg_attr(feature= " dev " , allow(for_kv_map)) ] // Because it's not possible to get `values_mut()`
2016-05-16 14:41:41 +02:00
/// Reset sync. Clear all downloaded data but keep the queue
2015-12-22 22:19:50 +01:00
fn reset ( & mut self ) {
2016-05-16 14:41:41 +02:00
self . blocks . clear ( ) ;
2016-01-17 15:56:09 +01:00
for ( _ , ref mut p ) in & mut self . peers {
2015-12-22 22:19:50 +01:00
p . asking_blocks . clear ( ) ;
2016-02-12 13:07:02 +01:00
p . asking_hash = None ;
2015-12-22 22:19:50 +01:00
}
self . syncing_difficulty = From ::from ( 0 u64 ) ;
self . state = SyncState ::Idle ;
2016-05-16 14:41:41 +02:00
self . blocks . clear ( ) ;
self . active_peers = self . peers . keys ( ) . cloned ( ) . collect ( ) ;
2015-12-22 22:19:50 +01:00
}
/// Restart sync
pub fn restart ( & mut self , io : & mut SyncIo ) {
2016-05-16 14:41:41 +02:00
trace! ( target : " sync " , " Restarting " ) ;
2015-12-22 22:19:50 +01:00
self . reset ( ) ;
2016-05-16 14:41:41 +02:00
self . start_sync_round ( io ) ;
self . continue_sync ( io ) ;
}
/// Remove peer from active peer set
fn deactivate_peer ( & mut self , io : & mut SyncIo , peer_id : PeerId ) {
self . active_peers . remove ( & peer_id ) ;
if self . active_peers . is_empty ( ) {
trace! ( target : " sync " , " No more active peers " ) ;
if self . state = = SyncState ::ChainHead {
self . complete_sync ( ) ;
} else {
self . restart ( io ) ;
}
}
2015-12-22 22:19:50 +01:00
}
2016-03-26 12:00:05 +01:00
/// Restart sync after bad block has been detected. May end up re-downloading up to QUEUE_SIZE blocks
2016-05-16 14:41:41 +02:00
fn restart_on_bad_block ( & mut self , io : & mut SyncIo ) {
2016-03-26 12:00:05 +01:00
// Do not assume that the block queue/chain still has our last_imported_block
2016-05-16 14:41:41 +02:00
let chain = io . chain ( ) . chain_info ( ) ;
self . last_imported_block = chain . best_block_number ;
self . last_imported_hash = chain . best_block_hash ;
self . restart ( io ) ;
2016-03-26 12:00:05 +01:00
}
2016-05-16 14:41:41 +02:00
2015-12-22 22:19:50 +01:00
/// Called by peer to report status
2016-01-14 19:03:48 +01:00
fn on_peer_status ( & mut self , io : & mut SyncIo , peer_id : PeerId , r : & UntrustedRlp ) -> Result < ( ) , PacketDecodeError > {
2016-02-06 21:08:20 +01:00
let peer = PeerInfo {
2016-01-08 17:52:25 +01:00
protocol_version : try ! ( r . val_at ( 0 ) ) ,
network_id : try ! ( r . val_at ( 1 ) ) ,
2016-05-16 14:41:41 +02:00
difficulty : Some ( try ! ( r . val_at ( 2 ) ) ) ,
2016-02-14 17:10:55 +01:00
latest_hash : try ! ( r . val_at ( 3 ) ) ,
latest_number : None ,
2016-01-08 17:52:25 +01:00
genesis : try ! ( r . val_at ( 4 ) ) ,
2015-12-22 22:19:50 +01:00
asking : PeerAsking ::Nothing ,
asking_blocks : Vec ::new ( ) ,
2016-02-12 13:07:02 +01:00
asking_hash : None ,
2016-02-03 21:42:30 +01:00
ask_time : 0 f64 ,
2015-12-22 22:19:50 +01:00
} ;
2015-12-25 14:55:55 +01:00
2016-02-14 17:10:55 +01:00
trace! ( target : " sync " , " New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{}) " , peer_id , peer . protocol_version , peer . network_id , peer . difficulty , peer . latest_hash , peer . genesis ) ;
2016-02-03 21:05:04 +01:00
2016-02-12 13:07:02 +01:00
if self . peers . contains_key ( & peer_id ) {
warn! ( " Unexpected status packet from {}:{} " , peer_id , io . peer_info ( peer_id ) ) ;
return Ok ( ( ) ) ;
}
2016-01-10 14:11:23 +01:00
let chain_info = io . chain ( ) . chain_info ( ) ;
if peer . genesis ! = chain_info . genesis_hash {
io . disable_peer ( peer_id ) ;
2016-05-16 14:41:41 +02:00
trace! ( target : " sync " , " Peer {} genesis hash mismatch (ours: {}, theirs: {}) " , peer_id , chain_info . genesis_hash , peer . genesis ) ;
2016-01-10 14:11:23 +01:00
return Ok ( ( ) ) ;
}
2016-02-24 21:23:58 +01:00
if peer . network_id ! = self . network_id {
2016-01-10 14:11:23 +01:00
io . disable_peer ( peer_id ) ;
2016-05-16 14:41:41 +02:00
trace! ( target : " sync " , " Peer {} network id mismatch (ours: {}, theirs: {}) " , peer_id , self . network_id , peer . network_id ) ;
2016-01-10 14:11:23 +01:00
return Ok ( ( ) ) ;
}
2015-12-25 14:55:55 +01:00
2016-02-12 13:07:02 +01:00
self . peers . insert ( peer_id . clone ( ) , peer ) ;
2016-05-16 14:41:41 +02:00
self . active_peers . insert ( peer_id . clone ( ) ) ;
2016-02-14 01:03:48 +01:00
debug! ( target : " sync " , " Connected {}:{} " , peer_id , io . peer_info ( peer_id ) ) ;
2015-12-22 22:19:50 +01:00
self . sync_peer ( io , peer_id , false ) ;
2016-01-08 17:52:25 +01:00
Ok ( ( ) )
2015-12-22 22:19:50 +01:00
}
2016-03-11 11:16:49 +01:00
#[ cfg_attr(feature= " dev " , allow(cyclomatic_complexity)) ]
2015-12-22 22:19:50 +01:00
/// Called by peer once it has new block headers during sync
2016-01-14 19:03:48 +01:00
fn on_peer_block_headers ( & mut self , io : & mut SyncIo , peer_id : PeerId , r : & UntrustedRlp ) -> Result < ( ) , PacketDecodeError > {
2016-05-16 14:41:41 +02:00
self . clear_peer_download ( peer_id ) ;
let expected_asking = if self . state = = SyncState ::ChainHead { PeerAsking ::Heads } else { PeerAsking ::BlockHeaders } ;
if ! self . reset_peer_asking ( peer_id , expected_asking ) {
trace! ( target : " sync " , " Ignored unexpected headers " ) ;
self . continue_sync ( io ) ;
return Ok ( ( ) ) ;
}
2015-12-22 22:19:50 +01:00
let item_count = r . item_count ( ) ;
2016-01-08 16:26:00 +01:00
trace! ( target : " sync " , " {} -> BlockHeaders ({} entries) " , peer_id , item_count ) ;
2016-05-16 14:41:41 +02:00
if self . state = = SyncState ::Idle {
2015-12-22 22:19:50 +01:00
trace! ( target : " sync " , " Ignored unexpected block headers " ) ;
2016-05-16 14:41:41 +02:00
self . continue_sync ( io ) ;
2016-01-08 17:52:25 +01:00
return Ok ( ( ) ) ;
2015-12-22 22:19:50 +01:00
}
2016-01-08 16:26:00 +01:00
if self . state = = SyncState ::Waiting {
2015-12-22 22:19:50 +01:00
trace! ( target : " sync " , " Ignored block headers while waiting " ) ;
2016-05-16 14:41:41 +02:00
self . continue_sync ( io ) ;
return Ok ( ( ) ) ;
}
if item_count = = 0 & & ( self . state = = SyncState ::Blocks | | self . state = = SyncState ::NewBlocks ) {
self . deactivate_peer ( io , peer_id ) ; //TODO: is this too harsh?
2016-01-08 17:52:25 +01:00
return Ok ( ( ) ) ;
2015-12-22 22:19:50 +01:00
}
2015-12-24 17:18:47 +01:00
2016-05-16 14:41:41 +02:00
let mut headers = Vec ::new ( ) ;
let mut hashes = Vec ::new ( ) ;
2015-12-24 17:18:47 +01:00
for i in 0 .. item_count {
2016-01-08 17:52:25 +01:00
let info : BlockHeader = try ! ( r . val_at ( i ) ) ;
let number = BlockNumber ::from ( info . number ) ;
2016-05-16 14:41:41 +02:00
if self . blocks . contains ( & info . hash ( ) ) {
trace! ( target : " sync " , " Skipping existing block header {} ({:?}) " , number , info . hash ( ) ) ;
2015-12-24 17:18:47 +01:00
continue ;
}
2016-02-03 21:05:04 +01:00
if self . highest_block = = None | | number > self . highest_block . unwrap ( ) {
self . highest_block = Some ( number ) ;
2015-12-24 17:18:47 +01:00
}
2016-01-08 17:52:25 +01:00
let hash = info . hash ( ) ;
2016-05-19 11:00:32 +02:00
match io . chain ( ) . block_status ( BlockID ::Hash ( hash . clone ( ) ) ) {
2016-03-12 14:54:44 +01:00
BlockStatus ::InChain | BlockStatus ::Queued = > {
2016-05-16 14:41:41 +02:00
match self . state {
SyncState ::Blocks | SyncState ::NewBlocks = > trace! ( target : " sync " , " Header already in chain {} ({}) " , number , hash ) ,
_ = > trace! ( target : " sync " , " Unexpected header already in chain {} ({}), state = {:?} " , number , hash , self . state ) ,
2016-03-11 18:50:29 +01:00
}
2016-05-16 14:41:41 +02:00
headers . push ( try ! ( r . at ( i ) ) . as_raw ( ) . to_vec ( ) ) ;
hashes . push ( hash ) ;
2015-12-24 17:18:47 +01:00
} ,
2016-05-16 14:41:41 +02:00
BlockStatus ::Bad = > {
warn! ( target : " sync " , " Bad header {} ({}) from {}: {}, state = {:?} " , number , hash , peer_id , io . peer_info ( peer_id ) , self . state ) ;
io . disable_peer ( peer_id ) ;
return Ok ( ( ) ) ;
} ,
BlockStatus ::Unknown = > {
headers . push ( try ! ( r . at ( i ) ) . as_raw ( ) . to_vec ( ) ) ;
hashes . push ( hash ) ;
2015-12-24 17:18:47 +01:00
}
}
}
2016-05-16 14:41:41 +02:00
match self . state {
SyncState ::ChainHead = > {
if headers . is_empty ( ) {
// peer is not on our chain
// track back and try again
self . imported_this_round = Some ( 0 ) ;
self . start_sync_round ( io ) ;
} else {
// TODO: validate heads better. E.g. check that there is enough distance between blocks.
trace! ( target : " sync " , " Received {} subchain heads, proceeding to download " , headers . len ( ) ) ;
self . blocks . reset_to ( hashes ) ;
self . state = SyncState ::Blocks ;
}
} ,
SyncState ::Blocks | SyncState ::NewBlocks | SyncState ::Waiting = > {
trace! ( target : " sync " , " Inserted {} headers " , headers . len ( ) ) ;
self . blocks . insert_headers ( headers ) ;
} ,
_ = > trace! ( target : " sync " , " Unexpected headers({}) from {} ({}), state = {:?} " , headers . len ( ) , peer_id , io . peer_info ( peer_id ) , self . state )
}
2015-12-27 00:48:03 +01:00
self . collect_blocks ( io ) ;
self . continue_sync ( io ) ;
2016-01-08 17:52:25 +01:00
Ok ( ( ) )
2015-12-22 22:19:50 +01:00
}
/// Called by peer once it has new block bodies
2016-01-14 19:03:48 +01:00
fn on_peer_block_bodies ( & mut self , io : & mut SyncIo , peer_id : PeerId , r : & UntrustedRlp ) -> Result < ( ) , PacketDecodeError > {
2016-05-16 14:41:41 +02:00
self . clear_peer_download ( peer_id ) ;
2015-12-27 00:48:03 +01:00
self . reset_peer_asking ( peer_id , PeerAsking ::BlockBodies ) ;
2015-12-24 17:18:47 +01:00
let item_count = r . item_count ( ) ;
2016-01-08 16:26:00 +01:00
trace! ( target : " sync " , " {} -> BlockBodies ({} entries) " , peer_id , item_count ) ;
2016-05-16 14:41:41 +02:00
if item_count = = 0 {
self . deactivate_peer ( io , peer_id ) ;
}
else if self . state ! = SyncState ::Blocks & & self . state ! = SyncState ::NewBlocks & & self . state ! = SyncState ::Waiting {
2015-12-24 17:18:47 +01:00
trace! ( target : " sync " , " Ignored unexpected block bodies " ) ;
}
2016-05-16 14:41:41 +02:00
else if self . state = = SyncState ::Waiting {
2015-12-24 17:18:47 +01:00
trace! ( target : " sync " , " Ignored block bodies while waiting " ) ;
2016-03-29 23:29:43 +02:00
}
2016-05-16 14:41:41 +02:00
else
{
let mut bodies = Vec ::with_capacity ( item_count ) ;
for i in 0 .. item_count {
bodies . push ( try ! ( r . at ( i ) ) . as_raw ( ) . to_vec ( ) ) ;
2015-12-24 17:18:47 +01:00
}
2016-05-16 14:41:41 +02:00
self . blocks . insert_bodies ( bodies ) ;
self . collect_blocks ( io ) ;
2015-12-24 17:18:47 +01:00
}
self . continue_sync ( io ) ;
2016-01-08 17:52:25 +01:00
Ok ( ( ) )
2015-12-22 22:19:50 +01:00
}
/// Called by peer once it has new block bodies
2016-03-11 11:16:49 +01:00
#[ cfg_attr(feature= " dev " , allow(cyclomatic_complexity)) ]
2016-01-14 19:03:48 +01:00
fn on_peer_new_block ( & mut self , io : & mut SyncIo , peer_id : PeerId , r : & UntrustedRlp ) -> Result < ( ) , PacketDecodeError > {
2016-01-08 17:52:25 +01:00
let block_rlp = try ! ( r . at ( 0 ) ) ;
let header_rlp = try ! ( block_rlp . at ( 0 ) ) ;
2016-01-08 16:00:32 +01:00
let h = header_rlp . as_raw ( ) . sha3 ( ) ;
2016-01-08 16:26:00 +01:00
trace! ( target : " sync " , " {} -> NewBlock ({}) " , peer_id , h ) ;
2016-05-16 14:41:41 +02:00
if self . state ! = SyncState ::Idle {
2016-03-15 01:22:58 +01:00
trace! ( target : " sync " , " NewBlock ignored while seeking " ) ;
2016-03-15 18:14:52 +01:00
return Ok ( ( ) ) ;
2016-03-15 01:22:58 +01:00
}
2016-02-07 22:08:15 +01:00
let header : BlockHeader = try ! ( header_rlp . as_val ( ) ) ;
2016-05-16 14:41:41 +02:00
let mut unknown = false ;
2016-02-12 13:07:02 +01:00
{
2016-02-12 14:20:18 +01:00
let peer = self . peers . get_mut ( & peer_id ) . unwrap ( ) ;
2016-02-14 17:10:55 +01:00
peer . latest_hash = header . hash ( ) ;
peer . latest_number = Some ( header . number ( ) ) ;
2016-02-12 13:07:02 +01:00
}
2016-05-16 14:41:41 +02:00
if header . number < = self . last_imported_block + 1 {
2016-01-17 23:07:58 +01:00
match io . chain ( ) . import_block ( block_rlp . as_raw ( ) . to_vec ( ) ) {
2016-03-01 00:02:48 +01:00
Err ( Error ::Import ( ImportError ::AlreadyInChain ) ) = > {
2016-01-10 15:08:57 +01:00
trace! ( target : " sync " , " New block already in chain {:?} " , h ) ;
} ,
2016-03-01 00:02:48 +01:00
Err ( Error ::Import ( ImportError ::AlreadyQueued ) ) = > {
2016-01-10 15:08:57 +01:00
trace! ( target : " sync " , " New block already queued {:?} " , h ) ;
} ,
2016-01-27 13:28:15 +01:00
Ok ( _ ) = > {
2016-05-16 14:41:41 +02:00
if header . number = = self . last_imported_block + 1 {
self . last_imported_block = header . number ;
self . last_imported_hash = header . hash ( ) ;
2016-03-04 19:11:44 +01:00
}
2016-03-15 01:22:58 +01:00
trace! ( target : " sync " , " New block queued {:?} ({}) " , h , header . number ) ;
2016-01-10 15:08:57 +01:00
} ,
2016-03-01 00:02:48 +01:00
Err ( Error ::Block ( BlockError ::UnknownParent ( p ) ) ) = > {
2016-02-03 21:42:30 +01:00
unknown = true ;
2016-03-01 00:02:48 +01:00
trace! ( target : " sync " , " New block with unknown parent ({:?}) {:?} " , p , h ) ;
2016-02-03 21:42:30 +01:00
} ,
2016-01-10 23:37:09 +01:00
Err ( e ) = > {
debug! ( target : " sync " , " Bad new block {:?} : {:?} " , h , e ) ;
2016-01-10 15:08:57 +01:00
io . disable_peer ( peer_id ) ;
2015-12-24 17:18:47 +01:00
}
2016-01-10 15:08:57 +01:00
} ;
2016-02-02 12:12:32 +01:00
}
2016-05-16 14:41:41 +02:00
else {
2016-02-03 21:42:30 +01:00
unknown = true ;
}
if unknown {
2016-05-16 14:41:41 +02:00
trace! ( target : " sync " , " New unknown block {:?} " , h ) ;
2016-01-10 23:37:09 +01:00
//TODO: handle too many unknown blocks
let difficulty : U256 = try ! ( r . val_at ( 1 ) ) ;
2016-05-16 14:41:41 +02:00
if let Some ( ref mut peer ) = self . peers . get_mut ( & peer_id ) {
if peer . difficulty . map_or ( true , | pd | difficulty > pd ) {
//self.state = SyncState::ChainHead;
peer . difficulty = Some ( difficulty ) ;
trace! ( target : " sync " , " Received block {:?} with no known parent. Peer needs syncing... " , h ) ;
}
2016-01-10 23:37:09 +01:00
}
2016-05-16 14:41:41 +02:00
self . sync_peer ( io , peer_id , true ) ;
2016-01-10 23:37:09 +01:00
}
2016-01-08 17:52:25 +01:00
Ok ( ( ) )
2015-12-22 22:19:50 +01:00
}
2016-05-02 13:13:12 +02:00
/// Handles `NewHashes` packet. Initiates headers download for any unknown hashes.
2016-01-14 19:03:48 +01:00
fn on_peer_new_hashes ( & mut self , io : & mut SyncIo , peer_id : PeerId , r : & UntrustedRlp ) -> Result < ( ) , PacketDecodeError > {
2016-05-16 14:41:41 +02:00
if self . state ! = SyncState ::Idle {
2015-12-24 17:18:47 +01:00
trace! ( target : " sync " , " Ignoring new hashes since we're already downloading. " ) ;
2016-01-08 17:52:25 +01:00
return Ok ( ( ) ) ;
2015-12-24 17:18:47 +01:00
}
2016-01-08 16:26:00 +01:00
trace! ( target : " sync " , " {} -> NewHashes ({} entries) " , peer_id , r . item_count ( ) ) ;
2016-02-12 13:07:02 +01:00
let hashes = r . iter ( ) . map ( | item | ( item . val_at ::< H256 > ( 0 ) , item . val_at ::< BlockNumber > ( 1 ) ) ) ;
let mut max_height : BlockNumber = 0 ;
2016-05-16 14:41:41 +02:00
let mut new_hashes = Vec ::new ( ) ;
2016-01-08 17:52:25 +01:00
for ( rh , rd ) in hashes {
let h = try ! ( rh ) ;
let d = try ! ( rd ) ;
2016-05-16 14:41:41 +02:00
if self . blocks . is_downloading ( & h ) {
2016-02-12 13:07:02 +01:00
continue ;
}
2016-05-19 11:00:32 +02:00
match io . chain ( ) . block_status ( BlockID ::Hash ( h . clone ( ) ) ) {
2015-12-24 17:18:47 +01:00
BlockStatus ::InChain = > {
trace! ( target : " sync " , " New block hash already in chain {:?} " , h ) ;
} ,
2016-01-10 23:37:09 +01:00
BlockStatus ::Queued = > {
2015-12-24 17:18:47 +01:00
trace! ( target : " sync " , " New hash block already queued {:?} " , h ) ;
} ,
BlockStatus ::Unknown = > {
2016-05-16 14:41:41 +02:00
new_hashes . push ( h . clone ( ) ) ;
2015-12-24 17:18:47 +01:00
if d > max_height {
2016-02-12 13:07:02 +01:00
trace! ( target : " sync " , " New unknown block hash {:?} " , h ) ;
2016-02-12 14:20:18 +01:00
let peer = self . peers . get_mut ( & peer_id ) . unwrap ( ) ;
2016-02-14 17:10:55 +01:00
peer . latest_hash = h . clone ( ) ;
peer . latest_number = Some ( d ) ;
2015-12-24 17:18:47 +01:00
max_height = d ;
}
} ,
2016-05-16 14:41:41 +02:00
BlockStatus ::Bad = > {
2015-12-24 17:18:47 +01:00
debug! ( target : " sync " , " Bad new block hash {:?} " , h ) ;
io . disable_peer ( peer_id ) ;
2016-01-08 17:52:25 +01:00
return Ok ( ( ) ) ;
2015-12-24 17:18:47 +01:00
}
}
2016-01-08 17:52:25 +01:00
} ;
2016-02-12 13:07:02 +01:00
if max_height ! = 0 {
2016-05-16 14:41:41 +02:00
trace! ( target : " sync " , " Downloading blocks for new hashes " ) ;
self . blocks . reset_to ( new_hashes ) ;
self . state = SyncState ::NewBlocks ;
2016-01-23 18:44:45 +01:00
self . sync_peer ( io , peer_id , true ) ;
}
2016-01-08 17:52:25 +01:00
Ok ( ( ) )
2015-12-22 22:19:50 +01:00
}
/// Called by peer when it is disconnecting
2016-01-14 19:03:48 +01:00
pub fn on_peer_aborting ( & mut self , io : & mut SyncIo , peer : PeerId ) {
2016-05-16 14:41:41 +02:00
trace! ( target : " sync " , " == Disconnecting {}: {} " , peer , io . peer_info ( peer ) ) ;
2016-01-10 14:11:23 +01:00
if self . peers . contains_key ( & peer ) {
2016-02-14 01:03:48 +01:00
debug! ( target : " sync " , " Disconnected {} " , peer ) ;
2016-01-10 14:11:23 +01:00
self . clear_peer_download ( peer ) ;
2016-01-15 12:26:04 +01:00
self . peers . remove ( & peer ) ;
2016-05-16 14:41:41 +02:00
self . active_peers . remove ( & peer ) ;
2016-01-10 14:11:23 +01:00
self . continue_sync ( io ) ;
}
2015-12-24 17:18:47 +01:00
}
2016-01-09 18:40:13 +01:00
/// Called when a new peer is connected
2016-01-14 19:03:48 +01:00
pub fn on_peer_connected ( & mut self , io : & mut SyncIo , peer : PeerId ) {
2016-05-16 14:41:41 +02:00
trace! ( target : " sync " , " == Connected {}: {} " , peer , io . peer_info ( peer ) ) ;
2016-02-26 11:37:06 +01:00
if let Err ( e ) = self . send_status ( io ) {
2016-03-13 10:33:55 +01:00
debug! ( target :" sync " , " Error sending status request: {:?} " , e ) ;
2016-02-26 11:37:06 +01:00
io . disable_peer ( peer ) ;
}
2015-12-22 22:19:50 +01:00
}
2015-12-27 02:27:15 +01:00
/// Resume downloading
2015-12-24 17:18:47 +01:00
fn continue_sync ( & mut self , io : & mut SyncIo ) {
2016-05-16 14:41:41 +02:00
let mut peers : Vec < ( PeerId , U256 ) > = self . peers . iter ( ) . map ( | ( k , p ) | ( * k , p . difficulty . unwrap_or_else ( U256 ::zero ) ) ) . collect ( ) ;
2015-12-27 02:27:15 +01:00
peers . sort_by ( | & ( _ , d1 ) , & ( _ , d2 ) | d1 . cmp ( & d2 ) . reverse ( ) ) ; //TODO: sort by rating
for ( p , _ ) in peers {
2016-05-16 14:41:41 +02:00
if self . active_peers . contains ( & p ) {
self . sync_peer ( io , p , false ) ;
}
}
2016-05-27 18:57:12 +02:00
if self . state ! = SyncState ::Waiting & & ! self . peers . values ( ) . any ( | p | p . asking ! = PeerAsking ::Nothing ) {
2016-05-16 14:41:41 +02:00
self . complete_sync ( ) ;
2015-12-24 17:18:47 +01:00
}
2015-12-22 22:19:50 +01:00
}
2016-05-16 14:41:41 +02:00
/// Called after all blocks have been downloaded
2015-12-22 22:19:50 +01:00
fn complete_sync ( & mut self ) {
2015-12-27 00:48:03 +01:00
trace! ( target : " sync " , " Sync complete " ) ;
2015-12-24 17:18:47 +01:00
self . reset ( ) ;
self . state = SyncState ::Idle ;
2015-12-22 22:19:50 +01:00
}
/// Enter waiting state
fn pause_sync ( & mut self ) {
2015-12-25 14:55:55 +01:00
trace! ( target : " sync " , " Block queue full, pausing sync " ) ;
2015-12-24 17:18:47 +01:00
self . state = SyncState ::Waiting ;
2015-12-22 22:19:50 +01:00
}
2016-01-09 18:40:13 +01:00
/// Find something to do for a peer. Called for a new peer or when a peer is done with it's task.
2016-01-14 19:03:48 +01:00
fn sync_peer ( & mut self , io : & mut SyncIo , peer_id : PeerId , force : bool ) {
2015-12-22 22:19:50 +01:00
let ( peer_latest , peer_difficulty ) = {
2016-02-12 14:20:18 +01:00
let peer = self . peers . get_mut ( & peer_id ) . unwrap ( ) ;
2015-12-27 00:48:03 +01:00
if peer . asking ! = PeerAsking ::Nothing {
2015-12-22 22:19:50 +01:00
return ;
}
2015-12-27 00:48:03 +01:00
if self . state = = SyncState ::Waiting {
2016-05-27 18:57:12 +02:00
trace! ( target : " sync " , " Waiting for the block queue " ) ;
2015-12-22 22:19:50 +01:00
return ;
}
2016-02-14 17:10:55 +01:00
( peer . latest_hash . clone ( ) , peer . difficulty . clone ( ) )
2015-12-22 22:19:50 +01:00
} ;
2016-05-16 14:41:41 +02:00
let chain_info = io . chain ( ) . chain_info ( ) ;
let td = chain_info . pending_total_difficulty ;
2015-12-22 22:19:50 +01:00
let syncing_difficulty = max ( self . syncing_difficulty , td ) ;
2016-05-16 14:41:41 +02:00
if force | | self . state = = SyncState ::NewBlocks | | peer_difficulty . map_or ( true , | pd | pd > syncing_difficulty ) {
match self . state {
SyncState ::Idle = > {
if self . last_imported_block < chain_info . best_block_number {
self . last_imported_block = chain_info . best_block_number ;
self . last_imported_hash = chain_info . best_block_hash ;
}
trace! ( target : " sync " , " Starting sync with {} " , peer_id ) ;
self . start_sync_round ( io ) ;
self . sync_peer ( io , peer_id , force ) ;
} ,
SyncState ::ChainHead = > {
// Request subchain headers
trace! ( target : " sync " , " Starting sync with better chain " ) ;
let last = self . last_imported_hash . clone ( ) ;
2016-05-27 18:57:12 +02:00
self . request_headers_by_hash ( io , peer_id , & last , SUBCHAIN_SIZE , MAX_HEADERS_TO_REQUEST - 1 , false , PeerAsking ::Heads ) ;
2016-05-16 14:41:41 +02:00
} ,
SyncState ::Blocks | SyncState ::NewBlocks = > {
if io . chain ( ) . block_status ( BlockID ::Hash ( peer_latest ) ) = = BlockStatus ::Unknown {
self . request_blocks ( io , peer_id , false ) ;
}
}
SyncState ::Waiting = > ( )
2015-12-22 22:19:50 +01:00
}
}
2016-02-03 21:05:04 +01:00
}
2016-05-16 14:41:41 +02:00
fn start_sync_round ( & mut self , io : & mut SyncIo ) {
self . state = SyncState ::ChainHead ;
trace! ( target : " sync " , " Starting round (last imported count = {:?}, block = {:?} " , self . imported_this_round , self . last_imported_block ) ;
2016-05-27 18:57:12 +02:00
// Check if need to retract to find the common block. The problem is that the peers still return headers by hash even
// from the non-canonical part of the tree. So we also retract if nothing has been imported last round.
2016-05-16 14:41:41 +02:00
if self . imported_this_round . is_some ( ) & & self . imported_this_round . unwrap ( ) = = 0 & & self . last_imported_block > 0 {
match io . chain ( ) . block_hash ( BlockID ::Number ( self . last_imported_block - 1 ) ) {
Some ( h ) = > {
self . last_imported_block - = 1 ;
self . last_imported_hash = h ;
trace! ( target : " sync " , " Searching common header {} ({}) " , self . last_imported_block , self . last_imported_hash ) ;
}
None = > {
// TODO: get hash by number from the block queue
trace! ( target : " sync " , " Could not revert to previous block, last: {} ({}) " , self . last_imported_block , self . last_imported_hash ) ;
2015-12-22 22:19:50 +01:00
}
}
}
2016-05-16 14:41:41 +02:00
self . imported_this_round = None ;
2016-03-01 22:03:29 +01:00
}
/// Find some headers or blocks to download for a peer.
fn request_blocks ( & mut self , io : & mut SyncIo , peer_id : PeerId , ignore_others : bool ) {
self . clear_peer_download ( peer_id ) ;
if io . chain ( ) . queue_info ( ) . is_full ( ) {
self . pause_sync ( ) ;
return ;
}
// check to see if we need to download any block bodies first
2016-05-16 14:41:41 +02:00
let needed_bodies = self . blocks . needed_bodies ( MAX_BODIES_TO_REQUEST , ignore_others ) ;
2015-12-22 22:19:50 +01:00
if ! needed_bodies . is_empty ( ) {
2016-05-16 14:41:41 +02:00
replace ( & mut self . peers . get_mut ( & peer_id ) . unwrap ( ) . asking_blocks , needed_bodies . clone ( ) ) ;
self . request_bodies ( io , peer_id , needed_bodies ) ;
2016-03-01 22:03:29 +01:00
return ;
2015-12-22 22:19:50 +01:00
}
2016-03-01 22:03:29 +01:00
2016-05-16 14:41:41 +02:00
// find subchain to download
if let Some ( ( h , count ) ) = self . blocks . needed_headers ( MAX_HEADERS_TO_REQUEST , ignore_others ) {
replace ( & mut self . peers . get_mut ( & peer_id ) . unwrap ( ) . asking_blocks , vec! [ h . clone ( ) ] ) ;
self . request_headers_by_hash ( io , peer_id , & h , count , 0 , false , PeerAsking ::BlockHeaders ) ;
2016-03-01 22:03:29 +01:00
}
2015-12-22 22:19:50 +01:00
}
2016-01-09 18:40:13 +01:00
/// Clear all blocks/headers marked as being downloaded by a peer.
2016-01-14 19:03:48 +01:00
fn clear_peer_download ( & mut self , peer_id : PeerId ) {
2016-02-12 14:20:18 +01:00
let peer = self . peers . get_mut ( & peer_id ) . unwrap ( ) ;
2016-05-16 14:41:41 +02:00
match peer . asking {
PeerAsking ::BlockHeaders | PeerAsking ::Heads = > {
for b in & peer . asking_blocks {
2016-05-25 17:03:58 +02:00
self . blocks . clear_header_download ( b ) ;
2016-05-16 14:41:41 +02:00
}
} ,
PeerAsking ::BlockBodies = > {
for b in & peer . asking_blocks {
2016-05-25 17:03:58 +02:00
self . blocks . clear_body_download ( b ) ;
2016-05-16 14:41:41 +02:00
}
} ,
_ = > ( ) ,
2015-12-24 17:18:47 +01:00
}
peer . asking_blocks . clear ( ) ;
2015-12-22 22:19:50 +01:00
}
2015-12-24 17:18:47 +01:00
2016-01-09 18:40:13 +01:00
/// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import.
2015-12-24 17:18:47 +01:00
fn collect_blocks ( & mut self , io : & mut SyncIo ) {
let mut restart = false ;
2016-05-16 14:41:41 +02:00
let mut imported = HashSet ::new ( ) ;
let blocks = self . blocks . drain ( ) ;
let count = blocks . len ( ) ;
for block in blocks {
let number = BlockView ::new ( & block ) . header_view ( ) . number ( ) ;
let h = BlockView ::new ( & block ) . header_view ( ) . sha3 ( ) ;
// Perform basic block verification
if ! Block ::is_good ( & block ) {
debug! ( target : " sync " , " Bad block rlp {:?} : {:?} " , h , block ) ;
restart = true ;
break ;
2015-12-24 17:18:47 +01:00
}
2016-05-16 14:41:41 +02:00
match io . chain ( ) . import_block ( block ) {
Err ( Error ::Import ( ImportError ::AlreadyInChain ) ) = > {
2016-05-27 18:57:12 +02:00
self . last_imported_block = number ;
self . last_imported_hash = h . clone ( ) ;
2016-05-16 14:41:41 +02:00
trace! ( target : " sync " , " Block already in chain {:?} " , h ) ;
} ,
Err ( Error ::Import ( ImportError ::AlreadyQueued ) ) = > {
2016-05-27 18:57:12 +02:00
self . last_imported_block = number ;
self . last_imported_hash = h . clone ( ) ;
2016-05-16 14:41:41 +02:00
trace! ( target : " sync " , " Block already queued {:?} " , h ) ;
} ,
Ok ( _ ) = > {
trace! ( target : " sync " , " Block queued {:?} " , h ) ;
self . last_imported_block = number ;
self . last_imported_hash = h . clone ( ) ;
imported . insert ( h . clone ( ) ) ;
} ,
Err ( Error ::Block ( BlockError ::UnknownParent ( _ ) ) ) if self . state = = SyncState ::NewBlocks = > {
trace! ( target : " sync " , " Unknown new block parent, restarting sync " ) ;
break ;
} ,
Err ( e ) = > {
debug! ( target : " sync " , " Bad block {:?} : {:?} " , h , e ) ;
2016-02-03 21:42:30 +01:00
restart = true ;
break ;
}
2015-12-24 17:18:47 +01:00
}
}
2016-05-16 14:41:41 +02:00
trace! ( target : " sync " , " Imported {} of {} " , imported . len ( ) , count ) ;
self . imported_this_round = Some ( self . imported_this_round . unwrap_or ( 0 ) + imported . len ( ) ) ;
2015-12-24 17:18:47 +01:00
if restart {
2016-03-26 12:00:05 +01:00
self . restart_on_bad_block ( io ) ;
2015-12-24 17:18:47 +01:00
return ;
}
2016-05-16 14:41:41 +02:00
if self . blocks . is_empty ( ) {
// complete sync round
trace! ( target : " sync " , " Sync round complete " ) ;
self . restart ( io ) ;
2015-12-24 17:18:47 +01:00
}
2015-12-22 22:19:50 +01:00
}
2015-12-24 17:18:47 +01:00
2016-01-09 18:40:13 +01:00
/// Request headers from a peer by block hash
2016-05-25 17:03:58 +02:00
#[ cfg_attr(feature= " dev " , allow(too_many_arguments)) ]
2016-05-16 14:41:41 +02:00
fn request_headers_by_hash ( & mut self , sync : & mut SyncIo , peer_id : PeerId , h : & H256 , count : usize , skip : usize , reverse : bool , asking : PeerAsking ) {
2016-01-09 18:40:13 +01:00
trace! ( target : " sync " , " {} <- GetBlockHeaders: {} entries starting from {} " , peer_id , count , h ) ;
2015-12-22 22:19:50 +01:00
let mut rlp = RlpStream ::new_list ( 4 ) ;
rlp . append ( h ) ;
rlp . append ( & count ) ;
rlp . append ( & skip ) ;
rlp . append ( & if reverse { 1 u32 } else { 0 u32 } ) ;
2016-05-16 14:41:41 +02:00
self . send_request ( sync , peer_id , asking , GET_BLOCK_HEADERS_PACKET , rlp . out ( ) ) ;
2015-12-22 22:19:50 +01:00
}
2016-01-09 18:40:13 +01:00
/// Request block bodies from a peer
2016-01-14 19:03:48 +01:00
fn request_bodies ( & mut self , sync : & mut SyncIo , peer_id : PeerId , hashes : Vec < H256 > ) {
2015-12-22 22:19:50 +01:00
let mut rlp = RlpStream ::new_list ( hashes . len ( ) ) ;
2016-05-16 14:41:41 +02:00
trace! ( target : " sync " , " {} <- GetBlockBodies: {} entries starting from {:?} " , peer_id , hashes . len ( ) , hashes . first ( ) ) ;
2015-12-22 22:19:50 +01:00
for h in hashes {
rlp . append ( & h ) ;
}
2015-12-25 14:55:55 +01:00
self . send_request ( sync , peer_id , PeerAsking ::BlockBodies , GET_BLOCK_BODIES_PACKET , rlp . out ( ) ) ;
2015-12-22 22:19:50 +01:00
}
2016-01-09 18:40:13 +01:00
/// Reset peer status after request is complete.
2016-05-16 14:41:41 +02:00
fn reset_peer_asking ( & mut self , peer_id : PeerId , asking : PeerAsking ) -> bool {
2016-02-12 14:20:18 +01:00
let peer = self . peers . get_mut ( & peer_id ) . unwrap ( ) ;
2015-12-27 00:48:03 +01:00
if peer . asking ! = asking {
2016-05-16 14:41:41 +02:00
trace! ( target :" sync " , " Asking {:?} while expected {:?} " , peer . asking , asking ) ;
peer . asking = PeerAsking ::Nothing ;
false
2015-12-27 00:48:03 +01:00
}
else {
peer . asking = PeerAsking ::Nothing ;
2016-05-16 14:41:41 +02:00
true
2015-12-27 00:48:03 +01:00
}
}
2016-01-09 18:40:13 +01:00
/// Generic request sender
2016-01-14 19:03:48 +01:00
fn send_request ( & mut self , sync : & mut SyncIo , peer_id : PeerId , asking : PeerAsking , packet_id : PacketId , packet : Bytes ) {
2016-05-27 18:57:12 +02:00
let peer = self . peers . get_mut ( & peer_id ) . unwrap ( ) ;
if peer . asking ! = PeerAsking ::Nothing {
warn! ( target :" sync " , " Asking {:?} while requesting {:?} " , peer . asking , asking ) ;
2015-12-22 22:19:50 +01:00
}
2016-05-27 16:35:52 +02:00
peer . asking = asking ;
peer . ask_time = time ::precise_time_s ( ) ;
if let Err ( e ) = sync . send ( peer_id , packet_id , packet ) {
debug! ( target :" sync " , " Error sending request: {:?} " , e ) ;
sync . disable_peer ( peer_id ) ;
2015-12-22 22:19:50 +01:00
}
}
2015-12-24 17:18:47 +01:00
2016-02-12 13:07:02 +01:00
/// Generic packet sender
fn send_packet ( & mut self , sync : & mut SyncIo , peer_id : PeerId , packet_id : PacketId , packet : Bytes ) {
if let Err ( e ) = sync . send ( peer_id , packet_id , packet ) {
2016-03-07 11:34:07 +01:00
debug! ( target :" sync " , " Error sending packet: {:?} " , e ) ;
2016-02-12 13:07:02 +01:00
sync . disable_peer ( peer_id ) ;
}
}
2016-03-28 18:11:00 +02:00
2016-01-09 18:40:13 +01:00
/// Called when peer sends us new transactions
2016-03-05 16:46:04 +01:00
fn on_peer_transactions ( & mut self , io : & mut SyncIo , peer_id : PeerId , r : & UntrustedRlp ) -> Result < ( ) , PacketDecodeError > {
2016-03-17 12:17:53 +01:00
// accepting transactions once only fully synced
2016-03-17 14:56:19 +01:00
if ! io . is_chain_queue_empty ( ) {
2016-03-17 12:17:53 +01:00
return Ok ( ( ) ) ;
}
2016-03-05 16:46:04 +01:00
let item_count = r . item_count ( ) ;
trace! ( target : " sync " , " {} -> Transactions ({} entries) " , peer_id , item_count ) ;
2016-03-06 11:04:13 +01:00
2016-03-08 15:46:44 +01:00
let mut transactions = Vec ::with_capacity ( item_count ) ;
2016-03-05 16:46:04 +01:00
for i in 0 .. item_count {
let tx : SignedTransaction = try ! ( r . val_at ( i ) ) ;
2016-03-08 15:46:44 +01:00
transactions . push ( tx ) ;
2016-03-05 16:46:04 +01:00
}
2016-03-08 16:23:32 +01:00
let chain = io . chain ( ) ;
2016-03-16 10:40:33 +01:00
let fetch_account = | a : & Address | AccountDetails {
2016-05-26 22:17:55 +02:00
nonce : chain . latest_nonce ( a ) ,
balance : chain . latest_balance ( a ) ,
2016-03-16 10:40:33 +01:00
} ;
2016-05-31 20:54:02 +02:00
let _ = io . chain ( ) . import_transactions ( transactions ) ;
2016-05-16 14:41:41 +02:00
Ok ( ( ) )
2015-12-24 17:18:47 +01:00
}
2016-01-09 18:40:13 +01:00
/// Send Status message
2016-02-26 11:37:06 +01:00
fn send_status ( & mut self , io : & mut SyncIo ) -> Result < ( ) , UtilError > {
2015-12-24 17:18:47 +01:00
let mut packet = RlpStream ::new_list ( 5 ) ;
2016-01-07 20:43:37 +01:00
let chain = io . chain ( ) . chain_info ( ) ;
2015-12-24 17:18:47 +01:00
packet . append ( & ( PROTOCOL_VERSION as u32 ) ) ;
2016-02-24 21:23:58 +01:00
packet . append ( & self . network_id ) ;
2015-12-24 17:18:47 +01:00
packet . append ( & chain . total_difficulty ) ;
2016-01-07 16:08:12 +01:00
packet . append ( & chain . best_block_hash ) ;
2015-12-24 17:18:47 +01:00
packet . append ( & chain . genesis_hash ) ;
2016-02-26 11:37:06 +01:00
io . respond ( STATUS_PACKET , packet . out ( ) )
2015-12-24 17:18:47 +01:00
}
2016-01-09 18:40:13 +01:00
/// Respond to GetBlockHeaders request
2016-02-04 23:24:36 +01:00
fn return_block_headers ( io : & SyncIo , r : & UntrustedRlp ) -> RlpResponseResult {
2015-12-24 17:18:47 +01:00
// Packet layout:
// [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ]
2016-01-08 17:52:25 +01:00
let max_headers : usize = try ! ( r . val_at ( 1 ) ) ;
let skip : usize = try ! ( r . val_at ( 2 ) ) ;
let reverse : bool = try ! ( r . val_at ( 3 ) ) ;
2016-01-07 20:43:37 +01:00
let last = io . chain ( ) . chain_info ( ) . best_block_number ;
2016-01-08 17:52:25 +01:00
let mut number = if try ! ( r . at ( 0 ) ) . size ( ) = = 32 {
2015-12-24 17:18:47 +01:00
// id is a hash
2016-01-08 17:52:25 +01:00
let hash : H256 = try ! ( r . val_at ( 0 ) ) ;
2015-12-27 00:48:03 +01:00
trace! ( target : " sync " , " -> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{}) " , hash , max_headers , skip , reverse ) ;
2016-05-19 11:00:32 +02:00
match io . chain ( ) . block_header ( BlockID ::Hash ( hash ) ) {
2015-12-28 12:03:05 +01:00
Some ( hdr ) = > From ::from ( HeaderView ::new ( & hdr ) . number ( ) ) ,
2016-05-16 14:41:41 +02:00
None = > return Ok ( Some ( ( BLOCK_HEADERS_PACKET , RlpStream ::new_list ( 0 ) ) ) ) //no such header, return nothing
2015-12-24 17:18:47 +01:00
}
2016-05-16 14:41:41 +02:00
} else {
2016-01-08 17:52:25 +01:00
trace! ( target : " sync " , " -> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{}) " , try ! ( r . val_at ::< BlockNumber > ( 0 ) ) , max_headers , skip , reverse ) ;
try ! ( r . val_at ( 0 ) )
2015-12-24 17:18:47 +01:00
} ;
2015-12-27 00:48:03 +01:00
if reverse {
number = min ( last , number ) ;
} else {
2016-02-24 12:36:41 +01:00
number = max ( 0 , number ) ;
2015-12-27 00:48:03 +01:00
}
2015-12-24 17:18:47 +01:00
let max_count = min ( MAX_HEADERS_TO_SEND , max_headers ) ;
let mut count = 0 ;
let mut data = Bytes ::new ( ) ;
2016-01-08 16:26:00 +01:00
let inc = ( skip + 1 ) as BlockNumber ;
2016-02-24 13:01:45 +01:00
while number < = last & & count < max_count {
2016-05-19 11:00:32 +02:00
if let Some ( mut hdr ) = io . chain ( ) . block_header ( BlockID ::Number ( number ) ) {
2016-01-17 15:56:09 +01:00
data . append ( & mut hdr ) ;
count + = 1 ;
2015-12-24 17:18:47 +01:00
}
2015-12-25 14:55:55 +01:00
if reverse {
2016-02-24 13:01:45 +01:00
if number < = inc | | number = = 0 {
2016-01-08 16:26:00 +01:00
break ;
}
number - = inc ;
2015-12-25 14:55:55 +01:00
}
else {
2016-01-08 16:26:00 +01:00
number + = inc ;
2015-12-25 14:55:55 +01:00
}
2015-12-24 17:18:47 +01:00
}
let mut rlp = RlpStream ::new_list ( count as usize ) ;
rlp . append_raw ( & data , count as usize ) ;
2015-12-27 00:48:03 +01:00
trace! ( target : " sync " , " -> GetBlockHeaders: returned {} entries " , count ) ;
2016-02-04 23:24:36 +01:00
Ok ( Some ( ( BLOCK_HEADERS_PACKET , rlp ) ) )
2015-12-24 17:18:47 +01:00
}
2016-01-09 18:40:13 +01:00
/// Respond to GetBlockBodies request
2016-02-04 23:24:36 +01:00
fn return_block_bodies ( io : & SyncIo , r : & UntrustedRlp ) -> RlpResponseResult {
2015-12-24 17:18:47 +01:00
let mut count = r . item_count ( ) ;
if count = = 0 {
debug! ( target : " sync " , " Empty GetBlockBodies request, ignoring. " ) ;
2016-02-04 23:24:36 +01:00
return Ok ( None ) ;
2015-12-24 17:18:47 +01:00
}
2015-12-27 00:48:03 +01:00
trace! ( target : " sync " , " -> GetBlockBodies: {} entries " , count ) ;
2015-12-24 17:18:47 +01:00
count = min ( count , MAX_BODIES_TO_SEND ) ;
let mut added = 0 usize ;
let mut data = Bytes ::new ( ) ;
for i in 0 .. count {
2016-05-19 11:00:32 +02:00
if let Some ( mut hdr ) = io . chain ( ) . block_body ( BlockID ::Hash ( try ! ( r . val_at ::< H256 > ( i ) ) ) ) {
2016-01-19 11:10:38 +01:00
data . append ( & mut hdr ) ;
added + = 1 ;
2015-12-24 17:18:47 +01:00
}
}
let mut rlp = RlpStream ::new_list ( added ) ;
rlp . append_raw ( & data , added ) ;
2015-12-27 00:48:03 +01:00
trace! ( target : " sync " , " -> GetBlockBodies: returned {} entries " , added ) ;
2016-02-04 23:24:36 +01:00
Ok ( Some ( ( BLOCK_BODIES_PACKET , rlp ) ) )
2015-12-24 17:18:47 +01:00
}
2016-01-09 18:40:13 +01:00
/// Respond to GetNodeData request
2016-02-04 23:24:36 +01:00
fn return_node_data ( io : & SyncIo , r : & UntrustedRlp ) -> RlpResponseResult {
2015-12-24 17:18:47 +01:00
let mut count = r . item_count ( ) ;
if count = = 0 {
debug! ( target : " sync " , " Empty GetNodeData request, ignoring. " ) ;
2016-02-04 23:24:36 +01:00
return Ok ( None ) ;
2015-12-24 17:18:47 +01:00
}
count = min ( count , MAX_NODE_DATA_TO_SEND ) ;
let mut added = 0 usize ;
let mut data = Bytes ::new ( ) ;
for i in 0 .. count {
2016-01-19 11:10:38 +01:00
if let Some ( mut hdr ) = io . chain ( ) . state_data ( & try ! ( r . val_at ::< H256 > ( i ) ) ) {
data . append ( & mut hdr ) ;
added + = 1 ;
2015-12-24 17:18:47 +01:00
}
}
let mut rlp = RlpStream ::new_list ( added ) ;
rlp . append_raw ( & data , added ) ;
2016-02-04 23:24:36 +01:00
Ok ( Some ( ( NODE_DATA_PACKET , rlp ) ) )
2015-12-24 17:18:47 +01:00
}
2016-02-04 23:24:36 +01:00
fn return_receipts ( io : & SyncIo , rlp : & UntrustedRlp ) -> RlpResponseResult {
let mut count = rlp . item_count ( ) ;
2015-12-24 17:18:47 +01:00
if count = = 0 {
debug! ( target : " sync " , " Empty GetReceipts request, ignoring. " ) ;
2016-02-04 23:24:36 +01:00
return Ok ( None ) ;
2015-12-24 17:18:47 +01:00
}
2016-03-12 19:23:17 +01:00
count = min ( count , MAX_RECEIPTS_HEADERS_TO_SEND ) ;
let mut added_headers = 0 usize ;
let mut added_receipts = 0 usize ;
2015-12-24 17:18:47 +01:00
let mut data = Bytes ::new ( ) ;
for i in 0 .. count {
2016-03-12 19:23:17 +01:00
if let Some ( mut receipts_bytes ) = io . chain ( ) . block_receipts ( & try ! ( rlp . val_at ::< H256 > ( i ) ) ) {
data . append ( & mut receipts_bytes ) ;
added_receipts + = receipts_bytes . len ( ) ;
added_headers + = 1 ;
if added_receipts > MAX_RECEIPTS_TO_SEND { break ; }
2015-12-24 17:18:47 +01:00
}
}
2016-03-12 19:23:17 +01:00
let mut rlp_result = RlpStream ::new_list ( added_headers ) ;
rlp_result . append_raw ( & data , added_headers ) ;
2016-02-04 23:24:36 +01:00
Ok ( Some ( ( RECEIPTS_PACKET , rlp_result ) ) )
}
fn return_rlp < FRlp , FError > ( & self , io : & mut SyncIo , rlp : & UntrustedRlp , rlp_func : FRlp , error_func : FError ) -> Result < ( ) , PacketDecodeError >
where FRlp : Fn ( & SyncIo , & UntrustedRlp ) -> RlpResponseResult ,
FError : FnOnce ( UtilError ) -> String
{
let response = rlp_func ( io , rlp ) ;
match response {
Err ( e ) = > Err ( e ) ,
Ok ( Some ( ( packet_id , rlp_stream ) ) ) = > {
io . respond ( packet_id , rlp_stream . out ( ) ) . unwrap_or_else (
| e | debug! ( target : " sync " , " {:?} " , error_func ( e ) ) ) ;
Ok ( ( ) )
}
_ = > Ok ( ( ) )
}
2015-12-24 17:18:47 +01:00
}
2016-01-09 18:40:13 +01:00
/// Dispatch incoming requests and responses
2016-01-14 19:03:48 +01:00
pub fn on_packet ( & mut self , io : & mut SyncIo , peer : PeerId , packet_id : u8 , data : & [ u8 ] ) {
2016-01-08 17:52:25 +01:00
let rlp = UntrustedRlp ::new ( data ) ;
2016-02-12 14:20:18 +01:00
if packet_id ! = STATUS_PACKET & & ! self . peers . contains_key ( & peer ) {
2016-03-13 10:33:55 +01:00
debug! ( target :" sync " , " Unexpected packet from unregistered peer: {}:{} " , peer , io . peer_info ( peer ) ) ;
2016-02-12 14:20:18 +01:00
return ;
}
2016-01-08 17:52:25 +01:00
let result = match packet_id {
2015-12-24 17:18:47 +01:00
STATUS_PACKET = > self . on_peer_status ( io , peer , & rlp ) ,
TRANSACTIONS_PACKET = > self . on_peer_transactions ( io , peer , & rlp ) ,
BLOCK_HEADERS_PACKET = > self . on_peer_block_headers ( io , peer , & rlp ) ,
BLOCK_BODIES_PACKET = > self . on_peer_block_bodies ( io , peer , & rlp ) ,
NEW_BLOCK_PACKET = > self . on_peer_new_block ( io , peer , & rlp ) ,
NEW_BLOCK_HASHES_PACKET = > self . on_peer_new_hashes ( io , peer , & rlp ) ,
2016-02-04 23:24:36 +01:00
GET_BLOCK_BODIES_PACKET = > self . return_rlp ( io , & rlp ,
ChainSync ::return_block_bodies ,
| e | format! ( " Error sending block bodies: {:?} " , e ) ) ,
GET_BLOCK_HEADERS_PACKET = > self . return_rlp ( io , & rlp ,
ChainSync ::return_block_headers ,
| e | format! ( " Error sending block headers: {:?} " , e ) ) ,
GET_RECEIPTS_PACKET = > self . return_rlp ( io , & rlp ,
2016-02-05 01:01:50 +01:00
ChainSync ::return_receipts ,
| e | format! ( " Error sending receipts: {:?} " , e ) ) ,
2016-02-04 23:24:36 +01:00
GET_NODE_DATA_PACKET = > self . return_rlp ( io , & rlp ,
2016-02-05 01:01:50 +01:00
ChainSync ::return_node_data ,
| e | format! ( " Error sending nodes: {:?} " , e ) ) ,
2016-02-15 00:51:50 +01:00
2016-02-03 21:05:04 +01:00
_ = > {
2016-01-08 17:52:25 +01:00
debug! ( target : " sync " , " Unknown packet {} " , packet_id ) ;
Ok ( ( ) )
}
} ;
result . unwrap_or_else ( | e | {
debug! ( target :" sync " , " {} -> Malformed packet {} : {} " , peer , packet_id , e ) ;
} )
2015-12-24 17:18:47 +01:00
}
2015-12-22 22:19:50 +01:00
2016-02-03 21:42:30 +01:00
pub fn maintain_peers ( & self , io : & mut SyncIo ) {
let tick = time ::precise_time_s ( ) ;
for ( peer_id , peer ) in & self . peers {
if peer . asking ! = PeerAsking ::Nothing & & ( tick - peer . ask_time ) > CONNECTION_TIMEOUT_SEC {
2016-05-27 18:57:12 +02:00
trace! ( target :" sync " , " Timeouted {} " , peer_id ) ;
2016-02-03 21:42:30 +01:00
io . disconnect_peer ( * peer_id ) ;
}
}
}
2016-02-05 18:34:08 +01:00
fn check_resume ( & mut self , io : & mut SyncIo ) {
2016-02-08 12:14:39 +01:00
if ! io . chain ( ) . queue_info ( ) . is_full ( ) & & self . state = = SyncState ::Waiting {
2016-02-11 21:10:41 +01:00
self . state = SyncState ::Blocks ;
2016-02-04 02:28:16 +01:00
self . continue_sync ( io ) ;
}
2015-12-22 22:19:50 +01:00
}
2016-02-05 18:34:08 +01:00
2016-02-07 22:52:56 +01:00
/// creates rlp to send for the tree defined by 'from' and 'to' hashes
2016-02-05 18:34:08 +01:00
fn create_new_hashes_rlp ( chain : & BlockChainClient , from : & H256 , to : & H256 ) -> Option < Bytes > {
match chain . tree_route ( from , to ) {
Some ( route ) = > {
2016-05-24 21:56:17 +02:00
let uncles = chain . find_uncles ( from ) . unwrap_or_else ( Vec ::new ) ;
2016-02-05 18:34:08 +01:00
match route . blocks . len ( ) {
0 = > None ,
_ = > {
2016-05-24 21:56:17 +02:00
let mut blocks = route . blocks ;
blocks . extend ( uncles ) ;
let mut rlp_stream = RlpStream ::new_list ( blocks . len ( ) ) ;
for block_hash in blocks {
2016-02-07 23:39:02 +01:00
let mut hash_rlp = RlpStream ::new_list ( 2 ) ;
2016-05-19 11:00:32 +02:00
let difficulty = chain . block_total_difficulty ( BlockID ::Hash ( block_hash . clone ( ) ) ) . expect ( " Malformed block without a difficulty on the chain! " ) ;
2016-02-07 23:39:02 +01:00
hash_rlp . append ( & block_hash ) ;
hash_rlp . append ( & difficulty ) ;
rlp_stream . append_raw ( & hash_rlp . out ( ) , 1 ) ;
}
Some ( rlp_stream . out ( ) )
2016-02-05 18:34:08 +01:00
}
}
} ,
None = > None
}
}
2016-02-07 22:52:56 +01:00
/// creates latest block rlp for the given client
2016-02-06 21:00:52 +01:00
fn create_latest_block_rlp ( chain : & BlockChainClient ) -> Bytes {
2016-02-07 23:39:02 +01:00
let mut rlp_stream = RlpStream ::new_list ( 2 ) ;
2016-05-19 11:00:32 +02:00
rlp_stream . append_raw ( & chain . block ( BlockID ::Hash ( chain . chain_info ( ) . best_block_hash ) ) . unwrap ( ) , 1 ) ;
2016-02-07 23:39:02 +01:00
rlp_stream . append ( & chain . chain_info ( ) . total_difficulty ) ;
rlp_stream . out ( )
2016-02-06 21:00:52 +01:00
}
2016-02-07 22:52:56 +01:00
/// returns peer ids that have less blocks than our chain
2016-03-04 21:52:03 +01:00
fn get_lagging_peers ( & mut self , chain_info : & BlockChainInfo , io : & SyncIo ) -> Vec < ( PeerId , BlockNumber ) > {
2016-02-05 23:45:25 +01:00
let latest_hash = chain_info . best_block_hash ;
2016-02-06 18:56:21 +01:00
let latest_number = chain_info . best_block_number ;
2016-02-14 17:10:55 +01:00
self . peers . iter_mut ( ) . filter_map ( | ( & id , ref mut peer_info ) |
2016-05-19 11:00:32 +02:00
match io . chain ( ) . block_status ( BlockID ::Hash ( peer_info . latest_hash . clone ( ) ) ) {
2016-02-06 20:16:59 +01:00
BlockStatus ::InChain = > {
2016-02-14 17:10:55 +01:00
if peer_info . latest_number . is_none ( ) {
2016-05-19 11:00:32 +02:00
peer_info . latest_number = Some ( HeaderView ::new ( & io . chain ( ) . block_header ( BlockID ::Hash ( peer_info . latest_hash . clone ( ) ) ) . unwrap ( ) ) . number ( ) ) ;
2016-02-14 17:10:55 +01:00
}
if peer_info . latest_hash ! = latest_hash & & latest_number > peer_info . latest_number . unwrap ( ) {
Some ( ( id , peer_info . latest_number . unwrap ( ) ) )
} else { None }
2016-02-06 20:16:59 +01:00
} ,
2016-02-14 17:10:55 +01:00
_ = > None
2016-02-06 18:56:21 +01:00
} )
2016-02-14 17:10:55 +01:00
. collect ::< Vec < _ > > ( )
2016-02-05 23:45:25 +01:00
}
2016-02-05 18:34:08 +01:00
2016-05-29 00:38:10 +02:00
fn select_lagging_peers ( & mut self , chain_info : & BlockChainInfo , io : & mut SyncIo ) -> Vec < ( PeerId , BlockNumber ) > {
use rand ::Rng ;
let mut lagging_peers = self . get_lagging_peers ( chain_info , io ) ;
// take sqrt(x) peers
let mut count = ( self . peers . len ( ) as f64 ) . powf ( 0.5 ) . round ( ) as usize ;
count = min ( count , MAX_PEERS_PROPAGATION ) ;
count = max ( count , MIN_PEERS_PROPAGATION ) ;
::rand ::thread_rng ( ) . shuffle ( & mut lagging_peers ) ;
lagging_peers . into_iter ( ) . take ( count ) . collect ::< Vec < _ > > ( )
}
2016-02-05 18:34:08 +01:00
2016-05-29 00:38:10 +02:00
/// propagates latest block to lagging peers
fn propagate_blocks ( & mut self , chain_info : & BlockChainInfo , io : & mut SyncIo ) -> usize {
let lucky_peers = self . select_lagging_peers ( chain_info , io ) ;
2016-02-06 21:00:52 +01:00
let mut sent = 0 ;
2016-05-29 00:38:10 +02:00
for ( peer_id , _ ) in lucky_peers {
2016-02-06 21:00:52 +01:00
let rlp = ChainSync ::create_latest_block_rlp ( io . chain ( ) ) ;
2016-02-12 13:07:02 +01:00
self . send_packet ( io , peer_id , NEW_BLOCK_PACKET , rlp ) ;
2016-03-04 21:52:03 +01:00
self . peers . get_mut ( & peer_id ) . unwrap ( ) . latest_hash = chain_info . best_block_hash . clone ( ) ;
self . peers . get_mut ( & peer_id ) . unwrap ( ) . latest_number = Some ( chain_info . best_block_number ) ;
2016-05-17 10:32:05 +02:00
sent + = 1 ;
2016-02-06 21:00:52 +01:00
}
sent
}
2016-03-01 21:31:58 +01:00
/// propagates new known hashes to all peers
2016-03-04 21:52:03 +01:00
fn propagate_new_hashes ( & mut self , chain_info : & BlockChainInfo , io : & mut SyncIo ) -> usize {
2016-05-29 00:38:10 +02:00
let lucky_peers = self . select_lagging_peers ( chain_info , io ) ;
2016-02-05 18:34:08 +01:00
let mut sent = 0 ;
2016-05-19 11:00:32 +02:00
let last_parent = HeaderView ::new ( & io . chain ( ) . block_header ( BlockID ::Hash ( chain_info . best_block_hash . clone ( ) ) ) . unwrap ( ) ) . parent_hash ( ) ;
2016-05-29 00:38:10 +02:00
for ( peer_id , peer_number ) in lucky_peers {
2016-02-14 17:10:55 +01:00
let mut peer_best = self . peers . get ( & peer_id ) . unwrap ( ) . latest_hash . clone ( ) ;
2016-05-29 00:38:10 +02:00
if chain_info . best_block_number - peer_number > MAX_PEER_LAG_PROPAGATION as BlockNumber {
2016-02-17 01:39:16 +01:00
// If we think peer is too far behind just send one latest hash
2016-02-14 17:10:55 +01:00
peer_best = last_parent . clone ( ) ;
}
2016-05-17 10:32:05 +02:00
sent + = match ChainSync ::create_new_hashes_rlp ( io . chain ( ) , & peer_best , & chain_info . best_block_hash ) {
2016-02-05 18:34:08 +01:00
Some ( rlp ) = > {
2016-02-06 20:23:25 +01:00
{
2016-02-12 14:20:18 +01:00
let peer = self . peers . get_mut ( & peer_id ) . unwrap ( ) ;
2016-03-04 21:52:03 +01:00
peer . latest_hash = chain_info . best_block_hash . clone ( ) ;
peer . latest_number = Some ( chain_info . best_block_number ) ;
2016-02-06 20:23:25 +01:00
}
2016-02-12 13:07:02 +01:00
self . send_packet ( io , peer_id , NEW_BLOCK_HASHES_PACKET , rlp ) ;
2016-02-05 18:34:08 +01:00
1
} ,
None = > 0
}
}
sent
}
2016-03-28 18:11:00 +02:00
/// propagates new transactions to all peers
fn propagate_new_transactions ( & mut self , io : & mut SyncIo ) -> usize {
// Early out of nobody to send to.
2016-04-06 10:07:24 +02:00
if self . peers . is_empty ( ) {
2016-03-28 18:11:00 +02:00
return 0 ;
}
2016-05-31 19:01:37 +02:00
let mut transactions = io . chain ( ) . all_transactions ( ) ;
2016-04-06 23:03:07 +02:00
if transactions . is_empty ( ) {
return 0 ;
}
let mut packet = RlpStream ::new_list ( transactions . len ( ) ) ;
let tx_count = transactions . len ( ) ;
for tx in transactions . drain ( .. ) {
packet . append ( & tx ) ;
2016-03-28 18:11:00 +02:00
}
let rlp = packet . out ( ) ;
let lucky_peers = {
// sqrt(x)/x scaled to max u32
let fraction = ( self . peers . len ( ) as f64 ) . powf ( - 0.5 ) . mul ( u32 ::max_value ( ) as f64 ) . round ( ) as u32 ;
let small = self . peers . len ( ) < MIN_PEERS_PROPAGATION ;
let lucky_peers = self . peers . iter ( )
. filter_map ( | ( & p , _ ) | if small | | ::rand ::random ::< u32 > ( ) < fraction { Some ( p . clone ( ) ) } else { None } )
. collect ::< Vec < _ > > ( ) ;
// taking at max of MAX_PEERS_PROPAGATION
2016-04-06 10:07:24 +02:00
lucky_peers . iter ( ) . cloned ( ) . take ( min ( lucky_peers . len ( ) , MAX_PEERS_PROPAGATION ) ) . collect ::< Vec < PeerId > > ( )
2016-03-28 18:11:00 +02:00
} ;
let sent = lucky_peers . len ( ) ;
for peer_id in lucky_peers {
self . send_packet ( io , peer_id , TRANSACTIONS_PACKET , rlp . clone ( ) ) ;
}
2016-04-06 23:03:07 +02:00
trace! ( target : " sync " , " Sent {} transactions to {} peers. " , tx_count , sent ) ;
2016-03-28 18:11:00 +02:00
sent
}
2016-03-07 12:16:37 +01:00
fn propagate_latest_blocks ( & mut self , io : & mut SyncIo ) {
let chain_info = io . chain ( ) . chain_info ( ) ;
if ( ( ( chain_info . best_block_number as i64 ) - ( self . last_sent_block_number as i64 ) ) . abs ( ) as BlockNumber ) < MAX_PEER_LAG_PROPAGATION {
let hashes = self . propagate_new_hashes ( & chain_info , io ) ;
2016-05-27 17:26:50 +02:00
let blocks = self . propagate_blocks ( & chain_info , io ) ;
2016-02-12 13:07:02 +01:00
if blocks ! = 0 | | hashes ! = 0 {
trace! ( target : " sync " , " Sent latest {} blocks and {} hashes to peers. " , blocks , hashes ) ;
}
}
2016-05-27 17:26:50 +02:00
self . propagate_new_transactions ( io ) ;
2016-03-07 12:16:37 +01:00
self . last_sent_block_number = chain_info . best_block_number ;
2016-02-05 18:34:08 +01:00
}
2016-03-05 16:46:04 +01:00
2016-03-07 12:16:37 +01:00
/// Maintain other peers. Send out any new blocks and transactions
pub fn maintain_sync ( & mut self , io : & mut SyncIo ) {
self . check_resume ( io ) ;
}
2016-05-31 20:58:33 +02:00
/// called when block is imported to chain, updates transactions queue and propagates the blocks
pub fn chain_new_blocks ( & mut self , io : & mut SyncIo , _imported : & [ H256 ] , invalid : & [ H256 ] , _enacted : & [ H256 ] , _retracted : & [ H256 ] ) {
if io . is_chain_queue_empty ( ) {
// Propagate latests blocks
self . propagate_latest_blocks ( io ) ;
}
if ! invalid . is_empty ( ) {
trace! ( target : " sync " , " Bad blocks in the queue, restarting " ) ;
self . restart_on_bad_block ( io ) ;
}
}
2015-12-22 22:19:50 +01:00
}
2016-02-04 19:30:31 +01:00
#[ cfg(test) ]
mod tests {
2016-02-04 20:03:14 +01:00
use tests ::helpers ::* ;
use super ::* ;
2016-02-24 21:23:58 +01:00
use ::SyncConfig ;
2016-02-04 20:03:14 +01:00
use util ::* ;
2016-02-05 23:45:25 +01:00
use super ::{ PeerInfo , PeerAsking } ;
2016-03-17 10:20:35 +01:00
use ethcore ::views ::BlockView ;
2016-02-07 22:08:15 +01:00
use ethcore ::header ::* ;
use ethcore ::client ::* ;
2016-05-16 14:41:41 +02:00
use ethcore ::spec ::Spec ;
2016-02-07 22:08:15 +01:00
fn get_dummy_block ( order : u32 , parent_hash : H256 ) -> Bytes {
let mut header = Header ::new ( ) ;
header . gas_limit = x! ( 0 ) ;
header . difficulty = x! ( order * 100 ) ;
header . timestamp = ( order * 10 ) as u64 ;
header . number = order as u64 ;
header . parent_hash = parent_hash ;
header . state_root = H256 ::zero ( ) ;
let mut rlp = RlpStream ::new_list ( 3 ) ;
rlp . append ( & header ) ;
rlp . append_raw ( & rlp ::EMPTY_LIST_RLP , 1 ) ;
rlp . append_raw ( & rlp ::EMPTY_LIST_RLP , 1 ) ;
rlp . out ( )
}
fn get_dummy_blocks ( order : u32 , parent_hash : H256 ) -> Bytes {
let mut rlp = RlpStream ::new_list ( 1 ) ;
rlp . append_raw ( & get_dummy_block ( order , parent_hash ) , 1 ) ;
let difficulty : U256 = x! ( 100 * order ) ;
rlp . append ( & difficulty ) ;
rlp . out ( )
}
2016-02-04 20:03:14 +01:00
2016-02-07 22:20:59 +01:00
fn get_dummy_hashes ( ) -> Bytes {
let mut rlp = RlpStream ::new_list ( 5 ) ;
for _ in 0 .. 5 {
let mut hash_d_rlp = RlpStream ::new_list ( 2 ) ;
let hash : H256 = H256 ::from ( 0 u64 ) ;
let diff : U256 = U256 ::from ( 1 u64 ) ;
hash_d_rlp . append ( & hash ) ;
hash_d_rlp . append ( & diff ) ;
rlp . append_raw ( & hash_d_rlp . out ( ) , 1 ) ;
}
rlp . out ( )
}
2016-02-04 20:03:14 +01:00
2016-02-04 23:24:36 +01:00
#[ test ]
fn return_receipts_empty ( ) {
let mut client = TestBlockChainClient ::new ( ) ;
let mut queue = VecDeque ::new ( ) ;
let io = TestIo ::new ( & mut client , & mut queue , None ) ;
let result = ChainSync ::return_receipts ( & io , & UntrustedRlp ::new ( & [ 0xc0 ] ) ) ;
assert! ( result . is_ok ( ) ) ;
}
2016-02-04 20:03:14 +01:00
#[ test ]
fn return_receipts ( ) {
let mut client = TestBlockChainClient ::new ( ) ;
2016-02-04 23:24:36 +01:00
let mut queue = VecDeque ::new ( ) ;
2016-05-16 14:41:41 +02:00
let mut sync = dummy_sync_with_peer ( H256 ::new ( ) , & client ) ;
2016-02-04 23:58:32 +01:00
let mut io = TestIo ::new ( & mut client , & mut queue , None ) ;
2016-02-04 20:03:14 +01:00
2016-02-04 23:24:36 +01:00
let mut receipt_list = RlpStream ::new_list ( 4 ) ;
receipt_list . append ( & H256 ::from ( " 0000000000000000000000000000000000000000000000005555555555555555 " ) ) ;
receipt_list . append ( & H256 ::from ( " ff00000000000000000000000000000000000000000000000000000000000000 " ) ) ;
receipt_list . append ( & H256 ::from ( " fff0000000000000000000000000000000000000000000000000000000000000 " ) ) ;
receipt_list . append ( & H256 ::from ( " aff0000000000000000000000000000000000000000000000000000000000000 " ) ) ;
2016-02-04 20:03:14 +01:00
2016-02-04 23:58:32 +01:00
let receipts_request = receipt_list . out ( ) ;
2016-02-04 23:24:36 +01:00
// it returns rlp ONLY for hashes started with "f"
2016-02-04 23:58:32 +01:00
let result = ChainSync ::return_receipts ( & io , & UntrustedRlp ::new ( & receipts_request . clone ( ) ) ) ;
2016-02-04 20:03:14 +01:00
assert! ( result . is_ok ( ) ) ;
2016-02-04 23:24:36 +01:00
let rlp_result = result . unwrap ( ) ;
assert! ( rlp_result . is_some ( ) ) ;
// the length of two rlp-encoded receipts
2016-03-12 19:23:17 +01:00
assert_eq! ( 603 , rlp_result . unwrap ( ) . 1. out ( ) . len ( ) ) ;
2016-02-04 23:58:32 +01:00
io . sender = Some ( 2 usize ) ;
2016-02-12 14:20:18 +01:00
sync . on_packet ( & mut io , 0 usize , super ::GET_RECEIPTS_PACKET , & receipts_request ) ;
2016-02-05 00:57:51 +01:00
assert_eq! ( 1 , io . queue . len ( ) ) ;
2016-02-04 20:03:14 +01:00
}
2016-02-04 23:58:32 +01:00
2016-05-16 14:41:41 +02:00
#[ test ]
fn return_block_headers ( ) {
use ethcore ::views ::HeaderView ;
fn make_hash_req ( h : & H256 , count : usize , skip : usize , reverse : bool ) -> Bytes {
let mut rlp = RlpStream ::new_list ( 4 ) ;
rlp . append ( h ) ;
rlp . append ( & count ) ;
rlp . append ( & skip ) ;
rlp . append ( & if reverse { 1 u32 } else { 0 u32 } ) ;
rlp . out ( )
}
fn make_num_req ( n : usize , count : usize , skip : usize , reverse : bool ) -> Bytes {
let mut rlp = RlpStream ::new_list ( 4 ) ;
rlp . append ( & n ) ;
rlp . append ( & count ) ;
rlp . append ( & skip ) ;
rlp . append ( & if reverse { 1 u32 } else { 0 u32 } ) ;
rlp . out ( )
}
fn to_header_vec ( rlp : ::chain ::RlpResponseResult ) -> Vec < Bytes > {
Rlp ::new ( & rlp . unwrap ( ) . unwrap ( ) . 1. out ( ) ) . iter ( ) . map ( | r | r . as_raw ( ) . to_vec ( ) ) . collect ( )
}
let mut client = TestBlockChainClient ::new ( ) ;
client . add_blocks ( 100 , EachBlockWith ::Nothing ) ;
let blocks : Vec < _ > = ( 0 .. 100 ) . map ( | i | ( & client as & BlockChainClient ) . block ( BlockID ::Number ( i as BlockNumber ) ) . unwrap ( ) ) . collect ( ) ;
let headers : Vec < _ > = blocks . iter ( ) . map ( | b | Rlp ::new ( b ) . at ( 0 ) . as_raw ( ) . to_vec ( ) ) . collect ( ) ;
let hashes : Vec < _ > = headers . iter ( ) . map ( | h | HeaderView ::new ( h ) . sha3 ( ) ) . collect ( ) ;
let mut queue = VecDeque ::new ( ) ;
let io = TestIo ::new ( & mut client , & mut queue , None ) ;
let unknown : H256 = H256 ::new ( ) ;
let result = ChainSync ::return_block_headers ( & io , & UntrustedRlp ::new ( & make_hash_req ( & unknown , 1 , 0 , false ) ) ) ;
assert! ( to_header_vec ( result ) . is_empty ( ) ) ;
let result = ChainSync ::return_block_headers ( & io , & UntrustedRlp ::new ( & make_hash_req ( & unknown , 1 , 0 , true ) ) ) ;
assert! ( to_header_vec ( result ) . is_empty ( ) ) ;
let result = ChainSync ::return_block_headers ( & io , & UntrustedRlp ::new ( & make_hash_req ( & hashes [ 2 ] , 1 , 0 , true ) ) ) ;
assert_eq! ( to_header_vec ( result ) , vec! [ headers [ 2 ] . clone ( ) ] ) ;
let result = ChainSync ::return_block_headers ( & io , & UntrustedRlp ::new ( & make_hash_req ( & hashes [ 2 ] , 1 , 0 , false ) ) ) ;
assert_eq! ( to_header_vec ( result ) , vec! [ headers [ 2 ] . clone ( ) ] ) ;
let result = ChainSync ::return_block_headers ( & io , & UntrustedRlp ::new ( & make_hash_req ( & hashes [ 50 ] , 3 , 5 , false ) ) ) ;
assert_eq! ( to_header_vec ( result ) , vec! [ headers [ 50 ] . clone ( ) , headers [ 56 ] . clone ( ) , headers [ 62 ] . clone ( ) ] ) ;
let result = ChainSync ::return_block_headers ( & io , & UntrustedRlp ::new ( & make_hash_req ( & hashes [ 50 ] , 3 , 5 , true ) ) ) ;
assert_eq! ( to_header_vec ( result ) , vec! [ headers [ 50 ] . clone ( ) , headers [ 44 ] . clone ( ) , headers [ 38 ] . clone ( ) ] ) ;
let result = ChainSync ::return_block_headers ( & io , & UntrustedRlp ::new ( & make_num_req ( 2 , 1 , 0 , true ) ) ) ;
assert_eq! ( to_header_vec ( result ) , vec! [ headers [ 2 ] . clone ( ) ] ) ;
let result = ChainSync ::return_block_headers ( & io , & UntrustedRlp ::new ( & make_num_req ( 2 , 1 , 0 , false ) ) ) ;
assert_eq! ( to_header_vec ( result ) , vec! [ headers [ 2 ] . clone ( ) ] ) ;
let result = ChainSync ::return_block_headers ( & io , & UntrustedRlp ::new ( & make_num_req ( 50 , 3 , 5 , false ) ) ) ;
assert_eq! ( to_header_vec ( result ) , vec! [ headers [ 50 ] . clone ( ) , headers [ 56 ] . clone ( ) , headers [ 62 ] . clone ( ) ] ) ;
let result = ChainSync ::return_block_headers ( & io , & UntrustedRlp ::new ( & make_num_req ( 50 , 3 , 5 , true ) ) ) ;
assert_eq! ( to_header_vec ( result ) , vec! [ headers [ 50 ] . clone ( ) , headers [ 44 ] . clone ( ) , headers [ 38 ] . clone ( ) ] ) ;
}
2016-02-05 00:57:51 +01:00
#[ test ]
fn return_nodes ( ) {
let mut client = TestBlockChainClient ::new ( ) ;
let mut queue = VecDeque ::new ( ) ;
2016-05-16 14:41:41 +02:00
let mut sync = dummy_sync_with_peer ( H256 ::new ( ) , & client ) ;
2016-02-05 00:57:51 +01:00
let mut io = TestIo ::new ( & mut client , & mut queue , None ) ;
2016-02-04 23:58:32 +01:00
2016-02-05 00:57:51 +01:00
let mut node_list = RlpStream ::new_list ( 3 ) ;
node_list . append ( & H256 ::from ( " 0000000000000000000000000000000000000000000000005555555555555555 " ) ) ;
node_list . append ( & H256 ::from ( " ffffffffffffffffffffffffffffffffffffffffffffaaaaaaaaaaaaaaaaaaaa " ) ) ;
node_list . append ( & H256 ::from ( " aff0000000000000000000000000000000000000000000000000000000000000 " ) ) ;
let node_request = node_list . out ( ) ;
// it returns rlp ONLY for hashes started with "f"
let result = ChainSync ::return_node_data ( & io , & UntrustedRlp ::new ( & node_request . clone ( ) ) ) ;
assert! ( result . is_ok ( ) ) ;
let rlp_result = result . unwrap ( ) ;
assert! ( rlp_result . is_some ( ) ) ;
// the length of one rlp-encoded hashe
assert_eq! ( 34 , rlp_result . unwrap ( ) . 1. out ( ) . len ( ) ) ;
io . sender = Some ( 2 usize ) ;
2016-02-12 14:20:18 +01:00
sync . on_packet ( & mut io , 0 usize , super ::GET_NODE_DATA_PACKET , & node_request ) ;
2016-02-05 00:57:51 +01:00
assert_eq! ( 1 , io . queue . len ( ) ) ;
}
2016-02-05 23:45:25 +01:00
2016-05-16 14:41:41 +02:00
fn dummy_sync_with_peer ( peer_latest_hash : H256 , client : & BlockChainClient ) -> ChainSync {
let mut sync = ChainSync ::new ( SyncConfig ::default ( ) , Miner ::new ( false , Spec ::new_test ( ) ) , client ) ;
2016-02-05 23:45:25 +01:00
sync . peers . insert ( 0 ,
2016-05-16 14:41:41 +02:00
PeerInfo {
2016-02-06 18:56:21 +01:00
protocol_version : 0 ,
genesis : H256 ::zero ( ) ,
network_id : U256 ::zero ( ) ,
2016-02-14 17:10:55 +01:00
latest_hash : peer_latest_hash ,
latest_number : None ,
2016-05-16 14:41:41 +02:00
difficulty : None ,
2016-02-06 18:56:21 +01:00
asking : PeerAsking ::Nothing ,
2016-05-16 14:41:41 +02:00
asking_blocks : Vec ::new ( ) ,
2016-02-12 13:07:02 +01:00
asking_hash : None ,
2016-02-06 18:56:21 +01:00
ask_time : 0 f64 ,
2016-05-16 14:41:41 +02:00
} ) ;
2016-02-05 23:45:25 +01:00
sync
}
#[ test ]
fn finds_lagging_peers ( ) {
let mut client = TestBlockChainClient ::new ( ) ;
2016-03-05 16:46:04 +01:00
client . add_blocks ( 100 , EachBlockWith ::Uncle ) ;
2016-02-05 23:45:25 +01:00
let mut queue = VecDeque ::new ( ) ;
2016-05-16 14:41:41 +02:00
let mut sync = dummy_sync_with_peer ( client . block_hash_delta_minus ( 10 ) , & client ) ;
2016-03-04 21:52:03 +01:00
let chain_info = client . chain_info ( ) ;
2016-02-05 23:45:25 +01:00
let io = TestIo ::new ( & mut client , & mut queue , None ) ;
2016-03-04 21:52:03 +01:00
let lagging_peers = sync . get_lagging_peers ( & chain_info , & io ) ;
2016-02-05 23:45:25 +01:00
assert_eq! ( 1 , lagging_peers . len ( ) )
}
#[ test ]
fn calculates_tree_for_lagging_peer ( ) {
let mut client = TestBlockChainClient ::new ( ) ;
2016-03-05 16:46:04 +01:00
client . add_blocks ( 15 , EachBlockWith ::Uncle ) ;
2016-02-05 23:45:25 +01:00
let start = client . block_hash_delta_minus ( 4 ) ;
let end = client . block_hash_delta_minus ( 2 ) ;
// wrong way end -> start, should be None
let rlp = ChainSync ::create_new_hashes_rlp ( & client , & end , & start ) ;
assert! ( rlp . is_none ( ) ) ;
let rlp = ChainSync ::create_new_hashes_rlp ( & client , & start , & end ) . unwrap ( ) ;
2016-02-07 23:39:02 +01:00
// size of three rlp encoded hash-difficulty
assert_eq! ( 107 , rlp . len ( ) ) ;
2016-02-05 23:45:25 +01:00
}
#[ test ]
2016-02-06 21:00:52 +01:00
fn sends_new_hashes_to_lagging_peer ( ) {
2016-02-05 23:45:25 +01:00
let mut client = TestBlockChainClient ::new ( ) ;
2016-03-05 16:46:04 +01:00
client . add_blocks ( 100 , EachBlockWith ::Uncle ) ;
2016-02-05 23:45:25 +01:00
let mut queue = VecDeque ::new ( ) ;
2016-05-16 14:41:41 +02:00
let mut sync = dummy_sync_with_peer ( client . block_hash_delta_minus ( 5 ) , & client ) ;
2016-03-04 21:52:03 +01:00
let chain_info = client . chain_info ( ) ;
2016-02-05 23:45:25 +01:00
let mut io = TestIo ::new ( & mut client , & mut queue , None ) ;
2016-03-04 21:52:03 +01:00
let peer_count = sync . propagate_new_hashes ( & chain_info , & mut io ) ;
2016-02-05 23:45:25 +01:00
// 1 message should be send
assert_eq! ( 1 , io . queue . len ( ) ) ;
// 1 peer should be updated
2016-02-06 18:56:21 +01:00
assert_eq! ( 1 , peer_count ) ;
2016-02-05 23:45:25 +01:00
// NEW_BLOCK_HASHES_PACKET
assert_eq! ( 0x01 , io . queue [ 0 ] . packet_id ) ;
}
2016-02-06 21:00:52 +01:00
#[ test ]
fn sends_latest_block_to_lagging_peer ( ) {
let mut client = TestBlockChainClient ::new ( ) ;
2016-03-05 16:46:04 +01:00
client . add_blocks ( 100 , EachBlockWith ::Uncle ) ;
2016-02-06 21:00:52 +01:00
let mut queue = VecDeque ::new ( ) ;
2016-05-16 14:41:41 +02:00
let mut sync = dummy_sync_with_peer ( client . block_hash_delta_minus ( 5 ) , & client ) ;
2016-03-04 21:52:03 +01:00
let chain_info = client . chain_info ( ) ;
2016-02-06 21:00:52 +01:00
let mut io = TestIo ::new ( & mut client , & mut queue , None ) ;
2016-03-04 21:52:03 +01:00
let peer_count = sync . propagate_blocks ( & chain_info , & mut io ) ;
2016-02-06 21:00:52 +01:00
// 1 message should be send
assert_eq! ( 1 , io . queue . len ( ) ) ;
// 1 peer should be updated
assert_eq! ( 1 , peer_count ) ;
// NEW_BLOCK_PACKET
assert_eq! ( 0x07 , io . queue [ 0 ] . packet_id ) ;
}
2016-02-07 21:01:09 +01:00
#[ test ]
2016-05-14 14:28:44 +02:00
fn handles_peer_new_block_malformed ( ) {
2016-02-07 22:08:15 +01:00
let mut client = TestBlockChainClient ::new ( ) ;
2016-03-05 16:46:04 +01:00
client . add_blocks ( 10 , EachBlockWith ::Uncle ) ;
2016-02-07 22:08:15 +01:00
let block_data = get_dummy_block ( 11 , client . chain_info ( ) . best_block_hash ) ;
let mut queue = VecDeque ::new ( ) ;
2016-05-16 14:41:41 +02:00
let mut sync = dummy_sync_with_peer ( client . block_hash_delta_minus ( 5 ) , & client ) ;
//sync.have_common_block = true;
2016-02-07 22:08:15 +01:00
let mut io = TestIo ::new ( & mut client , & mut queue , None ) ;
let block = UntrustedRlp ::new ( & block_data ) ;
let result = sync . on_peer_new_block ( & mut io , 0 , & block ) ;
assert! ( result . is_err ( ) ) ;
}
#[ test ]
fn handles_peer_new_block ( ) {
let mut client = TestBlockChainClient ::new ( ) ;
2016-03-05 16:46:04 +01:00
client . add_blocks ( 10 , EachBlockWith ::Uncle ) ;
2016-02-07 22:08:15 +01:00
let block_data = get_dummy_blocks ( 11 , client . chain_info ( ) . best_block_hash ) ;
let mut queue = VecDeque ::new ( ) ;
2016-05-16 14:41:41 +02:00
let mut sync = dummy_sync_with_peer ( client . block_hash_delta_minus ( 5 ) , & client ) ;
2016-02-07 22:08:15 +01:00
let mut io = TestIo ::new ( & mut client , & mut queue , None ) ;
let block = UntrustedRlp ::new ( & block_data ) ;
let result = sync . on_peer_new_block ( & mut io , 0 , & block ) ;
assert! ( result . is_ok ( ) ) ;
}
#[ test ]
fn handles_peer_new_block_empty ( ) {
2016-02-07 21:01:09 +01:00
let mut client = TestBlockChainClient ::new ( ) ;
2016-03-05 16:46:04 +01:00
client . add_blocks ( 10 , EachBlockWith ::Uncle ) ;
2016-02-07 21:01:09 +01:00
let mut queue = VecDeque ::new ( ) ;
2016-05-16 14:41:41 +02:00
let mut sync = dummy_sync_with_peer ( client . block_hash_delta_minus ( 5 ) , & client ) ;
2016-02-07 21:01:09 +01:00
let mut io = TestIo ::new ( & mut client , & mut queue , None ) ;
let empty_data = vec! [ ] ;
let block = UntrustedRlp ::new ( & empty_data ) ;
let result = sync . on_peer_new_block ( & mut io , 0 , & block ) ;
assert! ( result . is_err ( ) ) ;
}
2016-02-07 22:20:59 +01:00
#[ test ]
fn handles_peer_new_hashes ( ) {
let mut client = TestBlockChainClient ::new ( ) ;
2016-03-05 16:46:04 +01:00
client . add_blocks ( 10 , EachBlockWith ::Uncle ) ;
2016-02-07 22:20:59 +01:00
let mut queue = VecDeque ::new ( ) ;
2016-05-16 14:41:41 +02:00
let mut sync = dummy_sync_with_peer ( client . block_hash_delta_minus ( 5 ) , & client ) ;
2016-02-07 22:20:59 +01:00
let mut io = TestIo ::new ( & mut client , & mut queue , None ) ;
let hashes_data = get_dummy_hashes ( ) ;
let hashes_rlp = UntrustedRlp ::new ( & hashes_data ) ;
let result = sync . on_peer_new_hashes ( & mut io , 0 , & hashes_rlp ) ;
assert! ( result . is_ok ( ) ) ;
}
#[ test ]
fn handles_peer_new_hashes_empty ( ) {
let mut client = TestBlockChainClient ::new ( ) ;
2016-03-05 16:46:04 +01:00
client . add_blocks ( 10 , EachBlockWith ::Uncle ) ;
2016-02-07 22:20:59 +01:00
let mut queue = VecDeque ::new ( ) ;
2016-05-16 14:41:41 +02:00
let mut sync = dummy_sync_with_peer ( client . block_hash_delta_minus ( 5 ) , & client ) ;
2016-02-07 22:20:59 +01:00
let mut io = TestIo ::new ( & mut client , & mut queue , None ) ;
let empty_hashes_data = vec! [ ] ;
let hashes_rlp = UntrustedRlp ::new ( & empty_hashes_data ) ;
let result = sync . on_peer_new_hashes ( & mut io , 0 , & hashes_rlp ) ;
assert! ( result . is_ok ( ) ) ;
}
2016-02-07 23:39:02 +01:00
2016-02-08 12:14:39 +01:00
// idea is that what we produce when propagading latest hashes should be accepted in
// on_peer_new_hashes in our code as well
2016-02-07 23:39:02 +01:00
#[ test ]
fn hashes_rlp_mutually_acceptable ( ) {
let mut client = TestBlockChainClient ::new ( ) ;
2016-03-05 16:46:04 +01:00
client . add_blocks ( 100 , EachBlockWith ::Uncle ) ;
2016-02-07 23:39:02 +01:00
let mut queue = VecDeque ::new ( ) ;
2016-05-16 14:41:41 +02:00
let mut sync = dummy_sync_with_peer ( client . block_hash_delta_minus ( 5 ) , & client ) ;
2016-03-04 21:52:03 +01:00
let chain_info = client . chain_info ( ) ;
2016-02-07 23:39:02 +01:00
let mut io = TestIo ::new ( & mut client , & mut queue , None ) ;
2016-03-04 21:52:03 +01:00
sync . propagate_new_hashes ( & chain_info , & mut io ) ;
2016-02-07 23:39:02 +01:00
let data = & io . queue [ 0 ] . data . clone ( ) ;
2016-05-17 10:32:05 +02:00
let result = sync . on_peer_new_hashes ( & mut io , 0 , & UntrustedRlp ::new ( data ) ) ;
2016-02-07 23:39:02 +01:00
assert! ( result . is_ok ( ) ) ;
}
2016-02-08 12:14:39 +01:00
// idea is that what we produce when propagading latest block should be accepted in
// on_peer_new_block in our code as well
2016-02-07 23:39:02 +01:00
#[ test ]
fn block_rlp_mutually_acceptable ( ) {
let mut client = TestBlockChainClient ::new ( ) ;
2016-03-05 16:46:04 +01:00
client . add_blocks ( 100 , EachBlockWith ::Uncle ) ;
2016-02-07 23:39:02 +01:00
let mut queue = VecDeque ::new ( ) ;
2016-05-16 14:41:41 +02:00
let mut sync = dummy_sync_with_peer ( client . block_hash_delta_minus ( 5 ) , & client ) ;
2016-03-04 21:52:03 +01:00
let chain_info = client . chain_info ( ) ;
2016-02-07 23:39:02 +01:00
let mut io = TestIo ::new ( & mut client , & mut queue , None ) ;
2016-03-04 21:52:03 +01:00
sync . propagate_blocks ( & chain_info , & mut io ) ;
2016-02-07 23:39:02 +01:00
let data = & io . queue [ 0 ] . data . clone ( ) ;
2016-05-17 10:32:05 +02:00
let result = sync . on_peer_new_block ( & mut io , 0 , & UntrustedRlp ::new ( data ) ) ;
2016-02-07 23:39:02 +01:00
assert! ( result . is_ok ( ) ) ;
}
2016-02-24 13:01:45 +01:00
2016-03-05 16:46:04 +01:00
#[ test ]
fn should_add_transactions_to_queue ( ) {
// given
let mut client = TestBlockChainClient ::new ( ) ;
client . add_blocks ( 98 , EachBlockWith ::Uncle ) ;
client . add_blocks ( 1 , EachBlockWith ::UncleAndTransaction ) ;
client . add_blocks ( 1 , EachBlockWith ::Transaction ) ;
2016-05-16 14:41:41 +02:00
let mut sync = dummy_sync_with_peer ( client . block_hash_delta_minus ( 5 ) , & client ) ;
2016-03-05 16:46:04 +01:00
let good_blocks = vec! [ client . block_hash_delta_minus ( 2 ) ] ;
let retracted_blocks = vec! [ client . block_hash_delta_minus ( 1 ) ] ;
2016-04-15 07:38:23 +02:00
// Add some balance to clients and reset nonces
2016-04-06 10:07:24 +02:00
for h in & [ good_blocks [ 0 ] , retracted_blocks [ 0 ] ] {
2016-05-19 11:00:32 +02:00
let block = client . block ( BlockID ::Hash ( * h ) ) . unwrap ( ) ;
2016-03-17 10:20:35 +01:00
let view = BlockView ::new ( & block ) ;
2016-03-17 11:27:38 +01:00
client . set_balance ( view . transactions ( ) [ 0 ] . sender ( ) . unwrap ( ) , U256 ::from ( 1_000_000_000 ) ) ;
2016-04-15 07:38:23 +02:00
client . set_nonce ( view . transactions ( ) [ 0 ] . sender ( ) . unwrap ( ) , U256 ::from ( 0 ) ) ;
2016-03-17 10:20:35 +01:00
}
2016-03-05 16:46:04 +01:00
// when
2016-04-15 07:38:23 +02:00
{
let mut queue = VecDeque ::new ( ) ;
let mut io = TestIo ::new ( & mut client , & mut queue , None ) ;
sync . chain_new_blocks ( & mut io , & [ ] , & [ ] , & [ ] , & good_blocks ) ;
assert_eq! ( sync . miner . status ( ) . transactions_in_future_queue , 0 ) ;
assert_eq! ( sync . miner . status ( ) . transactions_in_pending_queue , 1 ) ;
}
// We need to update nonce status (because we say that the block has been imported)
for h in & [ good_blocks [ 0 ] ] {
2016-05-19 11:00:32 +02:00
let block = client . block ( BlockID ::Hash ( * h ) ) . unwrap ( ) ;
2016-04-15 07:38:23 +02:00
let view = BlockView ::new ( & block ) ;
client . set_nonce ( view . transactions ( ) [ 0 ] . sender ( ) . unwrap ( ) , U256 ::from ( 1 ) ) ;
}
{
let mut queue = VecDeque ::new ( ) ;
let mut io = TestIo ::new ( & mut client , & mut queue , None ) ;
sync . chain_new_blocks ( & mut io , & [ ] , & [ ] , & good_blocks , & retracted_blocks ) ;
}
2016-03-05 16:46:04 +01:00
// then
2016-03-08 16:23:32 +01:00
let status = sync . miner . status ( ) ;
2016-03-17 11:19:12 +01:00
assert_eq! ( status . transactions_in_pending_queue , 1 ) ;
assert_eq! ( status . transactions_in_future_queue , 0 ) ;
2016-03-05 16:46:04 +01:00
}
2016-03-17 12:17:53 +01:00
#[ test ]
fn should_not_add_transactions_to_queue_if_not_synced ( ) {
// given
let mut client = TestBlockChainClient ::new ( ) ;
client . add_blocks ( 98 , EachBlockWith ::Uncle ) ;
client . add_blocks ( 1 , EachBlockWith ::UncleAndTransaction ) ;
client . add_blocks ( 1 , EachBlockWith ::Transaction ) ;
2016-05-16 14:41:41 +02:00
let mut sync = dummy_sync_with_peer ( client . block_hash_delta_minus ( 5 ) , & client ) ;
2016-03-17 12:17:53 +01:00
let good_blocks = vec! [ client . block_hash_delta_minus ( 2 ) ] ;
let retracted_blocks = vec! [ client . block_hash_delta_minus ( 1 ) ] ;
let mut queue = VecDeque ::new ( ) ;
let mut io = TestIo ::new ( & mut client , & mut queue , None ) ;
// when
2016-03-17 14:56:19 +01:00
sync . chain_new_blocks ( & mut io , & [ ] , & [ ] , & [ ] , & good_blocks ) ;
2016-03-17 14:19:12 +01:00
assert_eq! ( sync . miner . status ( ) . transactions_in_future_queue , 0 ) ;
assert_eq! ( sync . miner . status ( ) . transactions_in_pending_queue , 0 ) ;
2016-04-15 07:38:23 +02:00
sync . chain_new_blocks ( & mut io , & [ ] , & [ ] , & good_blocks , & retracted_blocks ) ;
2016-03-17 12:17:53 +01:00
// then
let status = sync . miner . status ( ) ;
2016-03-17 14:19:12 +01:00
assert_eq! ( status . transactions_in_pending_queue , 0 ) ;
assert_eq! ( status . transactions_in_future_queue , 0 ) ;
2016-03-17 12:17:53 +01:00
}
2016-02-11 18:36:26 +01:00
}