Backport: Block sync stopped without any errors. #277 (#286)

* Backport: Block sync stopped without any errors. #277
* fmt
This commit is contained in:
rakita 2021-03-09 11:20:32 +01:00 committed by GitHub
parent 7ea5707904
commit 5b904476cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 76 additions and 50 deletions

View File

@ -108,14 +108,17 @@ use ethereum_types::{H256, U256};
use fastmap::{H256FastMap, H256FastSet}; use fastmap::{H256FastMap, H256FastSet};
use hash::keccak; use hash::keccak;
use network::{self, client_version::ClientVersion, PeerId}; use network::{self, client_version::ClientVersion, PeerId};
use parking_lot::{Mutex, RwLock, RwLockWriteGuard}; use parking_lot::Mutex;
use rand::Rng; use rand::Rng;
use rlp::{DecoderError, RlpStream}; use rlp::{DecoderError, RlpStream};
use snapshot::Snapshot; use snapshot::Snapshot;
use std::{ use std::{
cmp, cmp,
collections::{BTreeMap, HashMap, HashSet}, collections::{BTreeMap, HashMap, HashSet},
sync::mpsc, sync::{
atomic::{AtomicBool, Ordering},
mpsc, RwLock as StdRwLock, RwLockWriteGuard as StdRwLockWriteGuard,
},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use sync_io::SyncIo; use sync_io::SyncIo;
@ -398,8 +401,10 @@ pub type Peers = HashMap<PeerId, PeerInfo>;
pub struct ChainSyncApi { pub struct ChainSyncApi {
/// Priority tasks queue /// Priority tasks queue
priority_tasks: Mutex<mpsc::Receiver<PriorityTask>>, priority_tasks: Mutex<mpsc::Receiver<PriorityTask>>,
/// Gate for executing only one priority timer.
priority_tasks_gate: AtomicBool,
/// The rest of sync data /// The rest of sync data
sync: RwLock<ChainSync>, sync: StdRwLock<ChainSync>,
} }
impl ChainSyncApi { impl ChainSyncApi {
@ -411,31 +416,33 @@ impl ChainSyncApi {
priority_tasks: mpsc::Receiver<PriorityTask>, priority_tasks: mpsc::Receiver<PriorityTask>,
) -> Self { ) -> Self {
ChainSyncApi { ChainSyncApi {
sync: RwLock::new(ChainSync::new(config, chain, fork_filter)), sync: StdRwLock::new(ChainSync::new(config, chain, fork_filter)),
priority_tasks: Mutex::new(priority_tasks), priority_tasks: Mutex::new(priority_tasks),
priority_tasks_gate: AtomicBool::new(false),
} }
} }
/// Gives `write` access to underlying `ChainSync` /// Gives `write` access to underlying `ChainSync`
pub fn write(&self) -> RwLockWriteGuard<ChainSync> { pub fn write(&self) -> StdRwLockWriteGuard<ChainSync> {
self.sync.write() self.sync.write().unwrap()
} }
/// Returns info about given list of peers /// Returns info about given list of peers
pub fn peer_info(&self, ids: &[PeerId]) -> Vec<Option<PeerInfoDigest>> { pub fn peer_info(&self, ids: &[PeerId]) -> Vec<Option<PeerInfoDigest>> {
let sync = self.sync.read(); let sync = self.sync.read().unwrap();
ids.iter().map(|id| sync.peer_info(id)).collect() ids.iter().map(|id| sync.peer_info(id)).collect()
} }
/// Returns synchonization status /// Returns synchonization status
pub fn status(&self) -> SyncStatus { pub fn status(&self) -> SyncStatus {
self.sync.read().status() self.sync.read().unwrap().status()
} }
/// Returns transactions propagation statistics /// Returns transactions propagation statistics
pub fn transactions_stats(&self) -> BTreeMap<H256, ::TransactionStats> { pub fn transactions_stats(&self) -> BTreeMap<H256, ::TransactionStats> {
self.sync self.sync
.read() .read()
.unwrap()
.transactions_stats() .transactions_stats()
.iter() .iter()
.map(|(hash, stats)| (*hash, stats.into())) .map(|(hash, stats)| (*hash, stats.into()))
@ -449,7 +456,7 @@ impl ChainSyncApi {
/// Process the queue with requests, that were delayed with response. /// Process the queue with requests, that were delayed with response.
pub fn process_delayed_requests(&self, io: &mut dyn SyncIo) { pub fn process_delayed_requests(&self, io: &mut dyn SyncIo) {
let requests = self.sync.write().retrieve_delayed_requests(); let requests = self.sync.write().unwrap().retrieve_delayed_requests();
if !requests.is_empty() { if !requests.is_empty() {
debug!(target: "sync", "Processing {} delayed requests", requests.len()); debug!(target: "sync", "Processing {} delayed requests", requests.len());
for (peer_id, packet_id, packet_data) in requests { for (peer_id, packet_id, packet_data) in requests {
@ -480,6 +487,13 @@ impl ChainSyncApi {
} }
} }
if self
.priority_tasks_gate
.compare_and_swap(false, true, Ordering::AcqRel)
{
return;
}
// deadline to get the task from the queue // deadline to get the task from the queue
let deadline = Instant::now() + ::api::PRIORITY_TIMER_INTERVAL; let deadline = Instant::now() + ::api::PRIORITY_TIMER_INTERVAL;
let mut work = || { let mut work = || {
@ -489,9 +503,8 @@ impl ChainSyncApi {
tasks.recv_timeout(left).ok()? tasks.recv_timeout(left).ok()?
}; };
task.starting(); task.starting();
// wait for the sync lock until deadline, // wait for the sync lock
// note we might drop the task here if we won't manage to acquire the lock. let mut sync = self.sync.write().unwrap();
let mut sync = self.sync.try_write_until(deadline)?;
// since we already have everything let's use a different deadline // since we already have everything let's use a different deadline
// to do the rest of the job now, so that previous work is not wasted. // to do the rest of the job now, so that previous work is not wasted.
let deadline = Instant::now() + PRIORITY_TASK_DEADLINE; let deadline = Instant::now() + PRIORITY_TASK_DEADLINE;
@ -538,6 +551,7 @@ impl ChainSyncApi {
// Process as many items as we can until the deadline is reached. // Process as many items as we can until the deadline is reached.
loop { loop {
if work().is_none() { if work().is_none() {
self.priority_tasks_gate.store(false, Ordering::Release);
return; return;
} }
} }

View File

@ -24,9 +24,8 @@ pub const PAYLOAD_SOFT_LIMIT: usize = 100_000;
use enum_primitive::FromPrimitive; use enum_primitive::FromPrimitive;
use ethereum_types::H256; use ethereum_types::H256;
use network::{self, PeerId}; use network::{self, PeerId};
use parking_lot::RwLock;
use rlp::{Rlp, RlpStream}; use rlp::{Rlp, RlpStream};
use std::cmp; use std::{cmp, sync::RwLock as StdRwLock};
use types::{ids::BlockId, BlockNumber}; use types::{ids::BlockId, BlockNumber};
use sync_io::SyncIo; use sync_io::SyncIo;
@ -54,7 +53,7 @@ impl SyncSupplier {
// Take a u8 and not a SyncPacketId because this is the entry point // Take a u8 and not a SyncPacketId because this is the entry point
// to chain sync from the outside world. // to chain sync from the outside world.
pub fn dispatch_packet( pub fn dispatch_packet(
sync: &RwLock<ChainSync>, sync: &StdRwLock<ChainSync>,
io: &mut dyn SyncIo, io: &mut dyn SyncIo,
peer: PeerId, peer: PeerId,
packet_id: u8, packet_id: u8,
@ -102,12 +101,12 @@ impl SyncSupplier {
), ),
StatusPacket => { StatusPacket => {
sync.write().on_packet(io, peer, packet_id, data); sync.write().unwrap().on_packet(io, peer, packet_id, data);
Ok(()) Ok(())
} }
// Packets that require the peer to be confirmed // Packets that require the peer to be confirmed
_ => { _ => {
if !sync.read().peers.contains_key(&peer) { if !sync.read().unwrap().peers.contains_key(&peer) {
debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_version(peer)); debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_version(peer));
return; return;
} }
@ -117,17 +116,17 @@ impl SyncSupplier {
ConsensusDataPacket => SyncHandler::on_consensus_packet(io, peer, &rlp), ConsensusDataPacket => SyncHandler::on_consensus_packet(io, peer, &rlp),
TransactionsPacket => { TransactionsPacket => {
let res = { let res = {
let sync_ro = sync.read(); let sync_ro = sync.read().unwrap();
SyncHandler::on_peer_transactions(&*sync_ro, io, peer, &rlp) SyncHandler::on_peer_transactions(&*sync_ro, io, peer, &rlp)
}; };
if res.is_err() { if res.is_err() {
// peer sent invalid data, disconnect. // peer sent invalid data, disconnect.
io.disable_peer(peer); io.disable_peer(peer);
sync.write().deactivate_peer(io, peer); sync.write().unwrap().deactivate_peer(io, peer);
} }
} }
_ => { _ => {
sync.write().on_packet(io, peer, packet_id, data); sync.write().unwrap().on_packet(io, peer, packet_id, data);
} }
} }
@ -139,9 +138,10 @@ impl SyncSupplier {
Err(PacketProcessError::Decoder(e)) => { Err(PacketProcessError::Decoder(e)) => {
debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e) debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e)
} }
Err(PacketProcessError::ClientBusy) => { Err(PacketProcessError::ClientBusy) => sync
sync.write().add_delayed_request(peer, packet_id, data) .write()
} .unwrap()
.add_delayed_request(peer, packet_id, data),
Ok(()) => {} Ok(()) => {}
} }
} }
@ -150,7 +150,7 @@ impl SyncSupplier {
/// Dispatch delayed request /// Dispatch delayed request
/// The main difference with dispatch packet is the direct send of the responses to the peer /// The main difference with dispatch packet is the direct send of the responses to the peer
pub fn dispatch_delayed_request( pub fn dispatch_delayed_request(
sync: &RwLock<ChainSync>, sync: &StdRwLock<ChainSync>,
io: &mut dyn SyncIo, io: &mut dyn SyncIo,
peer: PeerId, peer: PeerId,
packet_id: u8, packet_id: u8,
@ -178,9 +178,10 @@ impl SyncSupplier {
Err(PacketProcessError::Decoder(e)) => { Err(PacketProcessError::Decoder(e)) => {
debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e) debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e)
} }
Err(PacketProcessError::ClientBusy) => { Err(PacketProcessError::ClientBusy) => sync
sync.write().add_delayed_request(peer, packet_id, data) .write()
} .unwrap()
.add_delayed_request(peer, packet_id, data),
Ok(()) => {} Ok(()) => {}
} }
} }
@ -420,7 +421,7 @@ mod test {
use ethereum_types::H256; use ethereum_types::H256;
use parking_lot::RwLock; use parking_lot::RwLock;
use rlp::{Rlp, RlpStream}; use rlp::{Rlp, RlpStream};
use std::collections::VecDeque; use std::{collections::VecDeque, sync::RwLock as StdRwLock};
use tests::{helpers::TestIo, snapshot::TestSnapshotService}; use tests::{helpers::TestIo, snapshot::TestSnapshotService};
#[test] #[test]
@ -644,7 +645,7 @@ mod test {
io.sender = Some(2usize); io.sender = Some(2usize);
SyncSupplier::dispatch_packet( SyncSupplier::dispatch_packet(
&RwLock::new(sync), &StdRwLock::new(sync),
&mut io, &mut io,
0usize, 0usize,
GetReceiptsPacket.id(), GetReceiptsPacket.id(),

View File

@ -57,7 +57,7 @@ fn status_after_sync() {
net.peer(1).chain.add_blocks(1000, EachBlockWith::Uncle); net.peer(1).chain.add_blocks(1000, EachBlockWith::Uncle);
net.peer(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.peer(2).chain.add_blocks(1000, EachBlockWith::Uncle);
net.sync(); net.sync();
let status = net.peer(0).sync.read().status(); let status = net.peer(0).sync.read().unwrap().status();
assert_eq!(status.state, SyncState::Idle); assert_eq!(status.state, SyncState::Idle);
} }
@ -177,19 +177,22 @@ fn restart() {
assert!(net.peer(0).chain.chain_info().best_block_number > 100); assert!(net.peer(0).chain.chain_info().best_block_number > 100);
net.restart_peer(0); net.restart_peer(0);
let status = net.peer(0).sync.read().status(); let status = net.peer(0).sync.read().unwrap().status();
assert_eq!(status.state, SyncState::Idle); assert_eq!(status.state, SyncState::Idle);
} }
#[test] #[test]
fn status_empty() { fn status_empty() {
let net = TestNet::new(2); let net = TestNet::new(2);
assert_eq!(net.peer(0).sync.read().status().state, SyncState::Idle); assert_eq!(
net.peer(0).sync.read().unwrap().status().state,
SyncState::Idle
);
let mut config = SyncConfig::default(); let mut config = SyncConfig::default();
config.warp_sync = WarpSync::Enabled; config.warp_sync = WarpSync::Enabled;
let net = TestNet::new_with_config(2, config); let net = TestNet::new_with_config(2, config);
assert_eq!( assert_eq!(
net.peer(0).sync.read().status().state, net.peer(0).sync.read().unwrap().status().state,
SyncState::WaitingPeers SyncState::WaitingPeers
); );
} }

View File

@ -37,7 +37,10 @@ use network::{self, client_version::ClientVersion, PacketId, PeerId, ProtocolId,
use parking_lot::RwLock; use parking_lot::RwLock;
use std::{ use std::{
collections::{HashMap, HashSet, VecDeque}, collections::{HashMap, HashSet, VecDeque},
sync::Arc, sync::{
Arc, RwLock as StdRwLock, RwLockReadGuard as StdRwLockReadGuard,
RwLockWriteGuard as StdRwLockWriteGuard,
},
}; };
use sync_io::SyncIo; use sync_io::SyncIo;
use tests::snapshot::*; use tests::snapshot::*;
@ -249,7 +252,7 @@ where
pub chain: Arc<C>, pub chain: Arc<C>,
pub miner: Arc<Miner>, pub miner: Arc<Miner>,
pub snapshot_service: Arc<TestSnapshotService>, pub snapshot_service: Arc<TestSnapshotService>,
pub sync: RwLock<ChainSync>, pub sync: StdRwLock<ChainSync>,
pub queue: RwLock<VecDeque<TestPacket>>, pub queue: RwLock<VecDeque<TestPacket>>,
pub io_queue: RwLock<VecDeque<ChainMessageType>>, pub io_queue: RwLock<VecDeque<ChainMessageType>>,
new_blocks_queue: RwLock<VecDeque<NewBlockMessage>>, new_blocks_queue: RwLock<VecDeque<NewBlockMessage>>,
@ -270,15 +273,17 @@ where
fn process_io_message(&self, message: ChainMessageType) { fn process_io_message(&self, message: ChainMessageType) {
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None); let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None);
match message { match message {
ChainMessageType::Consensus(data) => { ChainMessageType::Consensus(data) => self
self.sync.write().propagate_consensus_packet(&mut io, data) .sync
} .write()
.unwrap()
.propagate_consensus_packet(&mut io, data),
} }
} }
fn process_new_block_message(&self, message: NewBlockMessage) { fn process_new_block_message(&self, message: NewBlockMessage) {
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None); let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None);
self.sync.write().chain_new_blocks( self.sync.write().unwrap().chain_new_blocks(
&mut io, &mut io,
&message.imported, &message.imported,
&message.invalid, &message.invalid,
@ -294,8 +299,8 @@ impl<C: FlushingBlockChainClient> Peer for EthPeer<C> {
type Message = TestPacket; type Message = TestPacket;
fn on_connect(&self, other: PeerId) { fn on_connect(&self, other: PeerId) {
self.sync.write().update_targets(&*self.chain); self.sync.write().unwrap().update_targets(&*self.chain);
self.sync.write().on_peer_connected( self.sync.write().unwrap().on_peer_connected(
&mut TestIo::new( &mut TestIo::new(
&*self.chain, &*self.chain,
&self.snapshot_service, &self.snapshot_service,
@ -313,7 +318,7 @@ impl<C: FlushingBlockChainClient> Peer for EthPeer<C> {
&self.queue, &self.queue,
Some(other), Some(other),
); );
self.sync.write().on_peer_aborting(&mut io, other); self.sync.write().unwrap().on_peer_aborting(&mut io, other);
} }
fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet<PeerId> { fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet<PeerId> {
@ -340,14 +345,17 @@ impl<C: FlushingBlockChainClient> Peer for EthPeer<C> {
fn sync_step(&self) { fn sync_step(&self) {
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None); let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None);
self.chain.flush(); self.chain.flush();
self.sync.write().maintain_peers(&mut io); self.sync.write().unwrap().maintain_peers(&mut io);
self.sync.write().maintain_sync(&mut io); self.sync.write().unwrap().maintain_sync(&mut io);
self.sync.write().continue_sync(&mut io); self.sync.write().unwrap().continue_sync(&mut io);
self.sync.write().propagate_new_transactions(&mut io); self.sync
.write()
.unwrap()
.propagate_new_transactions(&mut io);
} }
fn restart_sync(&self) { fn restart_sync(&self) {
self.sync.write().restart(&mut TestIo::new( self.sync.write().unwrap().restart(&mut TestIo::new(
&*self.chain, &*self.chain,
&self.snapshot_service, &self.snapshot_service,
&self.queue, &self.queue,
@ -400,7 +408,7 @@ impl TestNet<EthPeer<TestBlockChainClient>> {
let ss = Arc::new(TestSnapshotService::new()); let ss = Arc::new(TestSnapshotService::new());
let sync = ChainSync::new(config.clone(), &chain, ForkFilterApi::new_dummy(&chain)); let sync = ChainSync::new(config.clone(), &chain, ForkFilterApi::new_dummy(&chain));
net.peers.push(Arc::new(EthPeer { net.peers.push(Arc::new(EthPeer {
sync: RwLock::new(sync), sync: StdRwLock::new(sync),
snapshot_service: ss, snapshot_service: ss,
chain: Arc::new(chain), chain: Arc::new(chain),
miner: Arc::new(Miner::new_for_tests(&Spec::new_test(), None)), miner: Arc::new(Miner::new_for_tests(&Spec::new_test(), None)),
@ -449,7 +457,7 @@ impl TestNet<EthPeer<EthcoreClient>> {
let ss = Arc::new(TestSnapshotService::new()); let ss = Arc::new(TestSnapshotService::new());
let sync = ChainSync::new(config, &*client, ForkFilterApi::new_dummy(&*client)); let sync = ChainSync::new(config, &*client, ForkFilterApi::new_dummy(&*client));
let peer = Arc::new(EthPeer { let peer = Arc::new(EthPeer {
sync: RwLock::new(sync), sync: StdRwLock::new(sync),
snapshot_service: ss, snapshot_service: ss,
chain: client, chain: client,
miner, miner,
@ -558,7 +566,7 @@ where
impl<C: FlushingBlockChainClient> TestNet<EthPeer<C>> { impl<C: FlushingBlockChainClient> TestNet<EthPeer<C>> {
pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) { pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) {
let peer = &mut self.peers[peer_id]; let peer = &mut self.peers[peer_id];
peer.sync.write().chain_new_blocks( peer.sync.write().unwrap().chain_new_blocks(
&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None), &mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None),
&[], &[],
&[], &[],