diff --git a/Cargo.lock b/Cargo.lock index 21a03b3aa..5a25ec4e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -574,6 +574,16 @@ dependencies = [ "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "derive_more" +version = "0.99.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "difference" version = "1.0.0" @@ -1160,6 +1170,7 @@ name = "ethcore-sync" version = "1.12.0" dependencies = [ "common-types 0.1.0", + "derive_more 0.99.9 (registry+https://github.com/rust-lang/crates.io-index)", "enum_primitive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore 1.12.0", @@ -4712,6 +4723,7 @@ dependencies = [ "checksum ct-logs 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1b4660f8b07a560a88c02d76286edb9f0d5d64e495d2b0f233186155aa51be1f" "checksum ctr 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "022cd691704491df67d25d006fe8eca083098253c4d43516c2206479c58c6736" "checksum ctrlc 1.1.1 (git+https://github.com/paritytech/rust-ctrlc.git)" = "" +"checksum derive_more 0.99.9 (registry+https://github.com/rust-lang/crates.io-index)" = "298998b1cf6b5b2c8a7b023dfd45821825ce3ba8a8af55c921a0e734e4653f76" "checksum difference 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b3304d19798a8e067e48d8e69b2c37f0b5e9b4e462504ad9e27e9f3fce02bba8" "checksum digest 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)" = "03b072242a8cbaf9c145665af9d250c59af3b958f83ed6824e13533cf76d5b90" "checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index a2a0d42af..98f833c25 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -2051,6 +2051,13 @@ impl BlockChainClient for Client { } } + fn is_processing_fork(&self) -> bool { + let chain = self.chain.read(); + self.importer + .block_queue + .is_processing_fork(&chain.best_block_hash(), &chain) + } + fn block_total_difficulty(&self, id: BlockId) -> Option { let chain = self.chain.read(); diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index 9acd6c182..0059dc3ce 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -887,6 +887,10 @@ impl BlockChainClient for TestBlockChainClient { } } + fn is_processing_fork(&self) -> bool { + false + } + // works only if blocks are one after another 1 -> 2 -> 3 fn tree_route(&self, from: &H256, to: &H256) -> Option { Some(TreeRoute { diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index faa23396b..6bde41acc 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -427,6 +427,9 @@ pub trait BlockChainClient: /// Get the address of the registry itself. fn registrar_address(&self) -> Option
; + + /// Returns true, if underlying import queue is processing possible fork at the moment + fn is_processing_fork(&self) -> bool; } /// Provides `reopen_block` method diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index 35b50b112..e15063924 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -17,6 +17,7 @@ //! A queue of blocks. Sits between network or other I/O and the `BlockChain`. //! Sorts them ready for blockchain insertion. +use blockchain::BlockChain; use client::ClientIoMessage; use engines::EthEngine; use error::{BlockError, Error, ErrorKind, ImportErrorKind}; @@ -43,6 +44,9 @@ pub mod kind; const MIN_MEM_LIMIT: usize = 16384; const MIN_QUEUE_LIMIT: usize = 512; +/// Empiric estimation of the minimal length of the processing queue, +/// That definitely doesn't contain forks inside. +const MAX_QUEUE_WITH_FORK: usize = 8; /// Type alias for block queue convenience. pub type BlockQueue = VerificationQueue; @@ -148,7 +152,7 @@ pub struct VerificationQueue { deleting: Arc, ready_signal: Arc, empty: Arc, - processing: RwLock>, // hash to difficulty + processing: RwLock>, // item's hash to difficulty and parent item hash ticks_since_adjustment: AtomicUsize, max_queue_size: usize, max_mem_use: usize, @@ -540,7 +544,7 @@ impl VerificationQueue { if self .processing .write() - .insert(hash, item.difficulty()) + .insert(hash, (item.difficulty(), item.parent_hash())) .is_some() { bail!(( @@ -553,6 +557,7 @@ impl VerificationQueue { .unverified .fetch_add(item.heap_size_of_children(), AtomicOrdering::SeqCst); + //self.processing.write().insert(hash, item.difficulty()); { let mut td = self.total_difficulty.write(); *td = *td + item.difficulty(); @@ -597,7 +602,7 @@ impl VerificationQueue { bad.reserve(hashes.len()); for hash in hashes { bad.insert(hash.clone()); - if let Some(difficulty) = processing.remove(hash) { + if let Some((difficulty, _)) = processing.remove(hash) { let mut td = self.total_difficulty.write(); *td = *td - difficulty; } @@ -609,7 +614,7 @@ impl VerificationQueue { if bad.contains(&output.parent_hash()) { removed_size += output.heap_size_of_children(); bad.insert(output.hash()); - if let Some(difficulty) = processing.remove(&output.hash()) { + if let Some((difficulty, _)) = processing.remove(&output.hash()) { let mut td = self.total_difficulty.write(); *td = *td - difficulty; } @@ -633,7 +638,7 @@ impl VerificationQueue { } let mut processing = self.processing.write(); for hash in hashes { - if let Some(difficulty) = processing.remove(hash) { + if let Some((difficulty, _)) = processing.remove(hash) { let mut td = self.total_difficulty.write(); *td = *td - difficulty; } @@ -670,6 +675,24 @@ impl VerificationQueue { v.unverified.load_len() == 0 && v.verifying.load_len() == 0 && v.verified.load_len() == 0 } + /// Returns true if there are descendants of the current best block in the processing queue + pub fn is_processing_fork(&self, best_block_hash: &H256, chain: &BlockChain) -> bool { + let processing = self.processing.read(); + if processing.is_empty() || processing.len() > MAX_QUEUE_WITH_FORK { + // Assume, that long enough processing queue doesn't have fork blocks + return false; + } + for (_, item_parent_hash) in processing.values() { + if chain + .tree_route(*best_block_hash, *item_parent_hash) + .map_or(true, |route| route.ancestor != *best_block_hash) + { + return true; + } + } + false + } + /// Get queue status. pub fn queue_info(&self) -> QueueInfo { use std::mem::size_of; diff --git a/ethcore/sync/Cargo.toml b/ethcore/sync/Cargo.toml index 09d5fa042..c75718488 100644 --- a/ethcore/sync/Cargo.toml +++ b/ethcore/sync/Cargo.toml @@ -10,6 +10,7 @@ authors = ["Parity Technologies "] [dependencies] common-types = { path = "../types" } enum_primitive = "0.1.1" +derive_more = "0.99" ethcore = { path = ".." } ethcore-io = { path = "../../util/io" } ethcore-network = { path = "../../util/network" } diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 0b6000b96..782a673c8 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -316,6 +316,7 @@ const MAINTAIN_SYNC_TIMER: TimerToken = 1; const CONTINUE_SYNC_TIMER: TimerToken = 2; const TX_TIMER: TimerToken = 3; const PRIORITY_TIMER: TimerToken = 4; +const DELAYED_PROCESSING_TIMER: TimerToken = 5; pub(crate) const PRIORITY_TIMER_INTERVAL: Duration = Duration::from_millis(250); @@ -341,6 +342,8 @@ impl NetworkProtocolHandler for SyncProtocolHandler { .expect("Error registering sync timer"); io.register_timer(TX_TIMER, Duration::from_millis(1300)) .expect("Error registering transactions timer"); + io.register_timer(DELAYED_PROCESSING_TIMER, Duration::from_millis(2100)) + .expect("Error registering delayed processing timer"); io.register_timer(PRIORITY_TIMER, PRIORITY_TIMER_INTERVAL) .expect("Error registering peers timer"); @@ -388,6 +391,7 @@ impl NetworkProtocolHandler for SyncProtocolHandler { CONTINUE_SYNC_TIMER => self.sync.write().continue_sync(&mut io), TX_TIMER => self.sync.write().propagate_new_transactions(&mut io), PRIORITY_TIMER => self.sync.process_priority_queue(&mut io), + DELAYED_PROCESSING_TIMER => self.sync.process_delayed_requests(&mut io), _ => warn!("Unknown timer {} triggered.", timer), } } diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index f55157a1f..76e096b82 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -223,6 +223,18 @@ impl BlockDownloader { self.blocks.heap_size() + self.round_parents.heap_size_of_children() } + fn reset_to_block(&mut self, start_hash: &H256, start_number: BlockNumber) { + self.reset(); + self.last_imported_block = start_number; + self.last_imported_hash = start_hash.clone(); + self.last_round_start = start_number; + self.last_round_start_hash = start_hash.clone(); + self.imported_this_round = None; + self.round_parents = VecDeque::new(); + self.target_hash = None; + self.retract_step = 1; + } + /// Returns best imported block number. pub fn last_imported_block_number(&self) -> BlockNumber { self.last_imported_block @@ -491,6 +503,7 @@ impl BlockDownloader { ); } else { let best = io.chain().chain_info().best_block_number; + let best_hash = io.chain().chain_info().best_block_hash; let oldest_reorg = io.chain().pruning_info().earliest_state; if self.block_set == BlockSet::NewBlocks && best > start && start < oldest_reorg { @@ -500,29 +513,34 @@ impl BlockDownloader { start, start_hash ); - self.reset(); + self.reset_to_block(&best_hash, best); } else { let n = start - cmp::min(self.retract_step, start); - self.retract_step *= 2; - match io.chain().block_hash(BlockId::Number(n)) { - Some(h) => { - self.last_imported_block = n; - self.last_imported_hash = h; - trace_sync!( - self, - "Searching common header in the blockchain {} ({})", - start, - self.last_imported_hash - ); - } - None => { - debug_sync!( - self, - "Could not revert to previous block, last: {} ({})", - start, - self.last_imported_hash - ); - self.reset(); + if n == 0 { + debug_sync!(self, "Header not found, bottom line reached, resetting, last imported: {}", self.last_imported_hash); + self.reset_to_block(&best_hash, best); + } else { + self.retract_step *= 2; + match io.chain().block_hash(BlockId::Number(n)) { + Some(h) => { + self.last_imported_block = n; + self.last_imported_hash = h; + trace_sync!( + self, + "Searching common header in the blockchain {} ({})", + start, + self.last_imported_hash + ); + } + None => { + debug_sync!( + self, + "Could not revert to previous block, last: {} ({})", + start, + self.last_imported_hash + ); + self.reset_to_block(&best_hash, best); + } } } } diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 6cbdedac1..87ff9ba9c 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -41,7 +41,7 @@ use super::sync_packet::{ }; use super::{ - BlockSet, ChainSync, ForkConfirmation, PacketDecodeError, PeerAsking, PeerInfo, SyncRequester, + BlockSet, ChainSync, ForkConfirmation, PacketProcessError, PeerAsking, PeerInfo, SyncRequester, SyncState, ETH_PROTOCOL_VERSION_62, ETH_PROTOCOL_VERSION_63, MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, }; @@ -108,6 +108,8 @@ impl SyncHandler { debug!(target: "sync", "Disconnected {}", peer_id); sync.clear_peer_download(peer_id); sync.peers.remove(&peer_id); + sync.delayed_requests + .retain(|(request_peer_id, _, _)| *request_peer_id != peer_id); sync.active_peers.remove(&peer_id); if sync.state == SyncState::SnapshotManifest { @@ -153,12 +155,6 @@ impl SyncHandler { trace!(target: "sync", "Ignoring new block from unconfirmed peer {}", peer_id); return Ok(()); } - let difficulty: U256 = r.val_at(1)?; - if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { - if peer.difficulty.map_or(true, |pd| difficulty > pd) { - peer.difficulty = Some(difficulty); - } - } let block = Unverified::from_rlp(r.at(0)?.as_raw().to_vec())?; let hash = block.header.hash(); let number = block.header.number(); @@ -166,10 +162,23 @@ impl SyncHandler { if number > sync.highest_block.unwrap_or(0) { sync.highest_block = Some(number); } + let parent_hash = block.header.parent_hash(); + let difficulty: U256 = r.val_at(1)?; + // Most probably the sent block is being imported by peer right now + // Use td and hash, that peer must have for now + let parent_td = difficulty.checked_sub(*block.header.difficulty()); + if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { + if peer + .difficulty + .map_or(true, |pd| parent_td.map_or(false, |td| td > pd)) + { + peer.difficulty = parent_td; + } + } let mut unknown = false; if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { - peer.latest_hash = hash; + peer.latest_hash = *parent_hash; } let last_imported_number = sync.new_blocks.last_imported_block_number(); @@ -755,7 +764,7 @@ impl SyncHandler { io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp, - ) -> Result<(), PacketDecodeError> { + ) -> Result<(), PacketProcessError> { // Accept transactions only when fully synced if !io.is_chain_queue_empty() || (sync.state != SyncState::Idle && sync.state != SyncState::NewBlocks) diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index ccea28817..99b237bae 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -97,6 +97,7 @@ use super::{SyncConfig, WarpSync}; use api::{EthProtocolInfo as PeerInfoDigest, PriorityTask, PAR_PROTOCOL}; use block_sync::{BlockDownloader, DownloadAction}; use bytes::Bytes; +use derive_more::Display; use ethcore::{ client::{BlockChainClient, BlockChainInfo, BlockId, BlockQueueInfo, BlockStatus}, snapshot::RestorationStatus, @@ -105,7 +106,7 @@ use ethereum_types::{H256, U256}; use fastmap::{H256FastMap, H256FastSet}; use hash::keccak; use heapsize::HeapSizeOf; -use network::{self, client_version::ClientVersion, PacketId, PeerId}; +use network::{self, client_version::ClientVersion, PeerId}; use parking_lot::{Mutex, RwLock, RwLockWriteGuard}; use rand::Rng; use rlp::{DecoderError, RlpStream}; @@ -124,7 +125,7 @@ use self::{ handler::SyncHandler, sync_packet::{ PacketInfo, - SyncPacket::{NewBlockPacket, StatusPacket}, + SyncPacket::{self, NewBlockPacket, StatusPacket}, }, }; @@ -133,7 +134,23 @@ use self::{propagator::SyncPropagator, requester::SyncRequester}; known_heap_size!(0, PeerInfo); -pub type PacketDecodeError = DecoderError; +/// Possible errors during packet's processing +#[derive(Debug, Display)] +pub enum PacketProcessError { + /// Error of RLP decoder + #[display(fmt = "Decoder Error: {}", _0)] + Decoder(DecoderError), + /// Underlying client is busy and cannot process the packet + /// The packet should be postponed for later response + #[display(fmt = "Underlying client is busy")] + ClientBusy, +} + +impl From for PacketProcessError { + fn from(err: DecoderError) -> Self { + PacketProcessError::Decoder(err).into() + } +} /// 63 version of Ethereum protocol. pub const ETH_PROTOCOL_VERSION_63: (u8, u8) = (63, 0x11); @@ -363,7 +380,7 @@ pub mod random { } } -pub type RlpResponseResult = Result, PacketDecodeError>; +pub type RlpResponseResult = Result, PacketProcessError>; pub type Peers = HashMap; /// Thread-safe wrapper for `ChainSync`. @@ -420,6 +437,23 @@ impl ChainSyncApi { SyncSupplier::dispatch_packet(&self.sync, io, peer, packet_id, data) } + /// Process the queue with requests, that were delayed with response. + pub fn process_delayed_requests(&self, io: &mut dyn SyncIo) { + let requests = self.sync.write().retrieve_delayed_requests(); + if !requests.is_empty() { + debug!(target: "sync", "Processing {} delayed requests", requests.len()); + for (peer_id, packet_id, packet_data) in requests { + SyncSupplier::dispatch_delayed_request( + &self.sync, + io, + peer_id, + packet_id, + &packet_data, + ); + } + } + } + /// Process a priority propagation queue. /// This task is run from a timer and should be time constrained. /// Hence we set up a deadline for the execution and cancel the task if the deadline is exceeded. @@ -622,6 +656,10 @@ pub struct ChainSync { /// Connected peers pending Status message. /// Value is request timestamp. handshaking_peers: HashMap, + /// Requests, that can not be processed at the moment + delayed_requests: Vec<(PeerId, u8, Vec)>, + /// Ids of delayed requests, used for lookup, id is composed from peer id and packet id + delayed_requests_ids: HashSet<(PeerId, u8)>, /// Sync start timestamp. Measured when first peer is connected sync_start_time: Option, /// Transactions propagation statistics @@ -646,6 +684,8 @@ impl ChainSync { peers: HashMap::new(), handshaking_peers: HashMap::new(), active_peers: HashSet::new(), + delayed_requests: Vec::new(), + delayed_requests_ids: HashSet::new(), new_blocks: BlockDownloader::new( BlockSet::NewBlocks, &chain_info.best_block_hash, @@ -753,6 +793,22 @@ impl ChainSync { self.active_peers = self.peers.keys().cloned().collect(); } + /// Add a request for later processing + pub fn add_delayed_request(&mut self, peer: PeerId, packet_id: u8, data: &[u8]) { + // Ignore the request, if there is a request already in queue with the same id + if !self.delayed_requests_ids.contains(&(peer, packet_id)) { + self.delayed_requests_ids.insert((peer, packet_id)); + self.delayed_requests.push((peer, packet_id, data.to_vec())); + debug!(target: "sync", "Delayed request with packet id {} from peer {} added", packet_id, peer); + } + } + + /// Drain and return all delayed requests + pub fn retrieve_delayed_requests(&mut self) -> Vec<(PeerId, u8, Vec)> { + self.delayed_requests_ids.clear(); + self.delayed_requests.drain(..).collect() + } + /// Restart sync pub fn reset_and_continue(&mut self, io: &mut dyn SyncIo) { trace!(target: "sync", "Restarting"); diff --git a/ethcore/sync/src/chain/supplier.rs b/ethcore/sync/src/chain/supplier.rs index ed3e777cb..99ab3baba 100644 --- a/ethcore/sync/src/chain/supplier.rs +++ b/ethcore/sync/src/chain/supplier.rs @@ -36,7 +36,7 @@ use super::sync_packet::{ }; use super::{ - ChainSync, PacketDecodeError, RlpResponseResult, SyncHandler, MAX_BODIES_TO_SEND, + ChainSync, PacketProcessError, RlpResponseResult, SyncHandler, MAX_BODIES_TO_SEND, MAX_HEADERS_TO_SEND, MAX_RECEIPTS_HEADERS_TO_SEND, }; @@ -129,14 +129,64 @@ impl SyncSupplier { } }; - result.unwrap_or_else(|e| { - debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e); - }) + match result { + Err(PacketProcessError::Decoder(e)) => { + debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e) + } + Err(PacketProcessError::ClientBusy) => { + sync.write().add_delayed_request(peer, packet_id, data) + } + Ok(()) => {} + } + } + } + + /// Dispatch delayed request + /// The main difference with dispatch packet is the direct send of the responses to the peer + pub fn dispatch_delayed_request( + sync: &RwLock, + io: &mut dyn SyncIo, + peer: PeerId, + packet_id: u8, + data: &[u8], + ) { + let rlp = Rlp::new(data); + + if let Some(id) = SyncPacket::from_u8(packet_id) { + let result = match id { + GetBlockHeadersPacket => SyncSupplier::send_rlp( + io, + &rlp, + peer, + SyncSupplier::return_block_headers, + |e| format!("Error sending block headers: {:?}", e), + ), + + _ => { + debug!(target:"sync", "Unexpected packet {} was dispatched for delayed processing", packet_id); + Ok(()) + } + }; + + match result { + Err(PacketProcessError::Decoder(e)) => { + debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e) + } + Err(PacketProcessError::ClientBusy) => { + sync.write().add_delayed_request(peer, packet_id, data) + } + Ok(()) => {} + } } } /// Respond to GetBlockHeaders request fn return_block_headers(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult { + // Cannot return blocks, if forks processing is in progress, + // The request should be postponed for later processing + if io.chain().is_processing_fork() { + return Err(PacketProcessError::ClientBusy); + } let payload_soft_limit = io.payload_soft_limit(); // Packet layout: // [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ] @@ -161,11 +211,11 @@ impl SyncSupplier { trace!(target:"sync", "Returning single header: {:?}", hash); let mut rlp = RlpStream::new_list(1); rlp.append_raw(&hdr.into_inner(), 1); - return Ok(Some((BlockHeadersPacket.id(), rlp))); + return Ok(Some((BlockHeadersPacket, rlp))); } number } - None => return Ok(Some((BlockHeadersPacket.id(), RlpStream::new_list(0)))), //no such header, return nothing + None => return Ok(Some((BlockHeadersPacket, RlpStream::new_list(0)))), //no such header, return nothing } } else { let number = r.val_at::(0)?; @@ -215,7 +265,7 @@ impl SyncSupplier { let mut rlp = RlpStream::new_list(count as usize); rlp.append_raw(&data, count as usize); trace!(target: "sync", "{} -> GetBlockHeaders: returned {} entries", peer_id, count); - Ok(Some((BlockHeadersPacket.id(), rlp))) + Ok(Some((BlockHeadersPacket, rlp))) } /// Respond to GetBlockBodies request @@ -242,7 +292,7 @@ impl SyncSupplier { let mut rlp = RlpStream::new_list(added); rlp.append_raw(&data, added); trace!(target: "sync", "{} -> GetBlockBodies: returned {} entries", peer_id, added); - Ok(Some((BlockBodiesPacket.id(), rlp))) + Ok(Some((BlockBodiesPacket, rlp))) } fn return_receipts(io: &dyn SyncIo, rlp: &Rlp, peer_id: PeerId) -> RlpResponseResult { @@ -270,7 +320,7 @@ impl SyncSupplier { } let mut rlp_result = RlpStream::new_list(added_headers); rlp_result.append_raw(&data, added_headers); - Ok(Some((ReceiptsPacket.id(), rlp_result))) + Ok(Some((ReceiptsPacket, rlp_result))) } /// Respond to GetSnapshotManifest request @@ -293,7 +343,7 @@ impl SyncSupplier { RlpStream::new_list(0) } }; - Ok(Some((SnapshotManifestPacket.id(), rlp))) + Ok(Some((SnapshotManifestPacket, rlp))) } /// Respond to GetSnapshotData request @@ -312,7 +362,7 @@ impl SyncSupplier { RlpStream::new_list(0) } }; - Ok(Some((SnapshotDataPacket.id(), rlp))) + Ok(Some((SnapshotDataPacket, rlp))) } fn return_rlp( @@ -321,7 +371,26 @@ impl SyncSupplier { peer: PeerId, rlp_func: FRlp, error_func: FError, - ) -> Result<(), PacketDecodeError> + ) -> Result<(), PacketProcessError> + where + FRlp: Fn(&dyn SyncIo, &Rlp, PeerId) -> RlpResponseResult, + FError: FnOnce(network::Error) -> String, + { + let response = rlp_func(io, rlp, peer); + if let Some((packet_id, rlp_stream)) = response? { + io.respond(packet_id.id(), rlp_stream.out()) + .unwrap_or_else(|e| debug!(target: "sync", "{:?}", error_func(e))); + } + Ok(()) + } + + fn send_rlp( + io: &mut dyn SyncIo, + rlp: &Rlp, + peer: PeerId, + rlp_func: FRlp, + error_func: FError, + ) -> Result<(), PacketProcessError> where FRlp: Fn(&dyn SyncIo, &Rlp, PeerId) -> RlpResponseResult, FError: FnOnce(network::Error) -> String, @@ -330,7 +399,7 @@ impl SyncSupplier { match response { Err(e) => Err(e), Ok(Some((packet_id, rlp_stream))) => { - io.respond(packet_id, rlp_stream.out()) + io.send(peer, packet_id, rlp_stream.out()) .unwrap_or_else(|e| debug!(target: "sync", "{:?}", error_func(e))); Ok(()) } diff --git a/ethcore/sync/src/lib.rs b/ethcore/sync/src/lib.rs index 4440a4b27..0f41ff450 100644 --- a/ethcore/sync/src/lib.rs +++ b/ethcore/sync/src/lib.rs @@ -22,6 +22,7 @@ //! extern crate common_types as types; +extern crate derive_more; extern crate ethcore; extern crate ethcore_io as io; extern crate ethcore_network as network;