Backports for beta 2.2.2 (#9976)

* version: bump beta to 2.2.2

* Add experimental RPCs flag (#9928)

* WiP

* Enable experimental RPCs.

* Keep existing blocks when restoring a Snapshot (#8643)

* Rename db_restore => client

* First step: make it compile!

* Second step: working implementation!

* Refactoring

* Fix tests

* PR Grumbles

* PR Grumbles WIP

* Migrate ancient blocks interating backward

* Early return in block migration if snapshot is aborted

* Remove RwLock getter (PR Grumble I)

* Remove dependency on `Client`: only used Traits

* Add test for recovering aborted snapshot recovery

* Add test for migrating old blocks

* Fix build

* PR Grumble I

* PR Grumble II

* PR Grumble III

* PR Grumble IV

* PR Grumble V

* PR Grumble VI

* Fix one test

* Fix test

* PR Grumble

* PR Grumbles

* PR Grumbles II

* Fix tests

* Release RwLock earlier

* Revert Cargo.lock

* Update _update ancient block_ logic: set local in `commit`

* Update typo in ethcore/src/snapshot/service.rs

Co-Authored-By: ngotchac <ngotchac@gmail.com>

* Adjust requests costs for light client (#9925)

* PIP Table Cost relative to average peers instead of max peers

* Add tracing in PIP new_cost_table

* Update stat peer_count

* Use number of leeching peers for Light serve costs

* Fix test::light_params_load_share_depends_on_max_peers (wrong type)

* Remove (now) useless test

* Remove `load_share` from LightParams.Config
Prevent div. by 0

* Add LEECHER_COUNT_FACTOR

* PR Grumble: u64 to u32 for f64 casting

* Prevent u32 overflow for avg_peer_count

* Add tests for LightSync::Statistics

* Fix empty steps (#9939)

* Don't send empty step twice or empty step then block.

* Perform basic validation of locally sealed blocks.

* Don't include empty step twice.

* prevent silent errors in daemon mode, closes #9367 (#9946)

* Fix a deadlock (#9952)

* Update informant:
  - decimal in Mgas/s
  - print every 5s (not randomly between 5s and 10s)

* Fix dead-lock in `blockchain.rs`

* Update locks ordering

* Fix light client informant while syncing (#9932)

* Add `is_idle` to LightSync to check importing status

* Use SyncStateWrapper to make sure is_idle gets updates

* Update is_major_import to use verified queue size as well

* Add comment for `is_idle`

* Add Debug to `SyncStateWrapper`

* `fn get` -> `fn into_inner`

*  ci: rearrange pipeline by logic (#9970)

* ci: rearrange pipeline by logic

* ci: rename docs script

* fix docker build (#9971)

* Deny unknown fields for chainspec (#9972)

* Add deny_unknown_fields to chainspec

* Add tests and fix existing one

* Remove serde_ignored dependency for chainspec

* Fix rpc test eth chain spec

* Fix starting_nonce_test spec

* Improve block and transaction propagation (#9954)

* Refactor sync to add priority tasks.

* Send priority tasks notifications.

* Propagate blocks, optimize transactions.

* Implement transaction propagation. Use sync_channel.

* Tone down info.

* Prevent deadlock by not waiting forever for sync lock.

* Fix lock order.

* Don't use sync_channel to prevent deadlocks.

* Fix tests.

* Fix unstable peers and slowness in sync (#9967)

* Don't sync all peers after each response

* Update formating

* Fix tests: add `continue_sync` to `Sync_step`

* Update ethcore/sync/src/chain/mod.rs

Co-Authored-By: ngotchac <ngotchac@gmail.com>

* fix rpc middlewares

* fix Cargo.lock

* json: resolve merge in spec

* rpc: fix starting_nonce_test

* ci: allow nightl job to fail
This commit is contained in:
Afri Schoedon
2018-11-29 10:57:49 +01:00
committed by GitHub
parent 5c56fc5023
commit 78ceec6c6e
67 changed files with 1854 additions and 653 deletions

View File

@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use std::sync::{Arc, mpsc, atomic};
use std::collections::{HashMap, BTreeMap};
use std::io;
use std::ops::Range;
@@ -33,10 +33,10 @@ use ethcore::client::{BlockChainClient, ChainNotify, ChainRoute, ChainMessageTyp
use ethcore::snapshot::SnapshotService;
use ethcore::header::BlockNumber;
use sync_io::NetSyncIo;
use chain::{ChainSync, SyncStatus as EthSyncStatus};
use chain::{ChainSyncApi, SyncStatus as EthSyncStatus};
use std::net::{SocketAddr, AddrParseError};
use std::str::FromStr;
use parking_lot::RwLock;
use parking_lot::{RwLock, Mutex};
use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3,
PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET};
@@ -228,6 +228,37 @@ impl AttachedProtocol {
}
}
/// A prioritized tasks run in a specialised timer.
/// Every task should be completed within a hard deadline,
/// if it's not it's either cancelled or split into multiple tasks.
/// NOTE These tasks might not complete at all, so anything
/// that happens here should work even if the task is cancelled.
#[derive(Debug)]
pub enum PriorityTask {
/// Propagate given block
PropagateBlock {
/// When the task was initiated
started: ::std::time::Instant,
/// Raw block RLP to propagate
block: Bytes,
/// Block hash
hash: H256,
/// Blocks difficulty
difficulty: U256,
},
/// Propagate a list of transactions
PropagateTransactions(::std::time::Instant, Arc<atomic::AtomicBool>),
}
impl PriorityTask {
/// Mark the task as being processed, right after it's retrieved from the queue.
pub fn starting(&self) {
match *self {
PriorityTask::PropagateTransactions(_, ref is_ready) => is_ready.store(true, atomic::Ordering::SeqCst),
_ => {},
}
}
}
/// EthSync initialization parameters.
pub struct Params {
/// Configuration.
@@ -260,16 +291,16 @@ pub struct EthSync {
subprotocol_name: [u8; 3],
/// Light subprotocol name.
light_subprotocol_name: [u8; 3],
/// Priority tasks notification channel
priority_tasks: Mutex<mpsc::Sender<PriorityTask>>,
}
fn light_params(
network_id: u64,
max_peers: u32,
median_peers: f64,
pruning_info: PruningInfo,
sample_store: Option<Box<SampleStore>>,
) -> LightParams {
const MAX_LIGHTSERV_LOAD: f64 = 0.5;
let mut light_params = LightParams {
network_id: network_id,
config: Default::default(),
@@ -282,9 +313,7 @@ fn light_params(
sample_store: sample_store,
};
let max_peers = ::std::cmp::max(max_peers, 1);
light_params.config.load_share = MAX_LIGHTSERV_LOAD / max_peers as f64;
light_params.config.median_peers = median_peers;
light_params
}
@@ -301,9 +330,10 @@ impl EthSync {
.map(|mut p| { p.push("request_timings"); light_net::FileStore(p) })
.map(|store| Box::new(store) as Box<_>);
let median_peers = (params.network_config.min_peers + params.network_config.max_peers) as f64 / 2.0;
let light_params = light_params(
params.config.network_id,
params.network_config.max_peers,
median_peers,
pruning_info,
sample_store,
);
@@ -315,13 +345,19 @@ impl EthSync {
})
};
let chain_sync = ChainSync::new(params.config, &*params.chain, params.private_tx_handler.clone());
let (priority_tasks_tx, priority_tasks_rx) = mpsc::channel();
let sync = ChainSyncApi::new(
params.config,
&*params.chain,
params.private_tx_handler.clone(),
priority_tasks_rx,
);
let service = NetworkService::new(params.network_config.clone().into_basic()?, connection_filter)?;
let sync = Arc::new(EthSync {
network: service,
eth_handler: Arc::new(SyncProtocolHandler {
sync: RwLock::new(chain_sync),
sync,
chain: params.chain,
snapshot_service: params.snapshot_service,
overlay: RwLock::new(HashMap::new()),
@@ -330,26 +366,32 @@ impl EthSync {
subprotocol_name: params.config.subprotocol_name,
light_subprotocol_name: params.config.light_subprotocol_name,
attached_protos: params.attached_protos,
priority_tasks: Mutex::new(priority_tasks_tx),
});
Ok(sync)
}
/// Priority tasks producer
pub fn priority_tasks(&self) -> mpsc::Sender<PriorityTask> {
self.priority_tasks.lock().clone()
}
}
impl SyncProvider for EthSync {
/// Get sync status
fn status(&self) -> EthSyncStatus {
self.eth_handler.sync.read().status()
self.eth_handler.sync.status()
}
/// Get sync peers
fn peers(&self) -> Vec<PeerInfo> {
self.network.with_context_eval(self.subprotocol_name, |ctx| {
let peer_ids = self.network.connected_peers();
let eth_sync = self.eth_handler.sync.read();
let light_proto = self.light_proto.as_ref();
peer_ids.into_iter().filter_map(|peer_id| {
let peer_info = self.eth_handler.sync.peer_info(&peer_ids);
peer_ids.into_iter().zip(peer_info).filter_map(|(peer_id, peer_info)| {
let session_info = match ctx.session_info(peer_id) {
None => return None,
Some(info) => info,
@@ -361,7 +403,7 @@ impl SyncProvider for EthSync {
capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(),
remote_address: session_info.remote_address,
local_address: session_info.local_address,
eth_info: eth_sync.peer_info(&peer_id),
eth_info: peer_info,
pip_info: light_proto.as_ref().and_then(|lp| lp.peer_status(peer_id)).map(Into::into),
})
}).collect()
@@ -373,17 +415,17 @@ impl SyncProvider for EthSync {
}
fn transactions_stats(&self) -> BTreeMap<H256, TransactionStats> {
let sync = self.eth_handler.sync.read();
sync.transactions_stats()
.iter()
.map(|(hash, stats)| (*hash, stats.into()))
.collect()
self.eth_handler.sync.transactions_stats()
}
}
const PEERS_TIMER: TimerToken = 0;
const SYNC_TIMER: TimerToken = 1;
const TX_TIMER: TimerToken = 2;
const MAINTAIN_SYNC_TIMER: TimerToken = 1;
const CONTINUE_SYNC_TIMER: TimerToken = 2;
const TX_TIMER: TimerToken = 3;
const PRIORITY_TIMER: TimerToken = 4;
pub(crate) const PRIORITY_TIMER_INTERVAL: Duration = Duration::from_millis(250);
struct SyncProtocolHandler {
/// Shared blockchain client.
@@ -391,7 +433,7 @@ struct SyncProtocolHandler {
/// Shared snapshot service.
snapshot_service: Arc<SnapshotService>,
/// Sync strategy
sync: RwLock<ChainSync>,
sync: ChainSyncApi,
/// Chain overlay used to cache data such as fork block.
overlay: RwLock<HashMap<BlockNumber, Bytes>>,
}
@@ -400,13 +442,16 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
fn initialize(&self, io: &NetworkContext) {
if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID {
io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer");
io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer");
io.register_timer(MAINTAIN_SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer");
io.register_timer(CONTINUE_SYNC_TIMER, Duration::from_millis(2500)).expect("Error registering sync timer");
io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer");
io.register_timer(PRIORITY_TIMER, PRIORITY_TIMER_INTERVAL).expect("Error registering peers timer");
}
}
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data);
self.sync.dispatch_packet(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data);
}
fn connected(&self, io: &NetworkContext, peer: &PeerId) {
@@ -431,16 +476,28 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay);
match timer {
PEERS_TIMER => self.sync.write().maintain_peers(&mut io),
SYNC_TIMER => self.sync.write().maintain_sync(&mut io),
TX_TIMER => {
self.sync.write().propagate_new_transactions(&mut io);
},
MAINTAIN_SYNC_TIMER => self.sync.write().maintain_sync(&mut io),
CONTINUE_SYNC_TIMER => self.sync.write().continue_sync(&mut io),
TX_TIMER => self.sync.write().propagate_new_transactions(&mut io),
PRIORITY_TIMER => self.sync.process_priority_queue(&mut io),
_ => warn!("Unknown timer {} triggered.", timer),
}
}
}
impl ChainNotify for EthSync {
fn block_pre_import(&self, bytes: &Bytes, hash: &H256, difficulty: &U256) {
let task = PriorityTask::PropagateBlock {
started: ::std::time::Instant::now(),
block: bytes.clone(),
hash: *hash,
difficulty: *difficulty,
};
if let Err(e) = self.priority_tasks.lock().send(task) {
warn!(target: "sync", "Unexpected error during priority block propagation: {:?}", e);
}
}
fn new_blocks(&self,
imported: Vec<H256>,
invalid: Vec<H256>,
@@ -940,19 +997,3 @@ impl LightSyncProvider for LightSync {
Default::default() // TODO
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn light_params_load_share_depends_on_max_peers() {
let pruning_info = PruningInfo {
earliest_chain: 0,
earliest_state: 0,
};
let params1 = light_params(0, 10, pruning_info.clone(), None);
let params2 = light_params(0, 20, pruning_info, None);
assert!(params1.config.load_share > params2.config.load_share)
}
}

View File

@@ -29,7 +29,6 @@ use rlp::Rlp;
use snapshot::ChunkType;
use std::cmp;
use std::mem;
use std::collections::HashSet;
use std::time::Instant;
use sync_io::SyncIo;
@@ -58,7 +57,6 @@ use super::{
SNAPSHOT_DATA_PACKET,
SNAPSHOT_MANIFEST_PACKET,
STATUS_PACKET,
TRANSACTIONS_PACKET,
};
/// The Chain Sync Handler: handles responses from peers
@@ -67,14 +65,9 @@ pub struct SyncHandler;
impl SyncHandler {
/// Handle incoming packet from peer
pub fn on_packet(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
if packet_id != STATUS_PACKET && !sync.peers.contains_key(&peer) {
debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_info(peer));
return;
}
let rlp = Rlp::new(data);
let result = match packet_id {
STATUS_PACKET => SyncHandler::on_peer_status(sync, io, peer, &rlp),
TRANSACTIONS_PACKET => SyncHandler::on_peer_transactions(sync, io, peer, &rlp),
BLOCK_HEADERS_PACKET => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp),
BLOCK_BODIES_PACKET => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp),
RECEIPTS_PACKET => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp),
@@ -104,15 +97,12 @@ impl SyncHandler {
sync.sync_peer(io, peer, false);
},
}
// give tasks to other peers
sync.continue_sync(io);
}
/// Called when peer sends us new consensus packet
pub fn on_consensus_packet(io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
pub fn on_consensus_packet(io: &mut SyncIo, peer_id: PeerId, r: &Rlp) {
trace!(target: "sync", "Received consensus packet from {:?}", peer_id);
io.chain().queue_consensus_message(r.as_raw().to_vec());
Ok(())
}
/// Called by peer when it is disconnecting
@@ -578,8 +568,8 @@ impl SyncHandler {
asking_blocks: Vec::new(),
asking_hash: None,
ask_time: Instant::now(),
last_sent_transactions: HashSet::new(),
last_sent_private_transactions: HashSet::new(),
last_sent_transactions: Default::default(),
last_sent_private_transactions: Default::default(),
expired: false,
confirmation: if sync.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
asking_snapshot_data: None,
@@ -635,7 +625,7 @@ impl SyncHandler {
}
/// Called when peer sends us new transactions
fn on_peer_transactions(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
pub fn on_peer_transactions(sync: &ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
// Accept transactions only when fully synced
if !io.is_chain_queue_empty() || (sync.state != SyncState::Idle && sync.state != SyncState::NewBlocks) {
trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id);

View File

@@ -92,17 +92,17 @@ mod propagator;
mod requester;
mod supplier;
use std::sync::Arc;
use std::collections::{HashSet, HashMap};
use std::sync::{Arc, mpsc};
use std::collections::{HashSet, HashMap, BTreeMap};
use std::cmp;
use std::time::{Duration, Instant};
use hash::keccak;
use heapsize::HeapSizeOf;
use ethereum_types::{H256, U256};
use fastmap::H256FastMap;
use parking_lot::RwLock;
use fastmap::{H256FastMap, H256FastSet};
use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
use bytes::Bytes;
use rlp::{Rlp, RlpStream, DecoderError};
use rlp::{RlpStream, DecoderError};
use network::{self, PeerId, PacketId};
use ethcore::header::{BlockNumber};
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockQueueInfo};
@@ -112,7 +112,7 @@ use super::{WarpSync, SyncConfig};
use block_sync::{BlockDownloader, DownloadAction};
use rand::Rng;
use snapshot::{Snapshot};
use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID};
use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID, PriorityTask};
use private_tx::PrivateTxHandler;
use transactions_stats::{TransactionsStats, Stats as TransactionStats};
use transaction::UnverifiedTransaction;
@@ -120,7 +120,7 @@ use transaction::UnverifiedTransaction;
use self::handler::SyncHandler;
use self::propagator::SyncPropagator;
use self::requester::SyncRequester;
use self::supplier::SyncSupplier;
pub(crate) use self::supplier::SyncSupplier;
known_heap_size!(0, PeerInfo);
@@ -187,6 +187,11 @@ const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3);
const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5);
const SNAPSHOT_DATA_TIMEOUT: Duration = Duration::from_secs(120);
/// Defines how much time we have to complete priority transaction or block propagation.
/// after the deadline is reached the task is considered finished
/// (so we might sent only to some part of the peers we originally intended to send to)
const PRIORITY_TASK_DEADLINE: Duration = Duration::from_millis(100);
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
/// Sync state
pub enum SyncState {
@@ -323,9 +328,9 @@ pub struct PeerInfo {
/// Request timestamp
ask_time: Instant,
/// Holds a set of transactions recently sent to this peer to avoid spamming.
last_sent_transactions: HashSet<H256>,
last_sent_transactions: H256FastSet,
/// Holds a set of private transactions and their signatures recently sent to this peer to avoid spamming.
last_sent_private_transactions: HashSet<H256>,
last_sent_private_transactions: H256FastSet,
/// Pending request is expired and result should be ignored
expired: bool,
/// Peer fork confirmation status
@@ -375,6 +380,217 @@ pub mod random {
pub type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
pub type Peers = HashMap<PeerId, PeerInfo>;
/// Thread-safe wrapper for `ChainSync`.
///
/// NOTE always lock in order of fields declaration
pub struct ChainSyncApi {
/// Priority tasks queue
priority_tasks: Mutex<mpsc::Receiver<PriorityTask>>,
/// The rest of sync data
sync: RwLock<ChainSync>,
}
impl ChainSyncApi {
/// Creates new `ChainSyncApi`
pub fn new(
config: SyncConfig,
chain: &BlockChainClient,
private_tx_handler: Arc<PrivateTxHandler>,
priority_tasks: mpsc::Receiver<PriorityTask>,
) -> Self {
ChainSyncApi {
sync: RwLock::new(ChainSync::new(config, chain, private_tx_handler)),
priority_tasks: Mutex::new(priority_tasks),
}
}
/// Gives `write` access to underlying `ChainSync`
pub fn write(&self) -> RwLockWriteGuard<ChainSync> {
self.sync.write()
}
/// Returns info about given list of peers
pub fn peer_info(&self, ids: &[PeerId]) -> Vec<Option<PeerInfoDigest>> {
let sync = self.sync.read();
ids.iter().map(|id| sync.peer_info(id)).collect()
}
/// Returns synchonization status
pub fn status(&self) -> SyncStatus {
self.sync.read().status()
}
/// Returns transactions propagation statistics
pub fn transactions_stats(&self) -> BTreeMap<H256, ::TransactionStats> {
self.sync.read().transactions_stats()
.iter()
.map(|(hash, stats)| (*hash, stats.into()))
.collect()
}
/// Dispatch incoming requests and responses
pub fn dispatch_packet(&self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
SyncSupplier::dispatch_packet(&self.sync, io, peer, packet_id, data)
}
/// Process a priority propagation queue.
/// This task is run from a timer and should be time constrained.
/// Hence we set up a deadline for the execution and cancel the task if the deadline is exceeded.
///
/// NOTE This method should only handle stuff that can be canceled and would reach other peers
/// by other means.
pub fn process_priority_queue(&self, io: &mut SyncIo) {
fn check_deadline(deadline: Instant) -> Option<Duration> {
let now = Instant::now();
if now > deadline {
None
} else {
Some(deadline - now)
}
}
// deadline to get the task from the queue
let deadline = Instant::now() + ::api::PRIORITY_TIMER_INTERVAL;
let mut work = || {
let task = {
let tasks = self.priority_tasks.try_lock_until(deadline)?;
let left = check_deadline(deadline)?;
tasks.recv_timeout(left).ok()?
};
task.starting();
// wait for the sync lock until deadline,
// note we might drop the task here if we won't manage to acquire the lock.
let mut sync = self.sync.try_write_until(deadline)?;
// since we already have everything let's use a different deadline
// to do the rest of the job now, so that previous work is not wasted.
let deadline = Instant::now() + PRIORITY_TASK_DEADLINE;
let as_ms = move |prev| {
let dur: Duration = Instant::now() - prev;
dur.as_secs() * 1_000 + dur.subsec_millis() as u64
};
match task {
// NOTE We can't simply use existing methods,
// cause the block is not in the DB yet.
PriorityTask::PropagateBlock { started, block, hash, difficulty } => {
// try to send to peers that are on the same block as us
// (they will most likely accept the new block).
let chain_info = io.chain().chain_info();
let total_difficulty = chain_info.total_difficulty + difficulty;
let rlp = ChainSync::create_block_rlp(&block, total_difficulty);
for peers in sync.get_peers(&chain_info, PeerState::SameBlock).chunks(10) {
check_deadline(deadline)?;
for peer in peers {
SyncPropagator::send_packet(io, *peer, NEW_BLOCK_PACKET, rlp.clone());
if let Some(ref mut peer) = sync.peers.get_mut(peer) {
peer.latest_hash = hash;
}
}
}
debug!(target: "sync", "Finished block propagation, took {}ms", as_ms(started));
},
PriorityTask::PropagateTransactions(time, _) => {
SyncPropagator::propagate_new_transactions(&mut sync, io, || {
check_deadline(deadline).is_some()
});
debug!(target: "sync", "Finished transaction propagation, took {}ms", as_ms(time));
},
}
Some(())
};
// Process as many items as we can until the deadline is reached.
loop {
if work().is_none() {
return;
}
}
}
}
// Static methods
impl ChainSync {
/// creates rlp to send for the tree defined by 'from' and 'to' hashes
fn create_new_hashes_rlp(chain: &BlockChainClient, from: &H256, to: &H256) -> Option<Bytes> {
match chain.tree_route(from, to) {
Some(route) => {
let uncles = chain.find_uncles(from).unwrap_or_else(Vec::new);
match route.blocks.len() {
0 => None,
_ => {
let mut blocks = route.blocks;
blocks.extend(uncles);
let mut rlp_stream = RlpStream::new_list(blocks.len());
for block_hash in blocks {
let mut hash_rlp = RlpStream::new_list(2);
let number = chain.block_header(BlockId::Hash(block_hash.clone()))
.expect("chain.tree_route and chain.find_uncles only return hahses of blocks that are in the blockchain. qed.").number();
hash_rlp.append(&block_hash);
hash_rlp.append(&number);
rlp_stream.append_raw(hash_rlp.as_raw(), 1);
}
Some(rlp_stream.out())
}
}
},
None => None
}
}
/// creates rlp from block bytes and total difficulty
fn create_block_rlp(bytes: &Bytes, total_difficulty: U256) -> Bytes {
let mut rlp_stream = RlpStream::new_list(2);
rlp_stream.append_raw(bytes, 1);
rlp_stream.append(&total_difficulty);
rlp_stream.out()
}
/// creates latest block rlp for the given client
fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes {
Self::create_block_rlp(
&chain.block(BlockId::Hash(chain.chain_info().best_block_hash))
.expect("Best block always exists").into_inner(),
chain.chain_info().total_difficulty
)
}
/// creates given hash block rlp for the given client
fn create_new_block_rlp(chain: &BlockChainClient, hash: &H256) -> Bytes {
Self::create_block_rlp(
&chain.block(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed").into_inner(),
chain.block_total_difficulty(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed.")
)
}
fn select_random_peers(peers: &[PeerId]) -> Vec<PeerId> {
// take sqrt(x) peers
let mut peers = peers.to_vec();
let mut count = (peers.len() as f64).powf(0.5).round() as usize;
count = cmp::min(count, MAX_PEERS_PROPAGATION);
count = cmp::max(count, MIN_PEERS_PROPAGATION);
random::new().shuffle(&mut peers);
peers.truncate(count);
peers
}
fn get_init_state(warp_sync: WarpSync, chain: &BlockChainClient) -> SyncState {
let best_block = chain.chain_info().best_block_number;
match warp_sync {
WarpSync::Enabled => SyncState::WaitingPeers,
WarpSync::OnlyAndAfter(block) if block > best_block => SyncState::WaitingPeers,
_ => SyncState::Idle,
}
}
}
/// A peer query method for getting a list of peers
enum PeerState {
/// Peer is on different hash than us
Lagging,
/// Peer is on the same block as us
SameBlock
}
/// Blockchain sync handler.
/// See module documentation for more details.
pub struct ChainSync {
@@ -417,10 +633,14 @@ pub struct ChainSync {
impl ChainSync {
/// Create a new instance of syncing strategy.
pub fn new(config: SyncConfig, chain: &BlockChainClient, private_tx_handler: Arc<PrivateTxHandler>) -> ChainSync {
pub fn new(
config: SyncConfig,
chain: &BlockChainClient,
private_tx_handler: Arc<PrivateTxHandler>,
) -> Self {
let chain_info = chain.chain_info();
let best_block = chain.chain_info().best_block_number;
let state = ChainSync::get_init_state(config.warp_sync, chain);
let state = Self::get_init_state(config.warp_sync, chain);
let mut sync = ChainSync {
state,
@@ -445,15 +665,6 @@ impl ChainSync {
sync
}
fn get_init_state(warp_sync: WarpSync, chain: &BlockChainClient) -> SyncState {
let best_block = chain.chain_info().best_block_number;
match warp_sync {
WarpSync::Enabled => SyncState::WaitingPeers,
WarpSync::OnlyAndAfter(block) if block > best_block => SyncState::WaitingPeers,
_ => SyncState::Idle,
}
}
/// Returns synchonization status
pub fn status(&self) -> SyncStatus {
let last_imported_number = self.new_blocks.last_imported_block_number();
@@ -521,7 +732,7 @@ impl ChainSync {
}
}
}
self.state = state.unwrap_or_else(|| ChainSync::get_init_state(self.warp_sync, io.chain()));
self.state = state.unwrap_or_else(|| Self::get_init_state(self.warp_sync, io.chain()));
// Reactivate peers only if some progress has been made
// since the last sync round of if starting fresh.
self.active_peers = self.peers.keys().cloned().collect();
@@ -655,37 +866,35 @@ impl ChainSync {
}
/// Resume downloading
fn continue_sync(&mut self, io: &mut SyncIo) {
// Collect active peers that can sync
let confirmed_peers: Vec<(PeerId, u8)> = self.peers.iter().filter_map(|(peer_id, peer)|
if peer.can_sync() {
Some((*peer_id, peer.protocol_version))
} else {
None
}
).collect();
trace!(
target: "sync",
"Syncing with peers: {} active, {} confirmed, {} total",
self.active_peers.len(), confirmed_peers.len(), self.peers.len()
);
pub fn continue_sync(&mut self, io: &mut SyncIo) {
if self.state == SyncState::Waiting {
trace!(target: "sync", "Waiting for the block queue");
} else if self.state == SyncState::SnapshotWaiting {
trace!(target: "sync", "Waiting for the snapshot restoration");
} else {
let mut peers: Vec<(PeerId, u8)> = confirmed_peers.iter().filter(|&&(peer_id, _)|
self.active_peers.contains(&peer_id)
).map(|v| *v).collect();
// Collect active peers that can sync
let mut peers: Vec<(PeerId, u8)> = self.peers.iter().filter_map(|(peer_id, peer)|
if peer.can_sync() && peer.asking == PeerAsking::Nothing && self.active_peers.contains(&peer_id) {
Some((*peer_id, peer.protocol_version))
} else {
None
}
).collect();
random::new().shuffle(&mut peers); //TODO: sort by rating
// prefer peers with higher protocol version
peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2));
if peers.len() > 0 {
trace!(
target: "sync",
"Syncing with peers: {} active, {} available, {} total",
self.active_peers.len(), peers.len(), self.peers.len()
);
for (peer_id, _) in peers {
self.sync_peer(io, peer_id, false);
random::new().shuffle(&mut peers); // TODO (#646): sort by rating
// prefer peers with higher protocol version
peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2));
for (peer_id, _) in peers {
self.sync_peer(io, peer_id, false);
}
}
}
@@ -970,6 +1179,12 @@ impl ChainSync {
self.state = SyncState::Blocks;
self.continue_sync(io);
},
SyncState::SnapshotData => match io.snapshot_service().status() {
RestorationStatus::Inactive | RestorationStatus::Failed => {
self.state = SyncState::SnapshotWaiting;
},
RestorationStatus::Initializing { .. } | RestorationStatus::Ongoing { .. } => (),
},
SyncState::SnapshotWaiting => {
match io.snapshot_service().status() {
RestorationStatus::Inactive => {
@@ -998,67 +1213,24 @@ impl ChainSync {
}
}
/// creates rlp to send for the tree defined by 'from' and 'to' hashes
fn create_new_hashes_rlp(chain: &BlockChainClient, from: &H256, to: &H256) -> Option<Bytes> {
match chain.tree_route(from, to) {
Some(route) => {
let uncles = chain.find_uncles(from).unwrap_or_else(Vec::new);
match route.blocks.len() {
0 => None,
_ => {
let mut blocks = route.blocks;
blocks.extend(uncles);
let mut rlp_stream = RlpStream::new_list(blocks.len());
for block_hash in blocks {
let mut hash_rlp = RlpStream::new_list(2);
let number = chain.block_header(BlockId::Hash(block_hash.clone()))
.expect("chain.tree_route and chain.find_uncles only return hahses of blocks that are in the blockchain. qed.").number();
hash_rlp.append(&block_hash);
hash_rlp.append(&number);
rlp_stream.append_raw(hash_rlp.as_raw(), 1);
}
Some(rlp_stream.out())
}
}
},
None => None
}
/// returns peer ids that have different block than our chain
fn get_lagging_peers(&self, chain_info: &BlockChainInfo) -> Vec<PeerId> {
self.get_peers(chain_info, PeerState::Lagging)
}
/// creates rlp from block bytes and total difficulty
fn create_block_rlp(bytes: &Bytes, total_difficulty: U256) -> Bytes {
let mut rlp_stream = RlpStream::new_list(2);
rlp_stream.append_raw(bytes, 1);
rlp_stream.append(&total_difficulty);
rlp_stream.out()
}
/// creates latest block rlp for the given client
fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes {
ChainSync::create_block_rlp(
&chain.block(BlockId::Hash(chain.chain_info().best_block_hash))
.expect("Best block always exists").into_inner(),
chain.chain_info().total_difficulty
)
}
/// creates given hash block rlp for the given client
fn create_new_block_rlp(chain: &BlockChainClient, hash: &H256) -> Bytes {
ChainSync::create_block_rlp(
&chain.block(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed").into_inner(),
chain.block_total_difficulty(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed.")
)
}
/// returns peer ids that have different blocks than our chain
fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo) -> Vec<PeerId> {
/// returns peer ids that have different or the same blocks than our chain
fn get_peers(&self, chain_info: &BlockChainInfo, peers: PeerState) -> Vec<PeerId> {
let latest_hash = chain_info.best_block_hash;
self
.peers
.iter_mut()
.iter()
.filter_map(|(&id, ref mut peer_info)| {
trace!(target: "sync", "Checking peer our best {} their best {}", latest_hash, peer_info.latest_hash);
if peer_info.latest_hash != latest_hash {
let matches = match peers {
PeerState::Lagging => peer_info.latest_hash != latest_hash,
PeerState::SameBlock => peer_info.latest_hash == latest_hash,
};
if matches {
Some(id)
} else {
None
@@ -1067,17 +1239,6 @@ impl ChainSync {
.collect::<Vec<_>>()
}
fn select_random_peers(peers: &[PeerId]) -> Vec<PeerId> {
// take sqrt(x) peers
let mut peers = peers.to_vec();
let mut count = (peers.len() as f64).powf(0.5).round() as usize;
count = cmp::min(count, MAX_PEERS_PROPAGATION);
count = cmp::max(count, MIN_PEERS_PROPAGATION);
random::new().shuffle(&mut peers);
peers.truncate(count);
peers
}
fn get_consensus_peers(&self) -> Vec<PeerId> {
self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_2.0 { Some(*id) } else { None }).collect()
}
@@ -1126,21 +1287,10 @@ impl ChainSync {
}
}
/// Dispatch incoming requests and responses
pub fn dispatch_packet(sync: &RwLock<ChainSync>, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
SyncSupplier::dispatch_packet(sync, io, peer, packet_id, data)
}
pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id);
SyncHandler::on_packet(self, io, peer, packet_id, data);
}
/// Called when peer sends us new consensus packet
pub fn on_consensus_packet(io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
SyncHandler::on_consensus_packet(io, peer_id, r)
}
/// Called by peer when it is disconnecting
pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) {
SyncHandler::on_peer_aborting(self, io, peer);
@@ -1152,8 +1302,16 @@ impl ChainSync {
}
/// propagates new transactions to all peers
pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize {
SyncPropagator::propagate_new_transactions(self, io)
pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) {
let deadline = Instant::now() + Duration::from_millis(500);
SyncPropagator::propagate_new_transactions(self, io, || {
if deadline > Instant::now() {
true
} else {
debug!(target: "sync", "Wasn't able to finish transaction propagation within a deadline.");
false
}
});
}
/// Broadcast consensus message to peers.
@@ -1169,7 +1327,7 @@ impl ChainSync {
#[cfg(test)]
pub mod tests {
use std::collections::{HashSet, VecDeque};
use std::collections::{VecDeque};
use ethkey;
use network::PeerId;
use tests::helpers::{TestIo};
@@ -1285,8 +1443,8 @@ pub mod tests {
asking_blocks: Vec::new(),
asking_hash: None,
ask_time: Instant::now(),
last_sent_transactions: HashSet::new(),
last_sent_private_transactions: HashSet::new(),
last_sent_transactions: Default::default(),
last_sent_private_transactions: Default::default(),
expired: false,
confirmation: super::ForkConfirmation::Confirmed,
snapshot_number: None,
@@ -1301,7 +1459,7 @@ pub mod tests {
fn finds_lagging_peers() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10), &client);
let sync = dummy_sync_with_peer(client.block_hash_delta_minus(10), &client);
let chain_info = client.chain_info();
let lagging_peers = sync.get_lagging_peers(&chain_info);
@@ -1441,3 +1599,4 @@ pub mod tests {
assert_eq!(status.status.transaction_count, 0);
}
}

View File

@@ -18,6 +18,7 @@ use bytes::Bytes;
use ethereum_types::H256;
use ethcore::client::BlockChainInfo;
use ethcore::header::BlockNumber;
use fastmap::H256FastSet;
use network::{PeerId, PacketId};
use rand::Rng;
use rlp::{Encodable, RlpStream};
@@ -69,49 +70,51 @@ impl SyncPropagator {
/// propagates latest block to a set of peers
pub fn propagate_blocks(sync: &mut ChainSync, chain_info: &BlockChainInfo, io: &mut SyncIo, blocks: &[H256], peers: &[PeerId]) -> usize {
trace!(target: "sync", "Sending NewBlocks to {:?}", peers);
let mut sent = 0;
for peer_id in peers {
if blocks.is_empty() {
let rlp = ChainSync::create_latest_block_rlp(io.chain());
SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp);
} else {
for h in blocks {
let rlp = ChainSync::create_new_block_rlp(io.chain(), h);
SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp);
let sent = peers.len();
let mut send_packet = |io: &mut SyncIo, rlp: Bytes| {
for peer_id in peers {
SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp.clone());
if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
peer.latest_hash = chain_info.best_block_hash.clone();
}
}
if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
peer.latest_hash = chain_info.best_block_hash.clone();
};
if blocks.is_empty() {
let rlp = ChainSync::create_latest_block_rlp(io.chain());
send_packet(io, rlp);
} else {
for h in blocks {
let rlp = ChainSync::create_new_block_rlp(io.chain(), h);
send_packet(io, rlp);
}
sent += 1;
}
sent
}
/// propagates new known hashes to all peers
pub fn propagate_new_hashes(sync: &mut ChainSync, chain_info: &BlockChainInfo, io: &mut SyncIo, peers: &[PeerId]) -> usize {
trace!(target: "sync", "Sending NewHashes to {:?}", peers);
let mut sent = 0;
let last_parent = *io.chain().best_block_header().parent_hash();
let best_block_hash = chain_info.best_block_hash;
let rlp = match ChainSync::create_new_hashes_rlp(io.chain(), &last_parent, &best_block_hash) {
Some(rlp) => rlp,
None => return 0
};
let sent = peers.len();
for peer_id in peers {
sent += match ChainSync::create_new_hashes_rlp(io.chain(), &last_parent, &chain_info.best_block_hash) {
Some(rlp) => {
{
if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
peer.latest_hash = chain_info.best_block_hash.clone();
}
}
SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp);
1
},
None => 0
if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
peer.latest_hash = best_block_hash;
}
SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp.clone());
}
sent
}
/// propagates new transactions to all peers
pub fn propagate_new_transactions(sync: &mut ChainSync, io: &mut SyncIo) -> usize {
pub fn propagate_new_transactions<F: FnMut() -> bool>(sync: &mut ChainSync, io: &mut SyncIo, mut should_continue: F) -> usize {
// Early out if nobody to send to.
if sync.peers.is_empty() {
return 0;
@@ -122,6 +125,10 @@ impl SyncPropagator {
return 0;
}
if !should_continue() {
return 0;
}
let (transactions, service_transactions): (Vec<_>, Vec<_>) = transactions.iter()
.map(|tx| tx.signed())
.partition(|tx| !tx.gas_price.is_zero());
@@ -130,24 +137,34 @@ impl SyncPropagator {
let mut affected_peers = HashSet::new();
if !transactions.is_empty() {
let peers = SyncPropagator::select_peers_for_transactions(sync, |_| true);
affected_peers = SyncPropagator::propagate_transactions_to_peers(sync, io, peers, transactions);
affected_peers = SyncPropagator::propagate_transactions_to_peers(
sync, io, peers, transactions, &mut should_continue,
);
}
// most of times service_transactions will be empty
// => there's no need to merge packets
if !service_transactions.is_empty() {
let service_transactions_peers = SyncPropagator::select_peers_for_transactions(sync, |peer_id| accepts_service_transaction(&io.peer_info(*peer_id)));
let service_transactions_affected_peers = SyncPropagator::propagate_transactions_to_peers(sync, io, service_transactions_peers, service_transactions);
let service_transactions_affected_peers = SyncPropagator::propagate_transactions_to_peers(
sync, io, service_transactions_peers, service_transactions, &mut should_continue
);
affected_peers.extend(&service_transactions_affected_peers);
}
affected_peers.len()
}
fn propagate_transactions_to_peers(sync: &mut ChainSync, io: &mut SyncIo, peers: Vec<PeerId>, transactions: Vec<&SignedTransaction>) -> HashSet<PeerId> {
fn propagate_transactions_to_peers<F: FnMut() -> bool>(
sync: &mut ChainSync,
io: &mut SyncIo,
peers: Vec<PeerId>,
transactions: Vec<&SignedTransaction>,
mut should_continue: F,
) -> HashSet<PeerId> {
let all_transactions_hashes = transactions.iter()
.map(|tx| tx.hash())
.collect::<HashSet<H256>>();
.collect::<H256FastSet>();
let all_transactions_rlp = {
let mut packet = RlpStream::new_list(transactions.len());
for tx in &transactions { packet.append(&**tx); }
@@ -157,102 +174,104 @@ impl SyncPropagator {
// Clear old transactions from stats
sync.transactions_stats.retain(&all_transactions_hashes);
// sqrt(x)/x scaled to max u32
let block_number = io.chain().chain_info().best_block_number;
let lucky_peers = {
peers.into_iter()
.filter_map(|peer_id| {
let stats = &mut sync.transactions_stats;
let peer_info = sync.peers.get_mut(&peer_id)
.expect("peer_id is form peers; peers is result of select_peers_for_transactions; select_peers_for_transactions selects peers from self.peers; qed");
// Send all transactions
if peer_info.last_sent_transactions.is_empty() {
// update stats
for hash in &all_transactions_hashes {
let id = io.peer_session_info(peer_id).and_then(|info| info.id);
stats.propagated(hash, id, block_number);
}
peer_info.last_sent_transactions = all_transactions_hashes.clone();
return Some((peer_id, all_transactions_hashes.len(), all_transactions_rlp.clone()));
}
// Get hashes of all transactions to send to this peer
let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions)
.cloned()
.collect::<HashSet<_>>();
if to_send.is_empty() {
return None;
}
// Construct RLP
let (packet, to_send) = {
let mut to_send = to_send;
let mut packet = RlpStream::new();
packet.begin_unbounded_list();
let mut pushed = 0;
for tx in &transactions {
let hash = tx.hash();
if to_send.contains(&hash) {
let mut transaction = RlpStream::new();
tx.rlp_append(&mut transaction);
let appended = packet.append_raw_checked(&transaction.drain(), 1, MAX_TRANSACTION_PACKET_SIZE);
if !appended {
// Maximal packet size reached just proceed with sending
debug!(target: "sync", "Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len());
to_send = to_send.into_iter().take(pushed).collect();
break;
}
pushed += 1;
}
}
packet.complete_unbounded_list();
(packet, to_send)
};
// Update stats
let id = io.peer_session_info(peer_id).and_then(|info| info.id);
for hash in &to_send {
// update stats
stats.propagated(hash, id, block_number);
}
peer_info.last_sent_transactions = all_transactions_hashes
.intersection(&peer_info.last_sent_transactions)
.chain(&to_send)
.cloned()
.collect();
Some((peer_id, to_send.len(), packet.out()))
})
.collect::<Vec<_>>()
let send_packet = |io: &mut SyncIo, peer_id: PeerId, sent: usize, rlp: Bytes| {
let size = rlp.len();
SyncPropagator::send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp);
trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size);
};
// Send RLPs
let mut peers = HashSet::new();
if lucky_peers.len() > 0 {
let mut max_sent = 0;
let lucky_peers_len = lucky_peers.len();
for (peer_id, sent, rlp) in lucky_peers {
peers.insert(peer_id);
let size = rlp.len();
SyncPropagator::send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp);
trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size);
max_sent = cmp::max(max_sent, sent);
let block_number = io.chain().chain_info().best_block_number;
let mut sent_to_peers = HashSet::new();
let mut max_sent = 0;
// for every peer construct and send transactions packet
for peer_id in peers {
if !should_continue() {
debug!(target: "sync", "Sent up to {} transactions to {} peers.", max_sent, sent_to_peers.len());
return sent_to_peers;
}
debug!(target: "sync", "Sent up to {} transactions to {} peers.", max_sent, lucky_peers_len);
let stats = &mut sync.transactions_stats;
let peer_info = sync.peers.get_mut(&peer_id)
.expect("peer_id is form peers; peers is result of select_peers_for_transactions; select_peers_for_transactions selects peers from self.peers; qed");
// Send all transactions, if the peer doesn't know about anything
if peer_info.last_sent_transactions.is_empty() {
// update stats
for hash in &all_transactions_hashes {
let id = io.peer_session_info(peer_id).and_then(|info| info.id);
stats.propagated(hash, id, block_number);
}
peer_info.last_sent_transactions = all_transactions_hashes.clone();
send_packet(io, peer_id, all_transactions_hashes.len(), all_transactions_rlp.clone());
sent_to_peers.insert(peer_id);
max_sent = cmp::max(max_sent, all_transactions_hashes.len());
continue;
}
// Get hashes of all transactions to send to this peer
let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions)
.cloned()
.collect::<HashSet<_>>();
if to_send.is_empty() {
continue;
}
// Construct RLP
let (packet, to_send) = {
let mut to_send = to_send;
let mut packet = RlpStream::new();
packet.begin_unbounded_list();
let mut pushed = 0;
for tx in &transactions {
let hash = tx.hash();
if to_send.contains(&hash) {
let mut transaction = RlpStream::new();
tx.rlp_append(&mut transaction);
let appended = packet.append_raw_checked(&transaction.drain(), 1, MAX_TRANSACTION_PACKET_SIZE);
if !appended {
// Maximal packet size reached just proceed with sending
debug!(target: "sync", "Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len());
to_send = to_send.into_iter().take(pushed).collect();
break;
}
pushed += 1;
}
}
packet.complete_unbounded_list();
(packet, to_send)
};
// Update stats
let id = io.peer_session_info(peer_id).and_then(|info| info.id);
for hash in &to_send {
// update stats
stats.propagated(hash, id, block_number);
}
peer_info.last_sent_transactions = all_transactions_hashes
.intersection(&peer_info.last_sent_transactions)
.chain(&to_send)
.cloned()
.collect();
send_packet(io, peer_id, to_send.len(), packet.out());
sent_to_peers.insert(peer_id);
max_sent = cmp::max(max_sent, to_send.len());
}
peers
debug!(target: "sync", "Sent up to {} transactions to {} peers.", max_sent, sent_to_peers.len());
sent_to_peers
}
pub fn propagate_latest_blocks(sync: &mut ChainSync, io: &mut SyncIo, sealed: &[H256]) {
let chain_info = io.chain().chain_info();
if (((chain_info.best_block_number as i64) - (sync.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
let mut peers = sync.get_lagging_peers(&chain_info);
let peers = sync.get_lagging_peers(&chain_info);
if sealed.is_empty() {
let hashes = SyncPropagator::propagate_new_hashes(sync, &chain_info, io, &peers);
peers = ChainSync::select_random_peers(&peers);
let peers = ChainSync::select_random_peers(&peers);
let blocks = SyncPropagator::propagate_blocks(sync, &chain_info, io, sealed, &peers);
if blocks != 0 || hashes != 0 {
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
@@ -318,7 +337,7 @@ impl SyncPropagator {
}
/// Generic packet sender
fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) {
pub fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) {
if let Err(e) = sync.send(peer_id, packet_id, packet) {
debug!(target:"sync", "Error sending packet: {:?}", e);
sync.disconnect_peer(peer_id);
@@ -419,8 +438,8 @@ mod tests {
asking_blocks: Vec::new(),
asking_hash: None,
ask_time: Instant::now(),
last_sent_transactions: HashSet::new(),
last_sent_private_transactions: HashSet::new(),
last_sent_transactions: Default::default(),
last_sent_private_transactions: Default::default(),
expired: false,
confirmation: ForkConfirmation::Confirmed,
snapshot_number: None,
@@ -447,13 +466,13 @@ mod tests {
let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None);
let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
// Try to propagate same transactions for the second time
let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
// Even after new block transactions should not be propagated twice
sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[], &[]);
// Try to propagate same transactions for the third time
let peer_count3 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
let peer_count3 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
// 1 message should be send
assert_eq!(1, io.packets.len());
@@ -474,7 +493,7 @@ mod tests {
let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None);
let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
io.chain.insert_transaction_to_queue();
// New block import should not trigger propagation.
// (we only propagate on timeout)
@@ -498,10 +517,10 @@ mod tests {
let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None);
let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[], &[]);
// Try to propagate same transactions for the second time
let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
assert_eq!(0, io.packets.len());
assert_eq!(0, peer_count);
@@ -519,7 +538,7 @@ mod tests {
// should sent some
{
let mut io = TestIo::new(&mut client, &ss, &queue, None);
let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
assert_eq!(1, io.packets.len());
assert_eq!(1, peer_count);
}
@@ -528,9 +547,9 @@ mod tests {
let (peer_count2, peer_count3) = {
let mut io = TestIo::new(&mut client, &ss, &queue, None);
// Propagate new transactions
let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
// And now the peer should have all transactions
let peer_count3 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
let peer_count3 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
(peer_count2, peer_count3)
};
@@ -553,7 +572,7 @@ mod tests {
let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None);
SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
let stats = sync.transactions_stats();
assert_eq!(stats.len(), 1, "Should maintain stats for single transaction.")
@@ -583,7 +602,7 @@ mod tests {
io.peers_info.insert(4, "Parity-Ethereum/v2.7.3-ABCDEFGH".to_owned());
// and new service transaction is propagated to peers
SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
// peer#2 && peer#4 are receiving service transaction
assert!(io.packets.iter().any(|p| p.packet_id == 0x02 && p.recipient == 2)); // TRANSACTIONS_PACKET
@@ -607,7 +626,7 @@ mod tests {
io.peers_info.insert(1, "Parity-Ethereum/v2.6".to_owned());
// and service + non-service transactions are propagated to peers
SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true);
// two separate packets for peer are queued:
// 1) with non-service-transaction

View File

@@ -27,6 +27,7 @@ use sync_io::SyncIo;
use super::{
ChainSync,
SyncHandler,
RlpResponseResult,
PacketDecodeError,
BLOCK_BODIES_PACKET,
@@ -47,6 +48,8 @@ use super::{
RECEIPTS_PACKET,
SNAPSHOT_DATA_PACKET,
SNAPSHOT_MANIFEST_PACKET,
STATUS_PACKET,
TRANSACTIONS_PACKET,
};
/// The Chain Sync Supplier: answers requests from peers with available data
@@ -56,6 +59,7 @@ impl SyncSupplier {
/// Dispatch incoming requests and responses
pub fn dispatch_packet(sync: &RwLock<ChainSync>, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
let rlp = Rlp::new(data);
let result = match packet_id {
GET_BLOCK_BODIES_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
SyncSupplier::return_block_bodies,
@@ -80,9 +84,39 @@ impl SyncSupplier {
GET_SNAPSHOT_DATA_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
SyncSupplier::return_snapshot_data,
|e| format!("Error sending snapshot data: {:?}", e)),
CONSENSUS_DATA_PACKET => ChainSync::on_consensus_packet(io, peer, &rlp),
_ => {
STATUS_PACKET => {
sync.write().on_packet(io, peer, packet_id, data);
Ok(())
},
// Packets that require the peer to be confirmed
_ => {
if !sync.read().peers.contains_key(&peer) {
debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_info(peer));
return;
}
debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id);
match packet_id {
CONSENSUS_DATA_PACKET => {
SyncHandler::on_consensus_packet(io, peer, &rlp)
},
TRANSACTIONS_PACKET => {
let res = {
let sync_ro = sync.read();
SyncHandler::on_peer_transactions(&*sync_ro, io, peer, &rlp)
};
if res.is_err() {
// peer sent invalid data, disconnect.
io.disable_peer(peer);
sync.write().deactivate_peer(io, peer);
}
},
_ => {
sync.write().on_packet(io, peer, packet_id, data);
}
}
Ok(())
}
};
@@ -226,7 +260,8 @@ impl SyncSupplier {
let mut added_receipts = 0usize;
let mut data = Bytes::new();
for i in 0..count {
if let Some(mut receipts_bytes) = io.chain().encoded_block_receipts(&rlp.val_at::<H256>(i)?) {
if let Some(receipts) = io.chain().block_receipts(&rlp.val_at::<H256>(i)?) {
let mut receipts_bytes = ::rlp::encode(&receipts);
data.append(&mut receipts_bytes);
added_receipts += receipts_bytes.len();
added_headers += 1;
@@ -403,7 +438,7 @@ mod test {
io.sender = Some(2usize);
ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_NODE_DATA_PACKET, &node_request);
SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_NODE_DATA_PACKET, &node_request);
assert_eq!(1, io.packets.len());
}
@@ -445,7 +480,7 @@ mod test {
assert_eq!(603, rlp_result.unwrap().1.out().len());
io.sender = Some(2usize);
ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_RECEIPTS_PACKET, &receipts_request);
SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_RECEIPTS_PACKET, &receipts_request);
assert_eq!(1, io.packets.len());
}
}

View File

@@ -34,6 +34,7 @@
use std::collections::{HashMap, HashSet};
use std::mem;
use std::ops::Deref;
use std::sync::Arc;
use std::time::{Instant, Duration};
@@ -213,6 +214,44 @@ enum SyncState {
Rounds(SyncRound),
}
/// A wrapper around the SyncState that makes sure to
/// update the giving reference to `is_idle`
#[derive(Debug)]
struct SyncStateWrapper {
state: SyncState,
}
impl SyncStateWrapper {
/// Create a new wrapper for SyncState::Idle
pub fn idle() -> Self {
SyncStateWrapper {
state: SyncState::Idle,
}
}
/// Set the new state's value, making sure `is_idle` gets updated
pub fn set(&mut self, state: SyncState, is_idle_handle: &mut bool) {
*is_idle_handle = match state {
SyncState::Idle => true,
_ => false,
};
self.state = state;
}
/// Returns the internal state's value
pub fn into_inner(self) -> SyncState {
self.state
}
}
impl Deref for SyncStateWrapper {
type Target = SyncState;
fn deref(&self) -> &SyncState {
&self.state
}
}
struct ResponseCtx<'a> {
peer: PeerId,
req_id: ReqId,
@@ -235,7 +274,9 @@ pub struct LightSync<L: AsLightClient> {
pending_reqs: Mutex<HashMap<ReqId, PendingReq>>, // requests from this handler
client: Arc<L>,
rng: Mutex<OsRng>,
state: Mutex<SyncState>,
state: Mutex<SyncStateWrapper>,
// We duplicate this state tracking to avoid deadlocks in `is_major_importing`.
is_idle: Mutex<bool>,
}
#[derive(Debug, Clone)]
@@ -309,16 +350,17 @@ impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> {
if new_best.is_none() {
debug!(target: "sync", "No peers remain. Reverting to idle");
*self.state.lock() = SyncState::Idle;
self.set_state(&mut self.state.lock(), SyncState::Idle);
} else {
let mut state = self.state.lock();
*state = match mem::replace(&mut *state, SyncState::Idle) {
let next_state = match mem::replace(&mut *state, SyncStateWrapper::idle()).into_inner() {
SyncState::Idle => SyncState::Idle,
SyncState::AncestorSearch(search) =>
SyncState::AncestorSearch(search.requests_abandoned(unfulfilled)),
SyncState::Rounds(round) => SyncState::Rounds(round.requests_abandoned(unfulfilled)),
};
self.set_state(&mut state, next_state);
}
self.maintain_sync(ctx.as_basic());
@@ -390,12 +432,13 @@ impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> {
data: headers,
};
*state = match mem::replace(&mut *state, SyncState::Idle) {
let next_state = match mem::replace(&mut *state, SyncStateWrapper::idle()).into_inner() {
SyncState::Idle => SyncState::Idle,
SyncState::AncestorSearch(search) =>
SyncState::AncestorSearch(search.process_response(&ctx, &*self.client)),
SyncState::Rounds(round) => SyncState::Rounds(round.process_response(&ctx)),
};
self.set_state(&mut state, next_state);
}
self.maintain_sync(ctx.as_basic());
@@ -408,12 +451,18 @@ impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> {
// private helpers
impl<L: AsLightClient> LightSync<L> {
/// Sets the LightSync's state, and update
/// `is_idle`
fn set_state(&self, state: &mut SyncStateWrapper, next_state: SyncState) {
state.set(next_state, &mut self.is_idle.lock());
}
// Begins a search for the common ancestor and our best block.
// does not lock state, instead has a mutable reference to it passed.
fn begin_search(&self, state: &mut SyncState) {
fn begin_search(&self, state: &mut SyncStateWrapper) {
if let None = *self.best_seen.lock() {
// no peers.
*state = SyncState::Idle;
self.set_state(state, SyncState::Idle);
return;
}
@@ -422,7 +471,8 @@ impl<L: AsLightClient> LightSync<L> {
trace!(target: "sync", "Beginning search for common ancestor from {:?}",
(chain_info.best_block_number, chain_info.best_block_hash));
*state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number));
let next_state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number));
self.set_state(state, next_state);
}
// handles request dispatch, block import, state machine transitions, and timeouts.
@@ -435,7 +485,7 @@ impl<L: AsLightClient> LightSync<L> {
let chain_info = client.chain_info();
let mut state = self.state.lock();
debug!(target: "sync", "Maintaining sync ({:?})", &*state);
debug!(target: "sync", "Maintaining sync ({:?})", **state);
// drain any pending blocks into the queue.
{
@@ -445,11 +495,12 @@ impl<L: AsLightClient> LightSync<L> {
loop {
if client.queue_info().is_full() { break }
*state = match mem::replace(&mut *state, SyncState::Idle) {
let next_state = match mem::replace(&mut *state, SyncStateWrapper::idle()).into_inner() {
SyncState::Rounds(round)
=> SyncState::Rounds(round.drain(&mut sink, Some(DRAIN_AMOUNT))),
other => other,
};
self.set_state(&mut state, next_state);
if sink.is_empty() { break }
trace!(target: "sync", "Drained {} headers to import", sink.len());
@@ -483,15 +534,15 @@ impl<L: AsLightClient> LightSync<L> {
let network_score = other.as_ref().map(|target| target.head_td);
trace!(target: "sync", "No target to sync to. Network score: {:?}, Local score: {:?}",
network_score, best_td);
*state = SyncState::Idle;
self.set_state(&mut state, SyncState::Idle);
return;
}
};
match mem::replace(&mut *state, SyncState::Idle) {
match mem::replace(&mut *state, SyncStateWrapper::idle()).into_inner() {
SyncState::Rounds(SyncRound::Abort(reason, remaining)) => {
if remaining.len() > 0 {
*state = SyncState::Rounds(SyncRound::Abort(reason, remaining));
self.set_state(&mut state, SyncState::Rounds(SyncRound::Abort(reason, remaining)));
return;
}
@@ -505,7 +556,7 @@ impl<L: AsLightClient> LightSync<L> {
AbortReason::NoResponses => {}
AbortReason::TargetReached => {
debug!(target: "sync", "Sync target reached. Going idle");
*state = SyncState::Idle;
self.set_state(&mut state, SyncState::Idle);
return;
}
}
@@ -514,15 +565,15 @@ impl<L: AsLightClient> LightSync<L> {
self.begin_search(&mut state);
}
SyncState::AncestorSearch(AncestorSearch::FoundCommon(num, hash)) => {
*state = SyncState::Rounds(SyncRound::begin((num, hash), sync_target));
self.set_state(&mut state, SyncState::Rounds(SyncRound::begin((num, hash), sync_target)));
}
SyncState::AncestorSearch(AncestorSearch::Genesis) => {
// Same here.
let g_hash = chain_info.genesis_hash;
*state = SyncState::Rounds(SyncRound::begin((0, g_hash), sync_target));
self.set_state(&mut state, SyncState::Rounds(SyncRound::begin((0, g_hash), sync_target)));
}
SyncState::Idle => self.begin_search(&mut state),
other => *state = other, // restore displaced state.
other => self.set_state(&mut state, other), // restore displaced state.
}
}
@@ -543,12 +594,13 @@ impl<L: AsLightClient> LightSync<L> {
}
drop(pending_reqs);
*state = match mem::replace(&mut *state, SyncState::Idle) {
let next_state = match mem::replace(&mut *state, SyncStateWrapper::idle()).into_inner() {
SyncState::Idle => SyncState::Idle,
SyncState::AncestorSearch(search) =>
SyncState::AncestorSearch(search.requests_abandoned(&unfulfilled)),
SyncState::Rounds(round) => SyncState::Rounds(round.requests_abandoned(&unfulfilled)),
};
self.set_state(&mut state, next_state);
}
}
@@ -605,34 +657,14 @@ impl<L: AsLightClient> LightSync<L> {
None
};
*state = match mem::replace(&mut *state, SyncState::Idle) {
let next_state = match mem::replace(&mut *state, SyncStateWrapper::idle()).into_inner() {
SyncState::Rounds(round) =>
SyncState::Rounds(round.dispatch_requests(dispatcher)),
SyncState::AncestorSearch(search) =>
SyncState::AncestorSearch(search.dispatch_request(dispatcher)),
other => other,
};
}
}
fn is_major_importing_do_wait(&self, wait: bool) -> bool {
const EMPTY_QUEUE: usize = 3;
if self.client.as_light_client().queue_info().unverified_queue_size > EMPTY_QUEUE {
return true;
}
let mg_state = if wait {
self.state.lock()
} else {
if let Some(mg_state) = self.state.try_lock() {
mg_state
} else {
return false;
}
};
match *mg_state {
SyncState::Idle => false,
_ => true,
self.set_state(&mut state, next_state);
}
}
}
@@ -651,7 +683,8 @@ impl<L: AsLightClient> LightSync<L> {
pending_reqs: Mutex::new(HashMap::new()),
client: client,
rng: Mutex::new(OsRng::new()?),
state: Mutex::new(SyncState::Idle),
state: Mutex::new(SyncStateWrapper::idle()),
is_idle: Mutex::new(true),
})
}
}
@@ -666,9 +699,6 @@ pub trait SyncInfo {
/// Whether major sync is underway.
fn is_major_importing(&self) -> bool;
/// Whether major sync is underway, skipping some synchronization.
fn is_major_importing_no_sync(&self) -> bool;
}
impl<L: AsLightClient> SyncInfo for LightSync<L> {
@@ -681,11 +711,13 @@ impl<L: AsLightClient> SyncInfo for LightSync<L> {
}
fn is_major_importing(&self) -> bool {
self.is_major_importing_do_wait(true)
}
const EMPTY_QUEUE: usize = 3;
fn is_major_importing_no_sync(&self) -> bool {
self.is_major_importing_do_wait(false)
let queue_info = self.client.as_light_client().queue_info();
let is_verifying = queue_info.unverified_queue_size + queue_info.verified_queue_size > EMPTY_QUEUE;
let is_syncing = !*self.is_idle.lock();
is_verifying || is_syncing
}
}

View File

@@ -52,7 +52,7 @@ pub trait SyncIo {
fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8;
/// Returns if the chain block queue empty
fn is_chain_queue_empty(&self) -> bool {
self.chain().queue_info().is_empty()
self.chain().is_queue_empty()
}
/// Check if the session is expired
fn is_expired(&self) -> bool;

View File

@@ -33,7 +33,7 @@ use ethcore::test_helpers;
use sync_io::SyncIo;
use io::{IoChannel, IoContext, IoHandler};
use api::WARP_SYNC_PROTOCOL_ID;
use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET};
use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET, SyncSupplier};
use SyncConfig;
use private_tx::SimplePrivateTxHandler;
@@ -271,7 +271,7 @@ impl<C: FlushingBlockChainClient> Peer for EthPeer<C> {
fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet<PeerId> {
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(from));
ChainSync::dispatch_packet(&self.sync, &mut io, from, msg.packet_id, &msg.data);
SyncSupplier::dispatch_packet(&self.sync, &mut io, from, msg.packet_id, &msg.data);
self.chain.flush();
io.to_disconnect.clone()
}
@@ -286,10 +286,12 @@ impl<C: FlushingBlockChainClient> Peer for EthPeer<C> {
}
fn sync_step(&self) {
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None);
self.chain.flush();
self.sync.write().maintain_peers(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None));
self.sync.write().maintain_sync(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None));
self.sync.write().propagate_new_transactions(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None));
self.sync.write().maintain_peers(&mut io);
self.sync.write().maintain_sync(&mut io);
self.sync.write().continue_sync(&mut io);
self.sync.write().propagate_new_transactions(&mut io);
}
fn restart_sync(&self) {

View File

@@ -15,6 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use api::TransactionStats;
use std::hash::BuildHasher;
use std::collections::{HashSet, HashMap};
use ethereum_types::{H256, H512};
use fastmap::H256FastMap;
@@ -74,7 +75,7 @@ impl TransactionsStats {
}
/// Retains only transactions present in given `HashSet`.
pub fn retain(&mut self, hashes: &HashSet<H256>) {
pub fn retain<S: BuildHasher>(&mut self, hashes: &HashSet<H256, S>) {
let to_remove = self.pending_transactions.keys()
.filter(|hash| !hashes.contains(hash))
.cloned()