Run cargo fix on a few of the worst offenders (#10854)
* Run cargo fix on `vm` * Run cargo fix on ethcore-db * Run cargo fix on evm * Run cargo fix on ethcore-light * Run cargo fix on journaldb * Run cargo fix on wasm * Missing docs * Run cargo fix on ethcore-sync
This commit is contained in:
@@ -70,7 +70,7 @@ pub struct SyncHandler;
|
||||
|
||||
impl SyncHandler {
|
||||
/// Handle incoming packet from peer
|
||||
pub fn on_packet(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
|
||||
pub fn on_packet(sync: &mut ChainSync, io: &mut dyn SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
|
||||
let rlp = Rlp::new(data);
|
||||
if let Some(packet_id) = SyncPacket::from_u8(packet_id) {
|
||||
let result = match packet_id {
|
||||
@@ -110,13 +110,13 @@ impl SyncHandler {
|
||||
}
|
||||
|
||||
/// Called when peer sends us new consensus packet
|
||||
pub fn on_consensus_packet(io: &mut SyncIo, peer_id: PeerId, r: &Rlp) {
|
||||
pub fn on_consensus_packet(io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) {
|
||||
trace!(target: "sync", "Received consensus packet from {:?}", peer_id);
|
||||
io.chain().queue_consensus_message(r.as_raw().to_vec());
|
||||
}
|
||||
|
||||
/// Called by peer when it is disconnecting
|
||||
pub fn on_peer_aborting(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
|
||||
pub fn on_peer_aborting(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId) {
|
||||
trace!(target: "sync", "== Disconnecting {}: {}", peer_id, io.peer_version(peer_id));
|
||||
sync.handshaking_peers.remove(&peer_id);
|
||||
if sync.peers.contains_key(&peer_id) {
|
||||
@@ -142,7 +142,7 @@ impl SyncHandler {
|
||||
}
|
||||
|
||||
/// Called when a new peer is connected
|
||||
pub fn on_peer_connected(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId) {
|
||||
pub fn on_peer_connected(sync: &mut ChainSync, io: &mut dyn SyncIo, peer: PeerId) {
|
||||
trace!(target: "sync", "== Connected {}: {}", peer, io.peer_version(peer));
|
||||
if let Err(e) = sync.send_status(io, peer) {
|
||||
debug!(target:"sync", "Error sending status request: {:?}", e);
|
||||
@@ -153,7 +153,7 @@ impl SyncHandler {
|
||||
}
|
||||
|
||||
/// Called by peer once it has new block bodies
|
||||
pub fn on_peer_new_block(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
pub fn on_peer_new_block(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "Ignoring new block from unconfirmed peer {}", peer_id);
|
||||
return Ok(());
|
||||
@@ -217,7 +217,7 @@ impl SyncHandler {
|
||||
}
|
||||
|
||||
/// Handles `NewHashes` packet. Initiates headers download for any unknown hashes.
|
||||
pub fn on_peer_new_hashes(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
pub fn on_peer_new_hashes(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "Ignoring new hashes from unconfirmed peer {}", peer_id);
|
||||
return Ok(());
|
||||
@@ -288,7 +288,7 @@ impl SyncHandler {
|
||||
}
|
||||
|
||||
/// Called by peer once it has new block bodies
|
||||
fn on_peer_block_bodies(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
fn on_peer_block_bodies(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
sync.clear_peer_download(peer_id);
|
||||
let block_set = sync.peers.get(&peer_id)
|
||||
.and_then(|p| p.block_set)
|
||||
@@ -332,7 +332,7 @@ impl SyncHandler {
|
||||
}
|
||||
}
|
||||
|
||||
fn on_peer_fork_header(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
fn on_peer_fork_header(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
{
|
||||
let peer = sync.peers.get_mut(&peer_id).expect("Is only called when peer is present in peers");
|
||||
peer.asking = PeerAsking::Nothing;
|
||||
@@ -364,7 +364,7 @@ impl SyncHandler {
|
||||
}
|
||||
|
||||
/// Called by peer once it has new block headers during sync
|
||||
fn on_peer_block_headers(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
fn on_peer_block_headers(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
let is_fork_header_request = match sync.peers.get(&peer_id) {
|
||||
Some(peer) if peer.asking == PeerAsking::ForkHeader => true,
|
||||
_ => false,
|
||||
@@ -431,7 +431,7 @@ impl SyncHandler {
|
||||
}
|
||||
|
||||
/// Called by peer once it has new block receipts
|
||||
fn on_peer_block_receipts(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
fn on_peer_block_receipts(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
sync.clear_peer_download(peer_id);
|
||||
let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks);
|
||||
let allowed = sync.peers.get(&peer_id).map(|p| p.is_allowed()).unwrap_or(false);
|
||||
@@ -473,7 +473,7 @@ impl SyncHandler {
|
||||
}
|
||||
|
||||
/// Called when snapshot manifest is downloaded from a peer.
|
||||
fn on_snapshot_manifest(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
fn on_snapshot_manifest(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "Ignoring snapshot manifest from unconfirmed peer {}", peer_id);
|
||||
return Ok(());
|
||||
@@ -502,7 +502,7 @@ impl SyncHandler {
|
||||
}
|
||||
|
||||
/// Called when snapshot data is downloaded from a peer.
|
||||
fn on_snapshot_data(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
fn on_snapshot_data(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "Ignoring snapshot data from unconfirmed peer {}", peer_id);
|
||||
return Ok(());
|
||||
@@ -568,7 +568,7 @@ impl SyncHandler {
|
||||
}
|
||||
|
||||
/// Called by peer to report status
|
||||
fn on_peer_status(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
fn on_peer_status(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
sync.handshaking_peers.remove(&peer_id);
|
||||
let protocol_version: u8 = r.val_at(0)?;
|
||||
let warp_protocol_version = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer_id);
|
||||
@@ -658,7 +658,7 @@ impl SyncHandler {
|
||||
}
|
||||
|
||||
/// Called when peer sends us new transactions
|
||||
pub fn on_peer_transactions(sync: &ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
|
||||
pub fn on_peer_transactions(sync: &ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
|
||||
// Accept transactions only when fully synced
|
||||
if !io.is_chain_queue_empty() || (sync.state != SyncState::Idle && sync.state != SyncState::NewBlocks) {
|
||||
trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id);
|
||||
@@ -682,7 +682,7 @@ impl SyncHandler {
|
||||
}
|
||||
|
||||
/// Called when peer sends us signed private transaction packet
|
||||
fn on_signed_private_transaction(sync: &mut ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
fn on_signed_private_transaction(sync: &mut ChainSync, _io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id);
|
||||
return Ok(());
|
||||
@@ -710,7 +710,7 @@ impl SyncHandler {
|
||||
}
|
||||
|
||||
/// Called when peer sends us new private transaction packet
|
||||
fn on_private_transaction(sync: &mut ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
fn on_private_transaction(sync: &mut ChainSync, _io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
|
||||
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id);
|
||||
return Ok(());
|
||||
|
||||
@@ -386,8 +386,8 @@ impl ChainSyncApi {
|
||||
/// Creates new `ChainSyncApi`
|
||||
pub fn new(
|
||||
config: SyncConfig,
|
||||
chain: &BlockChainClient,
|
||||
private_tx_handler: Option<Arc<PrivateTxHandler>>,
|
||||
chain: &dyn BlockChainClient,
|
||||
private_tx_handler: Option<Arc<dyn PrivateTxHandler>>,
|
||||
priority_tasks: mpsc::Receiver<PriorityTask>,
|
||||
) -> Self {
|
||||
ChainSyncApi {
|
||||
@@ -421,7 +421,7 @@ impl ChainSyncApi {
|
||||
}
|
||||
|
||||
/// Dispatch incoming requests and responses
|
||||
pub fn dispatch_packet(&self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
|
||||
pub fn dispatch_packet(&self, io: &mut dyn SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
|
||||
SyncSupplier::dispatch_packet(&self.sync, io, peer, packet_id, data)
|
||||
}
|
||||
|
||||
@@ -431,7 +431,7 @@ impl ChainSyncApi {
|
||||
///
|
||||
/// NOTE This method should only handle stuff that can be canceled and would reach other peers
|
||||
/// by other means.
|
||||
pub fn process_priority_queue(&self, io: &mut SyncIo) {
|
||||
pub fn process_priority_queue(&self, io: &mut dyn SyncIo) {
|
||||
fn check_deadline(deadline: Instant) -> Option<Duration> {
|
||||
let now = Instant::now();
|
||||
if now > deadline {
|
||||
@@ -503,7 +503,7 @@ impl ChainSyncApi {
|
||||
// Static methods
|
||||
impl ChainSync {
|
||||
/// creates rlp to send for the tree defined by 'from' and 'to' hashes
|
||||
fn create_new_hashes_rlp(chain: &BlockChainClient, from: &H256, to: &H256) -> Option<Bytes> {
|
||||
fn create_new_hashes_rlp(chain: &dyn BlockChainClient, from: &H256, to: &H256) -> Option<Bytes> {
|
||||
match chain.tree_route(from, to) {
|
||||
Some(route) => {
|
||||
let uncles = chain.find_uncles(from).unwrap_or_else(Vec::new);
|
||||
@@ -538,7 +538,7 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// creates latest block rlp for the given client
|
||||
fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes {
|
||||
fn create_latest_block_rlp(chain: &dyn BlockChainClient) -> Bytes {
|
||||
Self::create_block_rlp(
|
||||
&chain.block(BlockId::Hash(chain.chain_info().best_block_hash))
|
||||
.expect("Best block always exists").into_inner(),
|
||||
@@ -547,7 +547,7 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// creates given hash block rlp for the given client
|
||||
fn create_new_block_rlp(chain: &BlockChainClient, hash: &H256) -> Bytes {
|
||||
fn create_new_block_rlp(chain: &dyn BlockChainClient, hash: &H256) -> Bytes {
|
||||
Self::create_block_rlp(
|
||||
&chain.block(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed").into_inner(),
|
||||
chain.block_total_difficulty(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed.")
|
||||
@@ -565,7 +565,7 @@ impl ChainSync {
|
||||
peers
|
||||
}
|
||||
|
||||
fn get_init_state(warp_sync: WarpSync, chain: &BlockChainClient) -> SyncState {
|
||||
fn get_init_state(warp_sync: WarpSync, chain: &dyn BlockChainClient) -> SyncState {
|
||||
let best_block = chain.chain_info().best_block_number;
|
||||
match warp_sync {
|
||||
WarpSync::Enabled => SyncState::WaitingPeers,
|
||||
@@ -620,7 +620,7 @@ pub struct ChainSync {
|
||||
download_old_blocks: bool,
|
||||
/// Shared private tx service.
|
||||
#[ignore_malloc_size_of = "arc on dyn trait here seems tricky, ignoring"]
|
||||
private_tx_handler: Option<Arc<PrivateTxHandler>>,
|
||||
private_tx_handler: Option<Arc<dyn PrivateTxHandler>>,
|
||||
/// Enable warp sync.
|
||||
warp_sync: WarpSync,
|
||||
|
||||
@@ -632,8 +632,8 @@ impl ChainSync {
|
||||
/// Create a new instance of syncing strategy.
|
||||
pub fn new(
|
||||
config: SyncConfig,
|
||||
chain: &BlockChainClient,
|
||||
private_tx_handler: Option<Arc<PrivateTxHandler>>,
|
||||
chain: &dyn BlockChainClient,
|
||||
private_tx_handler: Option<Arc<dyn PrivateTxHandler>>,
|
||||
) -> Self {
|
||||
let chain_info = chain.chain_info();
|
||||
let best_block = chain.chain_info().best_block_number;
|
||||
@@ -708,7 +708,7 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// Abort all sync activity
|
||||
pub fn abort(&mut self, io: &mut SyncIo) {
|
||||
pub fn abort(&mut self, io: &mut dyn SyncIo) {
|
||||
self.reset_and_continue(io);
|
||||
self.peers.clear();
|
||||
}
|
||||
@@ -738,7 +738,7 @@ impl ChainSync {
|
||||
|
||||
/// Reset sync. Clear all downloaded data but keep the queue.
|
||||
/// Set sync state to the given state or to the initial state if `None` is provided.
|
||||
fn reset(&mut self, io: &mut SyncIo, state: Option<SyncState>) {
|
||||
fn reset(&mut self, io: &mut dyn SyncIo, state: Option<SyncState>) {
|
||||
self.new_blocks.reset();
|
||||
let chain_info = io.chain().chain_info();
|
||||
for (_, ref mut p) in &mut self.peers {
|
||||
@@ -760,7 +760,7 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// Restart sync
|
||||
pub fn reset_and_continue(&mut self, io: &mut SyncIo) {
|
||||
pub fn reset_and_continue(&mut self, io: &mut dyn SyncIo) {
|
||||
trace!(target: "sync", "Restarting");
|
||||
if self.state == SyncState::SnapshotData {
|
||||
debug!(target:"sync", "Aborting snapshot restore");
|
||||
@@ -773,12 +773,12 @@ impl ChainSync {
|
||||
|
||||
/// Remove peer from active peer set. Peer will be reactivated on the next sync
|
||||
/// round.
|
||||
fn deactivate_peer(&mut self, _io: &mut SyncIo, peer_id: PeerId) {
|
||||
fn deactivate_peer(&mut self, _io: &mut dyn SyncIo, peer_id: PeerId) {
|
||||
trace!(target: "sync", "Deactivating peer {}", peer_id);
|
||||
self.active_peers.remove(&peer_id);
|
||||
}
|
||||
|
||||
fn maybe_start_snapshot_sync(&mut self, io: &mut SyncIo) {
|
||||
fn maybe_start_snapshot_sync(&mut self, io: &mut dyn SyncIo) {
|
||||
if !self.warp_sync.is_enabled() || io.snapshot_service().supported_versions().is_none() {
|
||||
trace!(target: "sync", "Skipping warp sync. Disabled or not supported.");
|
||||
return;
|
||||
@@ -845,7 +845,7 @@ impl ChainSync {
|
||||
}
|
||||
}
|
||||
|
||||
fn start_snapshot_sync(&mut self, io: &mut SyncIo, peers: &[PeerId]) {
|
||||
fn start_snapshot_sync(&mut self, io: &mut dyn SyncIo, peers: &[PeerId]) {
|
||||
if !self.snapshot.have_manifest() {
|
||||
for p in peers {
|
||||
if self.peers.get(p).map_or(false, |p| p.asking == PeerAsking::Nothing) {
|
||||
@@ -861,13 +861,13 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// Restart sync disregarding the block queue status. May end up re-downloading up to QUEUE_SIZE blocks
|
||||
pub fn restart(&mut self, io: &mut SyncIo) {
|
||||
pub fn restart(&mut self, io: &mut dyn SyncIo) {
|
||||
self.update_targets(io.chain());
|
||||
self.reset_and_continue(io);
|
||||
}
|
||||
|
||||
/// Update sync after the blockchain has been changed externally.
|
||||
pub fn update_targets(&mut self, chain: &BlockChainClient) {
|
||||
pub fn update_targets(&mut self, chain: &dyn BlockChainClient) {
|
||||
// Do not assume that the block queue/chain still has our last_imported_block
|
||||
let chain = chain.chain_info();
|
||||
self.new_blocks = BlockDownloader::new(BlockSet::NewBlocks, &chain.best_block_hash, chain.best_block_number);
|
||||
@@ -887,7 +887,7 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// Resume downloading
|
||||
pub fn continue_sync(&mut self, io: &mut SyncIo) {
|
||||
pub fn continue_sync(&mut self, io: &mut dyn SyncIo) {
|
||||
if self.state == SyncState::Waiting {
|
||||
trace!(target: "sync", "Waiting for the block queue");
|
||||
} else if self.state == SyncState::SnapshotWaiting {
|
||||
@@ -928,7 +928,7 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// Called after all blocks have been downloaded
|
||||
fn complete_sync(&mut self, io: &mut SyncIo) {
|
||||
fn complete_sync(&mut self, io: &mut dyn SyncIo) {
|
||||
trace!(target: "sync", "Sync complete");
|
||||
self.reset(io, Some(SyncState::Idle));
|
||||
}
|
||||
@@ -940,7 +940,7 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// Find something to do for a peer. Called for a new peer or when a peer is done with its task.
|
||||
fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) {
|
||||
fn sync_peer(&mut self, io: &mut dyn SyncIo, peer_id: PeerId, force: bool) {
|
||||
if !self.active_peers.contains(&peer_id) {
|
||||
trace!(target: "sync", "Skipping deactivated peer {}", peer_id);
|
||||
return;
|
||||
@@ -1081,7 +1081,7 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import.
|
||||
fn collect_blocks(&mut self, io: &mut SyncIo, block_set: BlockSet) {
|
||||
fn collect_blocks(&mut self, io: &mut dyn SyncIo, block_set: BlockSet) {
|
||||
match block_set {
|
||||
BlockSet::NewBlocks => {
|
||||
if self.new_blocks.collect_blocks(io, self.state == SyncState::NewBlocks) == DownloadAction::Reset {
|
||||
@@ -1138,7 +1138,7 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// Send Status message
|
||||
fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), network::Error> {
|
||||
fn send_status(&mut self, io: &mut dyn SyncIo, peer: PeerId) -> Result<(), network::Error> {
|
||||
let warp_protocol_version = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer);
|
||||
let warp_protocol = warp_protocol_version != 0;
|
||||
let private_tx_protocol = warp_protocol_version >= PAR_PROTOCOL_VERSION_3.0;
|
||||
@@ -1166,7 +1166,7 @@ impl ChainSync {
|
||||
io.respond(StatusPacket.id(), packet.out())
|
||||
}
|
||||
|
||||
pub fn maintain_peers(&mut self, io: &mut SyncIo) {
|
||||
pub fn maintain_peers(&mut self, io: &mut dyn SyncIo) {
|
||||
let tick = Instant::now();
|
||||
let mut aborting = Vec::new();
|
||||
for (peer_id, peer) in &self.peers {
|
||||
@@ -1200,7 +1200,7 @@ impl ChainSync {
|
||||
}
|
||||
}
|
||||
|
||||
fn check_resume(&mut self, io: &mut SyncIo) {
|
||||
fn check_resume(&mut self, io: &mut dyn SyncIo) {
|
||||
match self.state {
|
||||
SyncState::Waiting if !io.chain().queue_info().is_full() => {
|
||||
self.set_state(SyncState::Blocks);
|
||||
@@ -1286,13 +1286,13 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// Maintain other peers. Send out any new blocks and transactions
|
||||
pub fn maintain_sync(&mut self, io: &mut SyncIo) {
|
||||
pub fn maintain_sync(&mut self, io: &mut dyn SyncIo) {
|
||||
self.maybe_start_snapshot_sync(io);
|
||||
self.check_resume(io);
|
||||
}
|
||||
|
||||
/// called when block is imported to chain - propagates the blocks and updates transactions sent to peers
|
||||
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], enacted: &[H256], _retracted: &[H256], sealed: &[H256], proposed: &[Bytes]) {
|
||||
pub fn chain_new_blocks(&mut self, io: &mut dyn SyncIo, _imported: &[H256], invalid: &[H256], enacted: &[H256], _retracted: &[H256], sealed: &[H256], proposed: &[Bytes]) {
|
||||
let queue_info = io.chain().queue_info();
|
||||
let is_syncing = self.status().is_syncing(queue_info);
|
||||
|
||||
@@ -1318,22 +1318,22 @@ impl ChainSync {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
|
||||
pub fn on_packet(&mut self, io: &mut dyn SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
|
||||
SyncHandler::on_packet(self, io, peer, packet_id, data);
|
||||
}
|
||||
|
||||
/// Called by peer when it is disconnecting
|
||||
pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) {
|
||||
pub fn on_peer_aborting(&mut self, io: &mut dyn SyncIo, peer: PeerId) {
|
||||
SyncHandler::on_peer_aborting(self, io, peer);
|
||||
}
|
||||
|
||||
/// Called when a new peer is connected
|
||||
pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) {
|
||||
pub fn on_peer_connected(&mut self, io: &mut dyn SyncIo, peer: PeerId) {
|
||||
SyncHandler::on_peer_connected(self, io, peer);
|
||||
}
|
||||
|
||||
/// propagates new transactions to all peers
|
||||
pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) {
|
||||
pub fn propagate_new_transactions(&mut self, io: &mut dyn SyncIo) {
|
||||
let deadline = Instant::now() + Duration::from_millis(500);
|
||||
SyncPropagator::propagate_new_transactions(self, io, || {
|
||||
if deadline > Instant::now() {
|
||||
@@ -1346,12 +1346,12 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// Broadcast consensus message to peers.
|
||||
pub fn propagate_consensus_packet(&mut self, io: &mut SyncIo, packet: Bytes) {
|
||||
pub fn propagate_consensus_packet(&mut self, io: &mut dyn SyncIo, packet: Bytes) {
|
||||
SyncPropagator::propagate_consensus_packet(self, io, packet);
|
||||
}
|
||||
|
||||
/// Broadcast private transaction message to peers.
|
||||
pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet_id: SyncPacket, packet: Bytes) {
|
||||
pub fn propagate_private_transaction(&mut self, io: &mut dyn SyncIo, transaction_hash: H256, packet_id: SyncPacket, packet: Bytes) {
|
||||
SyncPropagator::propagate_private_transaction(self, io, transaction_hash, packet_id, packet);
|
||||
}
|
||||
}
|
||||
@@ -1455,7 +1455,7 @@ pub mod tests {
|
||||
assert!(!sync_status(SyncState::Idle).is_syncing(queue_info(0, 0)));
|
||||
}
|
||||
|
||||
pub fn dummy_sync_with_peer(peer_latest_hash: H256, client: &BlockChainClient) -> ChainSync {
|
||||
pub fn dummy_sync_with_peer(peer_latest_hash: H256, client: &dyn BlockChainClient) -> ChainSync {
|
||||
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), client, None,);
|
||||
insert_dummy_peer(&mut sync, 0, peer_latest_hash);
|
||||
|
||||
@@ -51,10 +51,10 @@ pub struct SyncPropagator;
|
||||
|
||||
impl SyncPropagator {
|
||||
/// propagates latest block to a set of peers
|
||||
pub fn propagate_blocks(sync: &mut ChainSync, chain_info: &BlockChainInfo, io: &mut SyncIo, blocks: &[H256], peers: &[PeerId]) -> usize {
|
||||
pub fn propagate_blocks(sync: &mut ChainSync, chain_info: &BlockChainInfo, io: &mut dyn SyncIo, blocks: &[H256], peers: &[PeerId]) -> usize {
|
||||
trace!(target: "sync", "Sending NewBlocks to {:?}", peers);
|
||||
let sent = peers.len();
|
||||
let mut send_packet = |io: &mut SyncIo, rlp: Bytes| {
|
||||
let mut send_packet = |io: &mut dyn SyncIo, rlp: Bytes| {
|
||||
for peer_id in peers {
|
||||
SyncPropagator::send_packet(io, *peer_id, NewBlockPacket, rlp.clone());
|
||||
|
||||
@@ -78,7 +78,7 @@ impl SyncPropagator {
|
||||
}
|
||||
|
||||
/// propagates new known hashes to all peers
|
||||
pub fn propagate_new_hashes(sync: &mut ChainSync, chain_info: &BlockChainInfo, io: &mut SyncIo, peers: &[PeerId]) -> usize {
|
||||
pub fn propagate_new_hashes(sync: &mut ChainSync, chain_info: &BlockChainInfo, io: &mut dyn SyncIo, peers: &[PeerId]) -> usize {
|
||||
trace!(target: "sync", "Sending NewHashes to {:?}", peers);
|
||||
let last_parent = *io.chain().best_block_header().parent_hash();
|
||||
let best_block_hash = chain_info.best_block_hash;
|
||||
@@ -98,7 +98,7 @@ impl SyncPropagator {
|
||||
}
|
||||
|
||||
/// propagates new transactions to all peers
|
||||
pub fn propagate_new_transactions<F: FnMut() -> bool>(sync: &mut ChainSync, io: &mut SyncIo, mut should_continue: F) -> usize {
|
||||
pub fn propagate_new_transactions<F: FnMut() -> bool>(sync: &mut ChainSync, io: &mut dyn SyncIo, mut should_continue: F) -> usize {
|
||||
// Early out if nobody to send to.
|
||||
if sync.peers.is_empty() {
|
||||
return 0;
|
||||
@@ -141,7 +141,7 @@ impl SyncPropagator {
|
||||
|
||||
fn propagate_transactions_to_peers<F: FnMut() -> bool>(
|
||||
sync: &mut ChainSync,
|
||||
io: &mut SyncIo,
|
||||
io: &mut dyn SyncIo,
|
||||
peers: Vec<PeerId>,
|
||||
transactions: Vec<&SignedTransaction>,
|
||||
mut should_continue: F,
|
||||
@@ -158,7 +158,7 @@ impl SyncPropagator {
|
||||
// Clear old transactions from stats
|
||||
sync.transactions_stats.retain(&all_transactions_hashes);
|
||||
|
||||
let send_packet = |io: &mut SyncIo, peer_id: PeerId, sent: usize, rlp: Bytes| {
|
||||
let send_packet = |io: &mut dyn SyncIo, peer_id: PeerId, sent: usize, rlp: Bytes| {
|
||||
let size = rlp.len();
|
||||
SyncPropagator::send_packet(io, peer_id, TransactionsPacket, rlp);
|
||||
trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size);
|
||||
@@ -249,7 +249,7 @@ impl SyncPropagator {
|
||||
sent_to_peers
|
||||
}
|
||||
|
||||
pub fn propagate_latest_blocks(sync: &mut ChainSync, io: &mut SyncIo, sealed: &[H256]) {
|
||||
pub fn propagate_latest_blocks(sync: &mut ChainSync, io: &mut dyn SyncIo, sealed: &[H256]) {
|
||||
let chain_info = io.chain().chain_info();
|
||||
if (((chain_info.best_block_number as i64) - (sync.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
|
||||
let peers = sync.get_lagging_peers(&chain_info);
|
||||
@@ -270,7 +270,7 @@ impl SyncPropagator {
|
||||
}
|
||||
|
||||
/// Distribute valid proposed blocks to subset of current peers.
|
||||
pub fn propagate_proposed_blocks(sync: &mut ChainSync, io: &mut SyncIo, proposed: &[Bytes]) {
|
||||
pub fn propagate_proposed_blocks(sync: &mut ChainSync, io: &mut dyn SyncIo, proposed: &[Bytes]) {
|
||||
let peers = sync.get_consensus_peers();
|
||||
trace!(target: "sync", "Sending proposed blocks to {:?}", peers);
|
||||
for block in proposed {
|
||||
@@ -285,7 +285,7 @@ impl SyncPropagator {
|
||||
}
|
||||
|
||||
/// Broadcast consensus message to peers.
|
||||
pub fn propagate_consensus_packet(sync: &mut ChainSync, io: &mut SyncIo, packet: Bytes) {
|
||||
pub fn propagate_consensus_packet(sync: &mut ChainSync, io: &mut dyn SyncIo, packet: Bytes) {
|
||||
let lucky_peers = ChainSync::select_random_peers(&sync.get_consensus_peers());
|
||||
trace!(target: "sync", "Sending consensus packet to {:?}", lucky_peers);
|
||||
for peer_id in lucky_peers {
|
||||
@@ -294,7 +294,7 @@ impl SyncPropagator {
|
||||
}
|
||||
|
||||
/// Broadcast private transaction message to peers.
|
||||
pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: SyncPacket, packet: Bytes) {
|
||||
pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut dyn SyncIo, transaction_hash: H256, packet_id: SyncPacket, packet: Bytes) {
|
||||
let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash));
|
||||
if lucky_peers.is_empty() {
|
||||
error!(target: "privatetx", "Cannot propagate the packet, no peers with private tx enabled connected");
|
||||
@@ -325,7 +325,7 @@ impl SyncPropagator {
|
||||
}
|
||||
|
||||
/// Generic packet sender
|
||||
pub fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: SyncPacket, packet: Bytes) {
|
||||
pub fn send_packet(sync: &mut dyn SyncIo, peer_id: PeerId, packet_id: SyncPacket, packet: Bytes) {
|
||||
if let Err(e) = sync.send(peer_id, packet_id, packet) {
|
||||
debug!(target:"sync", "Error sending packet: {:?}", e);
|
||||
sync.disconnect_peer(peer_id);
|
||||
|
||||
@@ -43,7 +43,7 @@ pub struct SyncRequester;
|
||||
|
||||
impl SyncRequester {
|
||||
/// Perform block download request`
|
||||
pub fn request_blocks(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, request: BlockRequest, block_set: BlockSet) {
|
||||
pub fn request_blocks(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, request: BlockRequest, block_set: BlockSet) {
|
||||
match request {
|
||||
BlockRequest::Headers { start, count, skip } => {
|
||||
SyncRequester::request_headers_by_hash(sync, io, peer_id, &start, count, skip, false, block_set);
|
||||
@@ -58,7 +58,7 @@ impl SyncRequester {
|
||||
}
|
||||
|
||||
/// Request block bodies from a peer
|
||||
fn request_bodies(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, hashes: Vec<H256>, set: BlockSet) {
|
||||
fn request_bodies(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, hashes: Vec<H256>, set: BlockSet) {
|
||||
let mut rlp = RlpStream::new_list(hashes.len());
|
||||
trace!(target: "sync", "{} <- GetBlockBodies: {} entries starting from {:?}, set = {:?}", peer_id, hashes.len(), hashes.first(), set);
|
||||
for h in &hashes {
|
||||
@@ -71,7 +71,7 @@ impl SyncRequester {
|
||||
}
|
||||
|
||||
/// Request headers from a peer by block number
|
||||
pub fn request_fork_header(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, n: BlockNumber) {
|
||||
pub fn request_fork_header(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, n: BlockNumber) {
|
||||
trace!(target: "sync", "{} <- GetForkHeader: at {}", peer_id, n);
|
||||
let mut rlp = RlpStream::new_list(4);
|
||||
rlp.append(&n);
|
||||
@@ -82,7 +82,7 @@ impl SyncRequester {
|
||||
}
|
||||
|
||||
/// Find some headers or blocks to download for a peer.
|
||||
pub fn request_snapshot_data(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
|
||||
pub fn request_snapshot_data(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId) {
|
||||
// find chunk data to download
|
||||
if let Some(hash) = sync.snapshot.needed_chunk() {
|
||||
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
|
||||
@@ -93,14 +93,14 @@ impl SyncRequester {
|
||||
}
|
||||
|
||||
/// Request snapshot manifest from a peer.
|
||||
pub fn request_snapshot_manifest(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
|
||||
pub fn request_snapshot_manifest(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId) {
|
||||
trace!(target: "sync", "{} <- GetSnapshotManifest", peer_id);
|
||||
let rlp = RlpStream::new_list(0);
|
||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, GetSnapshotManifestPacket, rlp.out());
|
||||
}
|
||||
|
||||
/// Request headers from a peer by block hash
|
||||
fn request_headers_by_hash(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, h: &H256, count: u64, skip: u64, reverse: bool, set: BlockSet) {
|
||||
fn request_headers_by_hash(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, h: &H256, count: u64, skip: u64, reverse: bool, set: BlockSet) {
|
||||
trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}, set = {:?}", peer_id, count, h, set);
|
||||
let mut rlp = RlpStream::new_list(4);
|
||||
rlp.append(h);
|
||||
@@ -114,7 +114,7 @@ impl SyncRequester {
|
||||
}
|
||||
|
||||
/// Request block receipts from a peer
|
||||
fn request_receipts(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, hashes: Vec<H256>, set: BlockSet) {
|
||||
fn request_receipts(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, hashes: Vec<H256>, set: BlockSet) {
|
||||
let mut rlp = RlpStream::new_list(hashes.len());
|
||||
trace!(target: "sync", "{} <- GetBlockReceipts: {} entries starting from {:?}, set = {:?}", peer_id, hashes.len(), hashes.first(), set);
|
||||
for h in &hashes {
|
||||
@@ -127,7 +127,7 @@ impl SyncRequester {
|
||||
}
|
||||
|
||||
/// Request snapshot chunk from a peer.
|
||||
fn request_snapshot_chunk(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, chunk: &H256) {
|
||||
fn request_snapshot_chunk(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, chunk: &H256) {
|
||||
trace!(target: "sync", "{} <- GetSnapshotData {:?}", peer_id, chunk);
|
||||
let mut rlp = RlpStream::new_list(1);
|
||||
rlp.append(chunk);
|
||||
@@ -135,7 +135,7 @@ impl SyncRequester {
|
||||
}
|
||||
|
||||
/// Generic request sender
|
||||
fn send_request(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: SyncPacket, packet: Bytes) {
|
||||
fn send_request(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: SyncPacket, packet: Bytes) {
|
||||
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
|
||||
if peer.asking != PeerAsking::Nothing {
|
||||
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
|
||||
|
||||
@@ -63,7 +63,7 @@ impl SyncSupplier {
|
||||
/// Dispatch incoming requests and responses
|
||||
// Take a u8 and not a SyncPacketId because this is the entry point
|
||||
// to chain sync from the outside world.
|
||||
pub fn dispatch_packet(sync: &RwLock<ChainSync>, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
|
||||
pub fn dispatch_packet(sync: &RwLock<ChainSync>, io: &mut dyn SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
|
||||
let rlp = Rlp::new(data);
|
||||
|
||||
if let Some(id) = SyncPacket::from_u8(packet_id) {
|
||||
@@ -141,7 +141,7 @@ impl SyncSupplier {
|
||||
}
|
||||
|
||||
/// Respond to GetBlockHeaders request
|
||||
fn return_block_headers(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
fn return_block_headers(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let payload_soft_limit = io.payload_soft_limit();
|
||||
// Packet layout:
|
||||
// [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ]
|
||||
@@ -222,7 +222,7 @@ impl SyncSupplier {
|
||||
}
|
||||
|
||||
/// Respond to GetBlockBodies request
|
||||
fn return_block_bodies(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
fn return_block_bodies(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let payload_soft_limit = io.payload_soft_limit();
|
||||
let mut count = r.item_count().unwrap_or(0);
|
||||
if count == 0 {
|
||||
@@ -249,7 +249,7 @@ impl SyncSupplier {
|
||||
}
|
||||
|
||||
/// Respond to GetNodeData request
|
||||
fn return_node_data(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
fn return_node_data(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let payload_soft_limit = io.payload_soft_limit();
|
||||
let mut count = r.item_count().unwrap_or(0);
|
||||
trace!(target: "sync", "{} -> GetNodeData: {} entries", peer_id, count);
|
||||
@@ -280,7 +280,7 @@ impl SyncSupplier {
|
||||
Ok(Some((NodeDataPacket.id(), rlp)))
|
||||
}
|
||||
|
||||
fn return_receipts(io: &SyncIo, rlp: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
fn return_receipts(io: &dyn SyncIo, rlp: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let payload_soft_limit = io.payload_soft_limit();
|
||||
let mut count = rlp.item_count().unwrap_or(0);
|
||||
trace!(target: "sync", "{} -> GetReceipts: {} entries", peer_id, count);
|
||||
@@ -307,7 +307,7 @@ impl SyncSupplier {
|
||||
}
|
||||
|
||||
/// Respond to GetSnapshotManifest request
|
||||
fn return_snapshot_manifest(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
fn return_snapshot_manifest(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let count = r.item_count().unwrap_or(0);
|
||||
trace!(target: "warp", "{} -> GetSnapshotManifest", peer_id);
|
||||
if count != 0 {
|
||||
@@ -330,7 +330,7 @@ impl SyncSupplier {
|
||||
}
|
||||
|
||||
/// Respond to GetSnapshotData request
|
||||
fn return_snapshot_data(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
fn return_snapshot_data(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let hash: H256 = r.val_at(0)?;
|
||||
trace!(target: "warp", "{} -> GetSnapshotData {:?}", peer_id, hash);
|
||||
let rlp = match io.snapshot_service().chunk(hash) {
|
||||
@@ -348,8 +348,8 @@ impl SyncSupplier {
|
||||
Ok(Some((SnapshotDataPacket.id(), rlp)))
|
||||
}
|
||||
|
||||
fn return_rlp<FRlp, FError>(io: &mut SyncIo, rlp: &Rlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
|
||||
where FRlp : Fn(&SyncIo, &Rlp, PeerId) -> RlpResponseResult,
|
||||
fn return_rlp<FRlp, FError>(io: &mut dyn SyncIo, rlp: &Rlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
|
||||
where FRlp : Fn(&dyn SyncIo, &Rlp, PeerId) -> RlpResponseResult,
|
||||
FError : FnOnce(network::Error) -> String
|
||||
{
|
||||
let response = rlp_func(io, rlp, peer);
|
||||
@@ -405,7 +405,7 @@ mod test {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, EachBlockWith::Nothing);
|
||||
let blocks: Vec<_> = (0 .. 100)
|
||||
.map(|i| (&client as &BlockChainClient).block(BlockId::Number(i as BlockNumber)).map(|b| b.into_inner()).unwrap()).collect();
|
||||
.map(|i| (&client as &dyn BlockChainClient).block(BlockId::Number(i as BlockNumber)).map(|b| b.into_inner()).unwrap()).collect();
|
||||
let headers: Vec<_> = blocks.iter().map(|b| SyncHeader::from_rlp(Rlp::new(b).at(0).unwrap().as_raw().to_vec()).unwrap()).collect();
|
||||
let hashes: Vec<_> = headers.iter().map(|h| h.header.hash()).collect();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user