Bundle protocol and packet_id together in chain sync (#10315)
Define a new `enum` where devp2p subprotocol packet ids (currently eth and par) are defined. Additionally provide functionality to query id value and protocol of a given id object.
This commit is contained in:
parent
ea589a17a4
commit
3adb640d2b
10
Cargo.lock
generated
10
Cargo.lock
generated
@ -588,6 +588,14 @@ dependencies = [
|
|||||||
"heapsize 0.4.2 (git+https://github.com/cheme/heapsize.git?branch=ec-macfix)",
|
"heapsize 0.4.2 (git+https://github.com/cheme/heapsize.git?branch=ec-macfix)",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "enum_primitive"
|
||||||
|
version = "0.1.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
dependencies = [
|
||||||
|
"num-traits 0.1.43 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "env_logger"
|
name = "env_logger"
|
||||||
version = "0.5.13"
|
version = "0.5.13"
|
||||||
@ -1108,6 +1116,7 @@ name = "ethcore-sync"
|
|||||||
version = "1.12.0"
|
version = "1.12.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"common-types 0.1.0",
|
"common-types 0.1.0",
|
||||||
|
"enum_primitive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)",
|
"env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"ethcore 1.12.0",
|
"ethcore 1.12.0",
|
||||||
"ethcore-io 1.12.0",
|
"ethcore-io 1.12.0",
|
||||||
@ -4510,6 +4519,7 @@ dependencies = [
|
|||||||
"checksum edit-distance 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3bd26878c3d921f89797a4e1a1711919f999a9f6946bb6f5a4ffda126d297b7e"
|
"checksum edit-distance 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3bd26878c3d921f89797a4e1a1711919f999a9f6946bb6f5a4ffda126d297b7e"
|
||||||
"checksum either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3be565ca5c557d7f59e7cfcf1844f9e3033650c929c6566f511e8005f205c1d0"
|
"checksum either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3be565ca5c557d7f59e7cfcf1844f9e3033650c929c6566f511e8005f205c1d0"
|
||||||
"checksum elastic-array 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "88d4851b005ef16de812ea9acdb7bece2f0a40dd86c07b85631d7dafa54537bb"
|
"checksum elastic-array 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "88d4851b005ef16de812ea9acdb7bece2f0a40dd86c07b85631d7dafa54537bb"
|
||||||
|
"checksum enum_primitive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "be4551092f4d519593039259a9ed8daedf0da12e5109c5280338073eaeb81180"
|
||||||
"checksum env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)" = "15b0a4d2e39f8420210be8b27eeda28029729e2fd4291019455016c348240c38"
|
"checksum env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)" = "15b0a4d2e39f8420210be8b27eeda28029729e2fd4291019455016c348240c38"
|
||||||
"checksum error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "07e791d3be96241c77c43846b665ef1384606da2cd2a48730abe606a12906e02"
|
"checksum error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "07e791d3be96241c77c43846b665ef1384606da2cd2a48730abe606a12906e02"
|
||||||
"checksum eth-secp256k1 0.5.7 (git+https://github.com/paritytech/rust-secp256k1)" = "<none>"
|
"checksum eth-secp256k1 0.5.7 (git+https://github.com/paritytech/rust-secp256k1)" = "<none>"
|
||||||
|
@ -9,6 +9,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
common-types = { path = "../types" }
|
common-types = { path = "../types" }
|
||||||
|
enum_primitive = "0.1.1"
|
||||||
ethcore = { path = ".." }
|
ethcore = { path = ".." }
|
||||||
ethcore-io = { path = "../../util/io" }
|
ethcore-io = { path = "../../util/io" }
|
||||||
ethcore-light = { path = "../light" }
|
ethcore-light = { path = "../light" }
|
||||||
|
@ -39,8 +39,8 @@ use std::net::{SocketAddr, AddrParseError};
|
|||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use parking_lot::{RwLock, Mutex};
|
use parking_lot::{RwLock, Mutex};
|
||||||
use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
|
use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
|
||||||
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3,
|
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3};
|
||||||
PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET};
|
use chain::sync_packet::SyncPacket::{PrivateTransactionPacket, SignedPrivateTransactionPacket};
|
||||||
use light::client::AsLightClient;
|
use light::client::AsLightClient;
|
||||||
use light::Provider;
|
use light::Provider;
|
||||||
use light::net::{
|
use light::net::{
|
||||||
@ -579,9 +579,9 @@ impl ChainNotify for EthSync {
|
|||||||
match message_type {
|
match message_type {
|
||||||
ChainMessageType::Consensus(message) => self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message),
|
ChainMessageType::Consensus(message) => self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message),
|
||||||
ChainMessageType::PrivateTransaction(transaction_hash, message) =>
|
ChainMessageType::PrivateTransaction(transaction_hash, message) =>
|
||||||
self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, WARP_SYNC_PROTOCOL_ID, PRIVATE_TRANSACTION_PACKET, message),
|
self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, PrivateTransactionPacket, message),
|
||||||
ChainMessageType::SignedPrivateTransaction(transaction_hash, message) =>
|
ChainMessageType::SignedPrivateTransaction(transaction_hash, message) =>
|
||||||
self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, WARP_SYNC_PROTOCOL_ID, SIGNED_PRIVATE_TRANSACTION_PACKET, message),
|
self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, SignedPrivateTransactionPacket, message),
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
use api::WARP_SYNC_PROTOCOL_ID;
|
use api::WARP_SYNC_PROTOCOL_ID;
|
||||||
use block_sync::{BlockDownloaderImportError as DownloaderImportError, DownloadAction};
|
use block_sync::{BlockDownloaderImportError as DownloaderImportError, DownloadAction};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
|
use enum_primitive::FromPrimitive;
|
||||||
use ethcore::error::{Error as EthcoreError, ErrorKind as EthcoreErrorKind, ImportErrorKind, BlockError};
|
use ethcore::error::{Error as EthcoreError, ErrorKind as EthcoreErrorKind, ImportErrorKind, BlockError};
|
||||||
use ethcore::snapshot::{ManifestData, RestorationStatus};
|
use ethcore::snapshot::{ManifestData, RestorationStatus};
|
||||||
use ethcore::verification::queue::kind::blocks::Unverified;
|
use ethcore::verification::queue::kind::blocks::Unverified;
|
||||||
@ -33,6 +34,20 @@ use types::BlockNumber;
|
|||||||
use types::block_status::BlockStatus;
|
use types::block_status::BlockStatus;
|
||||||
use types::ids::BlockId;
|
use types::ids::BlockId;
|
||||||
|
|
||||||
|
use super::sync_packet::{PacketInfo, SyncPacket};
|
||||||
|
use super::sync_packet::SyncPacket::{
|
||||||
|
StatusPacket,
|
||||||
|
NewBlockHashesPacket,
|
||||||
|
BlockHeadersPacket,
|
||||||
|
BlockBodiesPacket,
|
||||||
|
NewBlockPacket,
|
||||||
|
ReceiptsPacket,
|
||||||
|
SnapshotManifestPacket,
|
||||||
|
SnapshotDataPacket,
|
||||||
|
PrivateTransactionPacket,
|
||||||
|
SignedPrivateTransactionPacket,
|
||||||
|
};
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
BlockSet,
|
BlockSet,
|
||||||
ChainSync,
|
ChainSync,
|
||||||
@ -48,16 +63,6 @@ use super::{
|
|||||||
MAX_NEW_HASHES,
|
MAX_NEW_HASHES,
|
||||||
PAR_PROTOCOL_VERSION_1,
|
PAR_PROTOCOL_VERSION_1,
|
||||||
PAR_PROTOCOL_VERSION_3,
|
PAR_PROTOCOL_VERSION_3,
|
||||||
BLOCK_BODIES_PACKET,
|
|
||||||
BLOCK_HEADERS_PACKET,
|
|
||||||
NEW_BLOCK_HASHES_PACKET,
|
|
||||||
NEW_BLOCK_PACKET,
|
|
||||||
PRIVATE_TRANSACTION_PACKET,
|
|
||||||
RECEIPTS_PACKET,
|
|
||||||
SIGNED_PRIVATE_TRANSACTION_PACKET,
|
|
||||||
SNAPSHOT_DATA_PACKET,
|
|
||||||
SNAPSHOT_MANIFEST_PACKET,
|
|
||||||
STATUS_PACKET,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/// The Chain Sync Handler: handles responses from peers
|
/// The Chain Sync Handler: handles responses from peers
|
||||||
@ -67,36 +72,40 @@ impl SyncHandler {
|
|||||||
/// Handle incoming packet from peer
|
/// Handle incoming packet from peer
|
||||||
pub fn on_packet(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
|
pub fn on_packet(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
|
||||||
let rlp = Rlp::new(data);
|
let rlp = Rlp::new(data);
|
||||||
let result = match packet_id {
|
if let Some(packet_id) = SyncPacket::from_u8(packet_id) {
|
||||||
STATUS_PACKET => SyncHandler::on_peer_status(sync, io, peer, &rlp),
|
let result = match packet_id {
|
||||||
BLOCK_HEADERS_PACKET => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp),
|
StatusPacket => SyncHandler::on_peer_status(sync, io, peer, &rlp),
|
||||||
BLOCK_BODIES_PACKET => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp),
|
BlockHeadersPacket => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp),
|
||||||
RECEIPTS_PACKET => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp),
|
BlockBodiesPacket => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp),
|
||||||
NEW_BLOCK_PACKET => SyncHandler::on_peer_new_block(sync, io, peer, &rlp),
|
ReceiptsPacket => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp),
|
||||||
NEW_BLOCK_HASHES_PACKET => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp),
|
NewBlockPacket => SyncHandler::on_peer_new_block(sync, io, peer, &rlp),
|
||||||
SNAPSHOT_MANIFEST_PACKET => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp),
|
NewBlockHashesPacket => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp),
|
||||||
SNAPSHOT_DATA_PACKET => SyncHandler::on_snapshot_data(sync, io, peer, &rlp),
|
SnapshotManifestPacket => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp),
|
||||||
PRIVATE_TRANSACTION_PACKET => SyncHandler::on_private_transaction(sync, io, peer, &rlp),
|
SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp),
|
||||||
SIGNED_PRIVATE_TRANSACTION_PACKET => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp),
|
PrivateTransactionPacket => SyncHandler::on_private_transaction(sync, io, peer, &rlp),
|
||||||
_ => {
|
SignedPrivateTransactionPacket => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp),
|
||||||
debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id);
|
_ => {
|
||||||
Ok(())
|
debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id.id());
|
||||||
}
|
Ok(())
|
||||||
};
|
}
|
||||||
|
};
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Err(DownloaderImportError::Invalid) => {
|
Err(DownloaderImportError::Invalid) => {
|
||||||
debug!(target:"sync", "{} -> Invalid packet {}", peer, packet_id);
|
debug!(target:"sync", "{} -> Invalid packet {}", peer, packet_id.id());
|
||||||
io.disable_peer(peer);
|
io.disable_peer(peer);
|
||||||
sync.deactivate_peer(io, peer);
|
sync.deactivate_peer(io, peer);
|
||||||
},
|
},
|
||||||
Err(DownloaderImportError::Useless) => {
|
Err(DownloaderImportError::Useless) => {
|
||||||
sync.deactivate_peer(io, peer);
|
sync.deactivate_peer(io, peer);
|
||||||
},
|
},
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
// give a task to the same peer first
|
// give a task to the same peer first
|
||||||
sync.sync_peer(io, peer, false);
|
sync.sync_peer(io, peer, false);
|
||||||
},
|
},
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,6 +88,7 @@
|
|||||||
//! All other messages are ignored.
|
//! All other messages are ignored.
|
||||||
|
|
||||||
mod handler;
|
mod handler;
|
||||||
|
pub mod sync_packet;
|
||||||
mod propagator;
|
mod propagator;
|
||||||
mod requester;
|
mod requester;
|
||||||
mod supplier;
|
mod supplier;
|
||||||
@ -103,7 +104,7 @@ use fastmap::{H256FastMap, H256FastSet};
|
|||||||
use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
|
use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use rlp::{RlpStream, DecoderError};
|
use rlp::{RlpStream, DecoderError};
|
||||||
use network::{self, PeerId, PacketId, ProtocolId};
|
use network::{self, PeerId, PacketId};
|
||||||
use network::client_version::ClientVersion;
|
use network::client_version::ClientVersion;
|
||||||
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockQueueInfo};
|
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockQueueInfo};
|
||||||
use ethcore::snapshot::{RestorationStatus};
|
use ethcore::snapshot::{RestorationStatus};
|
||||||
@ -112,13 +113,19 @@ use super::{WarpSync, SyncConfig};
|
|||||||
use block_sync::{BlockDownloader, DownloadAction};
|
use block_sync::{BlockDownloader, DownloadAction};
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use snapshot::{Snapshot};
|
use snapshot::{Snapshot};
|
||||||
use api::{EthProtocolInfo as PeerInfoDigest, ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID, PriorityTask};
|
use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID, PriorityTask};
|
||||||
use private_tx::PrivateTxHandler;
|
use private_tx::PrivateTxHandler;
|
||||||
use transactions_stats::{TransactionsStats, Stats as TransactionStats};
|
use transactions_stats::{TransactionsStats, Stats as TransactionStats};
|
||||||
use types::transaction::UnverifiedTransaction;
|
use types::transaction::UnverifiedTransaction;
|
||||||
use types::BlockNumber;
|
use types::BlockNumber;
|
||||||
|
|
||||||
use self::handler::SyncHandler;
|
use self::handler::SyncHandler;
|
||||||
|
use self::sync_packet::{PacketInfo, SyncPacket};
|
||||||
|
use self::sync_packet::SyncPacket::{
|
||||||
|
NewBlockPacket,
|
||||||
|
StatusPacket,
|
||||||
|
};
|
||||||
|
|
||||||
use self::propagator::SyncPropagator;
|
use self::propagator::SyncPropagator;
|
||||||
use self::requester::SyncRequester;
|
use self::requester::SyncRequester;
|
||||||
pub(crate) use self::supplier::SyncSupplier;
|
pub(crate) use self::supplier::SyncSupplier;
|
||||||
@ -154,28 +161,6 @@ const MAX_TRANSACTION_PACKET_SIZE: usize = 5 * 1024 * 1024;
|
|||||||
const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 30000;
|
const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 30000;
|
||||||
const SNAPSHOT_MIN_PEERS: usize = 3;
|
const SNAPSHOT_MIN_PEERS: usize = 3;
|
||||||
|
|
||||||
pub const STATUS_PACKET: u8 = 0x00;
|
|
||||||
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
|
|
||||||
const TRANSACTIONS_PACKET: u8 = 0x02;
|
|
||||||
pub const GET_BLOCK_HEADERS_PACKET: u8 = 0x03;
|
|
||||||
pub const BLOCK_HEADERS_PACKET: u8 = 0x04;
|
|
||||||
pub const GET_BLOCK_BODIES_PACKET: u8 = 0x05;
|
|
||||||
const BLOCK_BODIES_PACKET: u8 = 0x06;
|
|
||||||
const NEW_BLOCK_PACKET: u8 = 0x07;
|
|
||||||
|
|
||||||
pub const GET_NODE_DATA_PACKET: u8 = 0x0d;
|
|
||||||
pub const NODE_DATA_PACKET: u8 = 0x0e;
|
|
||||||
pub const GET_RECEIPTS_PACKET: u8 = 0x0f;
|
|
||||||
pub const RECEIPTS_PACKET: u8 = 0x10;
|
|
||||||
|
|
||||||
pub const GET_SNAPSHOT_MANIFEST_PACKET: u8 = 0x11;
|
|
||||||
pub const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12;
|
|
||||||
pub const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13;
|
|
||||||
pub const SNAPSHOT_DATA_PACKET: u8 = 0x14;
|
|
||||||
pub const CONSENSUS_DATA_PACKET: u8 = 0x15;
|
|
||||||
pub const PRIVATE_TRANSACTION_PACKET: u8 = 0x16;
|
|
||||||
pub const SIGNED_PRIVATE_TRANSACTION_PACKET: u8 = 0x17;
|
|
||||||
|
|
||||||
const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;
|
const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;
|
||||||
|
|
||||||
const WAIT_PEERS_TIMEOUT: Duration = Duration::from_secs(5);
|
const WAIT_PEERS_TIMEOUT: Duration = Duration::from_secs(5);
|
||||||
@ -484,7 +469,7 @@ impl ChainSyncApi {
|
|||||||
for peers in sync.get_peers(&chain_info, PeerState::SameBlock).chunks(10) {
|
for peers in sync.get_peers(&chain_info, PeerState::SameBlock).chunks(10) {
|
||||||
check_deadline(deadline)?;
|
check_deadline(deadline)?;
|
||||||
for peer in peers {
|
for peer in peers {
|
||||||
SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer, NEW_BLOCK_PACKET, rlp.clone());
|
SyncPropagator::send_packet(io, *peer, NewBlockPacket, rlp.clone());
|
||||||
if let Some(ref mut peer) = sync.peers.get_mut(peer) {
|
if let Some(ref mut peer) = sync.peers.get_mut(peer) {
|
||||||
peer.latest_hash = hash;
|
peer.latest_hash = hash;
|
||||||
}
|
}
|
||||||
@ -1146,7 +1131,7 @@ impl ChainSync {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
packet.complete_unbounded_list();
|
packet.complete_unbounded_list();
|
||||||
io.respond(STATUS_PACKET, packet.out())
|
io.respond(StatusPacket.id(), packet.out())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn maintain_peers(&mut self, io: &mut SyncIo) {
|
pub fn maintain_peers(&mut self, io: &mut SyncIo) {
|
||||||
@ -1331,8 +1316,8 @@ impl ChainSync {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Broadcast private transaction message to peers.
|
/// Broadcast private transaction message to peers.
|
||||||
pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) {
|
pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet_id: SyncPacket, packet: Bytes) {
|
||||||
SyncPropagator::propagate_private_transaction(self, io, transaction_hash, protocol, packet_id, packet);
|
SyncPropagator::propagate_private_transaction(self, io, transaction_hash, packet_id, packet);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,12 +17,11 @@
|
|||||||
use std::cmp;
|
use std::cmp;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
|
||||||
use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID};
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
use fastmap::H256FastSet;
|
use fastmap::H256FastSet;
|
||||||
use network::{PeerId, PacketId, ProtocolId};
|
|
||||||
use network::client_version::ClientCapabilities;
|
use network::client_version::ClientCapabilities;
|
||||||
|
use network::PeerId;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use rlp::{Encodable, RlpStream};
|
use rlp::{Encodable, RlpStream};
|
||||||
use sync_io::SyncIo;
|
use sync_io::SyncIo;
|
||||||
@ -30,6 +29,14 @@ use types::transaction::SignedTransaction;
|
|||||||
use types::BlockNumber;
|
use types::BlockNumber;
|
||||||
use types::blockchain_info::BlockChainInfo;
|
use types::blockchain_info::BlockChainInfo;
|
||||||
|
|
||||||
|
use super::sync_packet::SyncPacket;
|
||||||
|
use super::sync_packet::SyncPacket::{
|
||||||
|
NewBlockHashesPacket,
|
||||||
|
TransactionsPacket,
|
||||||
|
NewBlockPacket,
|
||||||
|
ConsensusDataPacket,
|
||||||
|
};
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
random,
|
random,
|
||||||
ChainSync,
|
ChainSync,
|
||||||
@ -37,10 +44,6 @@ use super::{
|
|||||||
MAX_PEER_LAG_PROPAGATION,
|
MAX_PEER_LAG_PROPAGATION,
|
||||||
MAX_PEERS_PROPAGATION,
|
MAX_PEERS_PROPAGATION,
|
||||||
MIN_PEERS_PROPAGATION,
|
MIN_PEERS_PROPAGATION,
|
||||||
CONSENSUS_DATA_PACKET,
|
|
||||||
NEW_BLOCK_HASHES_PACKET,
|
|
||||||
NEW_BLOCK_PACKET,
|
|
||||||
TRANSACTIONS_PACKET,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/// The Chain Sync Propagator: propagates data to peers
|
/// The Chain Sync Propagator: propagates data to peers
|
||||||
@ -53,7 +56,8 @@ impl SyncPropagator {
|
|||||||
let sent = peers.len();
|
let sent = peers.len();
|
||||||
let mut send_packet = |io: &mut SyncIo, rlp: Bytes| {
|
let mut send_packet = |io: &mut SyncIo, rlp: Bytes| {
|
||||||
for peer_id in peers {
|
for peer_id in peers {
|
||||||
SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_PACKET, rlp.clone());
|
SyncPropagator::send_packet(io, *peer_id, NewBlockPacket, rlp.clone());
|
||||||
|
|
||||||
if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
|
if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
|
||||||
peer.latest_hash = chain_info.best_block_hash.clone();
|
peer.latest_hash = chain_info.best_block_hash.clone();
|
||||||
}
|
}
|
||||||
@ -88,7 +92,7 @@ impl SyncPropagator {
|
|||||||
if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
|
if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
|
||||||
peer.latest_hash = best_block_hash;
|
peer.latest_hash = best_block_hash;
|
||||||
}
|
}
|
||||||
SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp.clone());
|
SyncPropagator::send_packet(io, *peer_id, NewBlockHashesPacket, rlp.clone());
|
||||||
}
|
}
|
||||||
sent
|
sent
|
||||||
}
|
}
|
||||||
@ -156,7 +160,7 @@ impl SyncPropagator {
|
|||||||
|
|
||||||
let send_packet = |io: &mut SyncIo, peer_id: PeerId, sent: usize, rlp: Bytes| {
|
let send_packet = |io: &mut SyncIo, peer_id: PeerId, sent: usize, rlp: Bytes| {
|
||||||
let size = rlp.len();
|
let size = rlp.len();
|
||||||
SyncPropagator::send_packet(io, ETH_PROTOCOL, peer_id, TRANSACTIONS_PACKET, rlp);
|
SyncPropagator::send_packet(io, peer_id, TransactionsPacket, rlp);
|
||||||
trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size);
|
trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size);
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -275,7 +279,7 @@ impl SyncPropagator {
|
|||||||
io.chain().chain_info().total_difficulty
|
io.chain().chain_info().total_difficulty
|
||||||
);
|
);
|
||||||
for peer_id in &peers {
|
for peer_id in &peers {
|
||||||
SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_PACKET, rlp.clone());
|
SyncPropagator::send_packet(io, *peer_id, NewBlockPacket, rlp.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -285,12 +289,12 @@ impl SyncPropagator {
|
|||||||
let lucky_peers = ChainSync::select_random_peers(&sync.get_consensus_peers());
|
let lucky_peers = ChainSync::select_random_peers(&sync.get_consensus_peers());
|
||||||
trace!(target: "sync", "Sending consensus packet to {:?}", lucky_peers);
|
trace!(target: "sync", "Sending consensus packet to {:?}", lucky_peers);
|
||||||
for peer_id in lucky_peers {
|
for peer_id in lucky_peers {
|
||||||
SyncPropagator::send_packet(io, WARP_SYNC_PROTOCOL_ID, peer_id, CONSENSUS_DATA_PACKET, packet.clone());
|
SyncPropagator::send_packet(io, peer_id, ConsensusDataPacket, packet.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Broadcast private transaction message to peers.
|
/// Broadcast private transaction message to peers.
|
||||||
pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) {
|
pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: SyncPacket, packet: Bytes) {
|
||||||
let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash));
|
let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash));
|
||||||
if lucky_peers.is_empty() {
|
if lucky_peers.is_empty() {
|
||||||
error!(target: "privatetx", "Cannot propagate the packet, no peers with private tx enabled connected");
|
error!(target: "privatetx", "Cannot propagate the packet, no peers with private tx enabled connected");
|
||||||
@ -300,7 +304,7 @@ impl SyncPropagator {
|
|||||||
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
|
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
|
||||||
peer.last_sent_private_transactions.insert(transaction_hash);
|
peer.last_sent_private_transactions.insert(transaction_hash);
|
||||||
}
|
}
|
||||||
SyncPropagator::send_packet(io, protocol, peer_id, packet_id, packet.clone());
|
SyncPropagator::send_packet(io, peer_id, packet_id, packet.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -321,8 +325,8 @@ impl SyncPropagator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Generic packet sender
|
/// Generic packet sender
|
||||||
pub fn send_packet(sync: &mut SyncIo, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, packet: Bytes) {
|
pub fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: SyncPacket, packet: Bytes) {
|
||||||
if let Err(e) = sync.send_protocol(protocol, peer_id, packet_id, packet) {
|
if let Err(e) = sync.send(peer_id, packet_id, packet) {
|
||||||
debug!(target:"sync", "Error sending packet: {:?}", e);
|
debug!(target:"sync", "Error sending packet: {:?}", e);
|
||||||
sync.disconnect_peer(peer_id);
|
sync.disconnect_peer(peer_id);
|
||||||
}
|
}
|
||||||
|
@ -14,25 +14,28 @@
|
|||||||
// You should have received a copy of the GNU General Public License
|
// You should have received a copy of the GNU General Public License
|
||||||
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
|
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID};
|
|
||||||
use block_sync::BlockRequest;
|
use block_sync::BlockRequest;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
use network::{PeerId, PacketId, ProtocolId};
|
use network::{PeerId};
|
||||||
use rlp::RlpStream;
|
use rlp::RlpStream;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use sync_io::SyncIo;
|
use sync_io::SyncIo;
|
||||||
use types::BlockNumber;
|
use types::BlockNumber;
|
||||||
|
|
||||||
|
use super::sync_packet::SyncPacket;
|
||||||
|
use super::sync_packet::SyncPacket::{
|
||||||
|
GetBlockHeadersPacket,
|
||||||
|
GetBlockBodiesPacket,
|
||||||
|
GetReceiptsPacket,
|
||||||
|
GetSnapshotManifestPacket,
|
||||||
|
GetSnapshotDataPacket,
|
||||||
|
};
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
BlockSet,
|
BlockSet,
|
||||||
ChainSync,
|
ChainSync,
|
||||||
PeerAsking,
|
PeerAsking,
|
||||||
GET_BLOCK_BODIES_PACKET,
|
|
||||||
GET_BLOCK_HEADERS_PACKET,
|
|
||||||
GET_RECEIPTS_PACKET,
|
|
||||||
GET_SNAPSHOT_DATA_PACKET,
|
|
||||||
GET_SNAPSHOT_MANIFEST_PACKET,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/// The Chain Sync Requester: requesting data to other peers
|
/// The Chain Sync Requester: requesting data to other peers
|
||||||
@ -61,9 +64,7 @@ impl SyncRequester {
|
|||||||
for h in &hashes {
|
for h in &hashes {
|
||||||
rlp.append(&h.clone());
|
rlp.append(&h.clone());
|
||||||
}
|
}
|
||||||
|
SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockBodies, GetBlockBodiesPacket, rlp.out());
|
||||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockBodies, ETH_PROTOCOL, GET_BLOCK_BODIES_PACKET, rlp.out());
|
|
||||||
|
|
||||||
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
|
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
|
||||||
peer.asking_blocks = hashes;
|
peer.asking_blocks = hashes;
|
||||||
peer.block_set = Some(set);
|
peer.block_set = Some(set);
|
||||||
@ -77,7 +78,7 @@ impl SyncRequester {
|
|||||||
rlp.append(&1u32);
|
rlp.append(&1u32);
|
||||||
rlp.append(&0u32);
|
rlp.append(&0u32);
|
||||||
rlp.append(&0u32);
|
rlp.append(&0u32);
|
||||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::ForkHeader, ETH_PROTOCOL, GET_BLOCK_HEADERS_PACKET, rlp.out());
|
SyncRequester::send_request(sync, io, peer_id, PeerAsking::ForkHeader, GetBlockHeadersPacket, rlp.out());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Find some headers or blocks to download for a peer.
|
/// Find some headers or blocks to download for a peer.
|
||||||
@ -95,7 +96,7 @@ impl SyncRequester {
|
|||||||
pub fn request_snapshot_manifest(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
|
pub fn request_snapshot_manifest(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
|
||||||
trace!(target: "sync", "{} <- GetSnapshotManifest", peer_id);
|
trace!(target: "sync", "{} <- GetSnapshotManifest", peer_id);
|
||||||
let rlp = RlpStream::new_list(0);
|
let rlp = RlpStream::new_list(0);
|
||||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, WARP_SYNC_PROTOCOL_ID, GET_SNAPSHOT_MANIFEST_PACKET, rlp.out());
|
SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, GetSnapshotManifestPacket, rlp.out());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Request headers from a peer by block hash
|
/// Request headers from a peer by block hash
|
||||||
@ -106,7 +107,7 @@ impl SyncRequester {
|
|||||||
rlp.append(&count);
|
rlp.append(&count);
|
||||||
rlp.append(&skip);
|
rlp.append(&skip);
|
||||||
rlp.append(&if reverse {1u32} else {0u32});
|
rlp.append(&if reverse {1u32} else {0u32});
|
||||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockHeaders, ETH_PROTOCOL, GET_BLOCK_HEADERS_PACKET, rlp.out());
|
SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockHeaders, GetBlockHeadersPacket, rlp.out());
|
||||||
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
|
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
|
||||||
peer.asking_hash = Some(h.clone());
|
peer.asking_hash = Some(h.clone());
|
||||||
peer.block_set = Some(set);
|
peer.block_set = Some(set);
|
||||||
@ -119,7 +120,7 @@ impl SyncRequester {
|
|||||||
for h in &hashes {
|
for h in &hashes {
|
||||||
rlp.append(&h.clone());
|
rlp.append(&h.clone());
|
||||||
}
|
}
|
||||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockReceipts, ETH_PROTOCOL, GET_RECEIPTS_PACKET, rlp.out());
|
SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockReceipts, GetReceiptsPacket, rlp.out());
|
||||||
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
|
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
|
||||||
peer.asking_blocks = hashes;
|
peer.asking_blocks = hashes;
|
||||||
peer.block_set = Some(set);
|
peer.block_set = Some(set);
|
||||||
@ -130,11 +131,11 @@ impl SyncRequester {
|
|||||||
trace!(target: "sync", "{} <- GetSnapshotData {:?}", peer_id, chunk);
|
trace!(target: "sync", "{} <- GetSnapshotData {:?}", peer_id, chunk);
|
||||||
let mut rlp = RlpStream::new_list(1);
|
let mut rlp = RlpStream::new_list(1);
|
||||||
rlp.append(chunk);
|
rlp.append(chunk);
|
||||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotData, WARP_SYNC_PROTOCOL_ID, GET_SNAPSHOT_DATA_PACKET, rlp.out());
|
SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotData, GetSnapshotDataPacket, rlp.out());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generic request sender
|
/// Generic request sender
|
||||||
fn send_request(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) {
|
fn send_request(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: SyncPacket, packet: Bytes) {
|
||||||
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
|
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
|
||||||
if peer.asking != PeerAsking::Nothing {
|
if peer.asking != PeerAsking::Nothing {
|
||||||
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
|
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
|
||||||
@ -142,7 +143,7 @@ impl SyncRequester {
|
|||||||
peer.asking = asking;
|
peer.asking = asking;
|
||||||
peer.ask_time = Instant::now();
|
peer.ask_time = Instant::now();
|
||||||
|
|
||||||
let result = io.send_protocol(protocol, peer_id, packet_id, packet);
|
let result = io.send(peer_id, packet_id, packet);
|
||||||
|
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
debug!(target:"sync", "Error sending request: {:?}", e);
|
debug!(target:"sync", "Error sending request: {:?}", e);
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
|
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
|
use enum_primitive::FromPrimitive;
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
use network::{self, PeerId};
|
use network::{self, PeerId};
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
@ -25,30 +26,34 @@ use types::ids::BlockId;
|
|||||||
|
|
||||||
use sync_io::SyncIo;
|
use sync_io::SyncIo;
|
||||||
|
|
||||||
|
use super::sync_packet::{PacketInfo, SyncPacket};
|
||||||
|
use super::sync_packet::SyncPacket::{
|
||||||
|
StatusPacket,
|
||||||
|
TransactionsPacket,
|
||||||
|
GetBlockHeadersPacket,
|
||||||
|
BlockHeadersPacket,
|
||||||
|
GetBlockBodiesPacket,
|
||||||
|
BlockBodiesPacket,
|
||||||
|
GetNodeDataPacket,
|
||||||
|
NodeDataPacket,
|
||||||
|
GetReceiptsPacket,
|
||||||
|
ReceiptsPacket,
|
||||||
|
GetSnapshotManifestPacket,
|
||||||
|
SnapshotManifestPacket,
|
||||||
|
GetSnapshotDataPacket,
|
||||||
|
SnapshotDataPacket,
|
||||||
|
ConsensusDataPacket,
|
||||||
|
};
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
ChainSync,
|
ChainSync,
|
||||||
SyncHandler,
|
SyncHandler,
|
||||||
RlpResponseResult,
|
RlpResponseResult,
|
||||||
PacketDecodeError,
|
PacketDecodeError,
|
||||||
BLOCK_BODIES_PACKET,
|
|
||||||
BLOCK_HEADERS_PACKET,
|
|
||||||
CONSENSUS_DATA_PACKET,
|
|
||||||
GET_BLOCK_BODIES_PACKET,
|
|
||||||
GET_BLOCK_HEADERS_PACKET,
|
|
||||||
GET_NODE_DATA_PACKET,
|
|
||||||
GET_RECEIPTS_PACKET,
|
|
||||||
GET_SNAPSHOT_DATA_PACKET,
|
|
||||||
GET_SNAPSHOT_MANIFEST_PACKET,
|
|
||||||
MAX_BODIES_TO_SEND,
|
MAX_BODIES_TO_SEND,
|
||||||
MAX_HEADERS_TO_SEND,
|
MAX_HEADERS_TO_SEND,
|
||||||
MAX_NODE_DATA_TO_SEND,
|
MAX_NODE_DATA_TO_SEND,
|
||||||
MAX_RECEIPTS_HEADERS_TO_SEND,
|
MAX_RECEIPTS_HEADERS_TO_SEND,
|
||||||
NODE_DATA_PACKET,
|
|
||||||
RECEIPTS_PACKET,
|
|
||||||
SNAPSHOT_DATA_PACKET,
|
|
||||||
SNAPSHOT_MANIFEST_PACKET,
|
|
||||||
STATUS_PACKET,
|
|
||||||
TRANSACTIONS_PACKET,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/// The Chain Sync Supplier: answers requests from peers with available data
|
/// The Chain Sync Supplier: answers requests from peers with available data
|
||||||
@ -56,72 +61,83 @@ pub struct SyncSupplier;
|
|||||||
|
|
||||||
impl SyncSupplier {
|
impl SyncSupplier {
|
||||||
/// Dispatch incoming requests and responses
|
/// Dispatch incoming requests and responses
|
||||||
|
// Take a u8 and not a SyncPacketId because this is the entry point
|
||||||
|
// to chain sync from the outside world.
|
||||||
pub fn dispatch_packet(sync: &RwLock<ChainSync>, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
|
pub fn dispatch_packet(sync: &RwLock<ChainSync>, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
|
||||||
let rlp = Rlp::new(data);
|
let rlp = Rlp::new(data);
|
||||||
|
|
||||||
let result = match packet_id {
|
if let Some(id) = SyncPacket::from_u8(packet_id) {
|
||||||
GET_BLOCK_BODIES_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
|
let result = match id {
|
||||||
SyncSupplier::return_block_bodies,
|
GetBlockBodiesPacket => SyncSupplier::return_rlp(
|
||||||
|e| format!("Error sending block bodies: {:?}", e)),
|
io, &rlp, peer,
|
||||||
|
SyncSupplier::return_block_bodies,
|
||||||
|
|e| format!("Error sending block bodies: {:?}", e)),
|
||||||
|
|
||||||
GET_BLOCK_HEADERS_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
|
GetBlockHeadersPacket => SyncSupplier::return_rlp(
|
||||||
SyncSupplier::return_block_headers,
|
io, &rlp, peer,
|
||||||
|e| format!("Error sending block headers: {:?}", e)),
|
SyncSupplier::return_block_headers,
|
||||||
|
|e| format!("Error sending block headers: {:?}", e)),
|
||||||
|
|
||||||
GET_RECEIPTS_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
|
GetReceiptsPacket => SyncSupplier::return_rlp(
|
||||||
SyncSupplier::return_receipts,
|
io, &rlp, peer,
|
||||||
|e| format!("Error sending receipts: {:?}", e)),
|
SyncSupplier::return_receipts,
|
||||||
|
|e| format!("Error sending receipts: {:?}", e)),
|
||||||
|
|
||||||
GET_NODE_DATA_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
|
GetNodeDataPacket => SyncSupplier::return_rlp(
|
||||||
SyncSupplier::return_node_data,
|
io, &rlp, peer,
|
||||||
|e| format!("Error sending nodes: {:?}", e)),
|
SyncSupplier::return_node_data,
|
||||||
|
|e| format!("Error sending nodes: {:?}", e)),
|
||||||
|
|
||||||
GET_SNAPSHOT_MANIFEST_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
|
GetSnapshotManifestPacket => SyncSupplier::return_rlp(
|
||||||
SyncSupplier::return_snapshot_manifest,
|
io, &rlp, peer,
|
||||||
|e| format!("Error sending snapshot manifest: {:?}", e)),
|
SyncSupplier::return_snapshot_manifest,
|
||||||
|
|e| format!("Error sending snapshot manifest: {:?}", e)),
|
||||||
|
|
||||||
GET_SNAPSHOT_DATA_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
|
GetSnapshotDataPacket => SyncSupplier::return_rlp(
|
||||||
SyncSupplier::return_snapshot_data,
|
io, &rlp, peer,
|
||||||
|e| format!("Error sending snapshot data: {:?}", e)),
|
SyncSupplier::return_snapshot_data,
|
||||||
|
|e| format!("Error sending snapshot data: {:?}", e)),
|
||||||
|
|
||||||
STATUS_PACKET => {
|
StatusPacket => {
|
||||||
sync.write().on_packet(io, peer, packet_id, data);
|
sync.write().on_packet(io, peer, packet_id, data);
|
||||||
Ok(())
|
Ok(())
|
||||||
},
|
},
|
||||||
// Packets that require the peer to be confirmed
|
// Packets that require the peer to be confirmed
|
||||||
_ => {
|
_ => {
|
||||||
if !sync.read().peers.contains_key(&peer) {
|
if !sync.read().peers.contains_key(&peer) {
|
||||||
debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_version(peer));
|
debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_version(peer));
|
||||||
return;
|
return;
|
||||||
}
|
|
||||||
debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id);
|
|
||||||
|
|
||||||
match packet_id {
|
|
||||||
CONSENSUS_DATA_PACKET => {
|
|
||||||
SyncHandler::on_consensus_packet(io, peer, &rlp)
|
|
||||||
},
|
|
||||||
TRANSACTIONS_PACKET => {
|
|
||||||
let res = {
|
|
||||||
let sync_ro = sync.read();
|
|
||||||
SyncHandler::on_peer_transactions(&*sync_ro, io, peer, &rlp)
|
|
||||||
};
|
|
||||||
if res.is_err() {
|
|
||||||
// peer sent invalid data, disconnect.
|
|
||||||
io.disable_peer(peer);
|
|
||||||
sync.write().deactivate_peer(io, peer);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
_ => {
|
|
||||||
sync.write().on_packet(io, peer, packet_id, data);
|
|
||||||
}
|
}
|
||||||
}
|
debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id);
|
||||||
|
|
||||||
Ok(())
|
match id {
|
||||||
}
|
ConsensusDataPacket => {
|
||||||
};
|
SyncHandler::on_consensus_packet(io, peer, &rlp)
|
||||||
result.unwrap_or_else(|e| {
|
},
|
||||||
debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e);
|
TransactionsPacket => {
|
||||||
})
|
let res = {
|
||||||
|
let sync_ro = sync.read();
|
||||||
|
SyncHandler::on_peer_transactions(&*sync_ro, io, peer, &rlp)
|
||||||
|
};
|
||||||
|
if res.is_err() {
|
||||||
|
// peer sent invalid data, disconnect.
|
||||||
|
io.disable_peer(peer);
|
||||||
|
sync.write().deactivate_peer(io, peer);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
sync.write().on_packet(io, peer, packet_id, data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
result.unwrap_or_else(|e| {
|
||||||
|
debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e);
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Respond to GetBlockHeaders request
|
/// Respond to GetBlockHeaders request
|
||||||
@ -148,11 +164,11 @@ impl SyncSupplier {
|
|||||||
trace!(target:"sync", "Returning single header: {:?}", hash);
|
trace!(target:"sync", "Returning single header: {:?}", hash);
|
||||||
let mut rlp = RlpStream::new_list(1);
|
let mut rlp = RlpStream::new_list(1);
|
||||||
rlp.append_raw(&hdr.into_inner(), 1);
|
rlp.append_raw(&hdr.into_inner(), 1);
|
||||||
return Ok(Some((BLOCK_HEADERS_PACKET, rlp)));
|
return Ok(Some((BlockHeadersPacket.id(), rlp)));
|
||||||
}
|
}
|
||||||
number
|
number
|
||||||
}
|
}
|
||||||
None => return Ok(Some((BLOCK_HEADERS_PACKET, RlpStream::new_list(0)))) //no such header, return nothing
|
None => return Ok(Some((BlockHeadersPacket.id(), RlpStream::new_list(0)))) //no such header, return nothing
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
let number = r.val_at::<BlockNumber>(0)?;
|
let number = r.val_at::<BlockNumber>(0)?;
|
||||||
@ -202,7 +218,7 @@ impl SyncSupplier {
|
|||||||
let mut rlp = RlpStream::new_list(count as usize);
|
let mut rlp = RlpStream::new_list(count as usize);
|
||||||
rlp.append_raw(&data, count as usize);
|
rlp.append_raw(&data, count as usize);
|
||||||
trace!(target: "sync", "{} -> GetBlockHeaders: returned {} entries", peer_id, count);
|
trace!(target: "sync", "{} -> GetBlockHeaders: returned {} entries", peer_id, count);
|
||||||
Ok(Some((BLOCK_HEADERS_PACKET, rlp)))
|
Ok(Some((BlockHeadersPacket.id(), rlp)))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Respond to GetBlockBodies request
|
/// Respond to GetBlockBodies request
|
||||||
@ -229,7 +245,7 @@ impl SyncSupplier {
|
|||||||
let mut rlp = RlpStream::new_list(added);
|
let mut rlp = RlpStream::new_list(added);
|
||||||
rlp.append_raw(&data, added);
|
rlp.append_raw(&data, added);
|
||||||
trace!(target: "sync", "{} -> GetBlockBodies: returned {} entries", peer_id, added);
|
trace!(target: "sync", "{} -> GetBlockBodies: returned {} entries", peer_id, added);
|
||||||
Ok(Some((BLOCK_BODIES_PACKET, rlp)))
|
Ok(Some((BlockBodiesPacket.id(), rlp)))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Respond to GetNodeData request
|
/// Respond to GetNodeData request
|
||||||
@ -261,7 +277,7 @@ impl SyncSupplier {
|
|||||||
for d in data {
|
for d in data {
|
||||||
rlp.append(&d);
|
rlp.append(&d);
|
||||||
}
|
}
|
||||||
Ok(Some((NODE_DATA_PACKET, rlp)))
|
Ok(Some((NodeDataPacket.id(), rlp)))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn return_receipts(io: &SyncIo, rlp: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
fn return_receipts(io: &SyncIo, rlp: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||||
@ -287,7 +303,7 @@ impl SyncSupplier {
|
|||||||
}
|
}
|
||||||
let mut rlp_result = RlpStream::new_list(added_headers);
|
let mut rlp_result = RlpStream::new_list(added_headers);
|
||||||
rlp_result.append_raw(&data, added_headers);
|
rlp_result.append_raw(&data, added_headers);
|
||||||
Ok(Some((RECEIPTS_PACKET, rlp_result)))
|
Ok(Some((ReceiptsPacket.id(), rlp_result)))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Respond to GetSnapshotManifest request
|
/// Respond to GetSnapshotManifest request
|
||||||
@ -310,7 +326,7 @@ impl SyncSupplier {
|
|||||||
RlpStream::new_list(0)
|
RlpStream::new_list(0)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Ok(Some((SNAPSHOT_MANIFEST_PACKET, rlp)))
|
Ok(Some((SnapshotManifestPacket.id(), rlp)))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Respond to GetSnapshotData request
|
/// Respond to GetSnapshotData request
|
||||||
@ -329,7 +345,7 @@ impl SyncSupplier {
|
|||||||
RlpStream::new_list(0)
|
RlpStream::new_list(0)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Ok(Some((SNAPSHOT_DATA_PACKET, rlp)))
|
Ok(Some((SnapshotDataPacket.id(), rlp)))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn return_rlp<FRlp, FError>(io: &mut SyncIo, rlp: &Rlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
|
fn return_rlp<FRlp, FError>(io: &mut SyncIo, rlp: &Rlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
|
||||||
@ -491,7 +507,7 @@ mod test {
|
|||||||
|
|
||||||
io.sender = Some(2usize);
|
io.sender = Some(2usize);
|
||||||
|
|
||||||
SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_NODE_DATA_PACKET, &node_request);
|
SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GetNodeDataPacket.id(), &node_request);
|
||||||
assert_eq!(1, io.packets.len());
|
assert_eq!(1, io.packets.len());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -533,7 +549,7 @@ mod test {
|
|||||||
assert_eq!(603, rlp_result.unwrap().1.out().len());
|
assert_eq!(603, rlp_result.unwrap().1.out().len());
|
||||||
|
|
||||||
io.sender = Some(2usize);
|
io.sender = Some(2usize);
|
||||||
SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_RECEIPTS_PACKET, &receipts_request);
|
SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GetReceiptsPacket.id(), &receipts_request);
|
||||||
assert_eq!(1, io.packets.len());
|
assert_eq!(1, io.packets.len());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
141
ethcore/sync/src/chain/sync_packet.rs
Normal file
141
ethcore/sync/src/chain/sync_packet.rs
Normal file
@ -0,0 +1,141 @@
|
|||||||
|
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Parity Ethereum.
|
||||||
|
|
||||||
|
// Parity Ethereum is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Parity Ethereum is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! When sending packets over p2p we specify both which subprotocol
|
||||||
|
//! to use and what kind of packet we are sending (through a packet id).
|
||||||
|
//! Likewise when receiving packets from other peers we decode the
|
||||||
|
//! subprotocol and the packet id. This module helps coupling both
|
||||||
|
//! pieces of information together and provides an easy mechanism
|
||||||
|
//! to convert to/from the packet id values transmitted over the
|
||||||
|
//! wire.
|
||||||
|
|
||||||
|
use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID};
|
||||||
|
use network::{PacketId, ProtocolId};
|
||||||
|
|
||||||
|
/// An enum that defines all known packet ids in the context of
|
||||||
|
/// synchronization and provides a mechanism to convert from
|
||||||
|
/// packet ids (of type PacketId or u8) directly read from the network
|
||||||
|
/// to enum variants. This implicitly provides a mechanism to
|
||||||
|
/// check whether a given packet id is known, and to prevent
|
||||||
|
/// packet id clashes when defining new ids.
|
||||||
|
enum_from_primitive! {
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq)]
|
||||||
|
pub enum SyncPacket {
|
||||||
|
StatusPacket = 0x00,
|
||||||
|
NewBlockHashesPacket = 0x01,
|
||||||
|
TransactionsPacket = 0x02,
|
||||||
|
GetBlockHeadersPacket = 0x03,
|
||||||
|
BlockHeadersPacket = 0x04,
|
||||||
|
GetBlockBodiesPacket = 0x05,
|
||||||
|
BlockBodiesPacket = 0x06,
|
||||||
|
NewBlockPacket = 0x07,
|
||||||
|
|
||||||
|
GetNodeDataPacket = 0x0d,
|
||||||
|
NodeDataPacket = 0x0e,
|
||||||
|
GetReceiptsPacket = 0x0f,
|
||||||
|
ReceiptsPacket = 0x10,
|
||||||
|
|
||||||
|
GetSnapshotManifestPacket = 0x11,
|
||||||
|
SnapshotManifestPacket = 0x12,
|
||||||
|
GetSnapshotDataPacket = 0x13,
|
||||||
|
SnapshotDataPacket = 0x14,
|
||||||
|
ConsensusDataPacket = 0x15,
|
||||||
|
PrivateTransactionPacket = 0x16,
|
||||||
|
SignedPrivateTransactionPacket = 0x17,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
use self::SyncPacket::*;
|
||||||
|
|
||||||
|
/// Provide both subprotocol and packet id information within the
|
||||||
|
/// same object.
|
||||||
|
pub trait PacketInfo {
|
||||||
|
fn id(&self) -> PacketId;
|
||||||
|
fn protocol(&self) -> ProtocolId;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The mechanism to match packet ids and protocol may be improved
|
||||||
|
// through some macro magic, but for now this works.
|
||||||
|
impl PacketInfo for SyncPacket {
|
||||||
|
fn protocol(&self) -> ProtocolId {
|
||||||
|
match self {
|
||||||
|
StatusPacket |
|
||||||
|
NewBlockHashesPacket |
|
||||||
|
TransactionsPacket |
|
||||||
|
GetBlockHeadersPacket |
|
||||||
|
BlockHeadersPacket |
|
||||||
|
GetBlockBodiesPacket |
|
||||||
|
BlockBodiesPacket |
|
||||||
|
NewBlockPacket |
|
||||||
|
|
||||||
|
GetNodeDataPacket|
|
||||||
|
NodeDataPacket |
|
||||||
|
GetReceiptsPacket |
|
||||||
|
ReceiptsPacket
|
||||||
|
|
||||||
|
=> ETH_PROTOCOL,
|
||||||
|
|
||||||
|
GetSnapshotManifestPacket|
|
||||||
|
SnapshotManifestPacket |
|
||||||
|
GetSnapshotDataPacket |
|
||||||
|
SnapshotDataPacket |
|
||||||
|
ConsensusDataPacket |
|
||||||
|
PrivateTransactionPacket |
|
||||||
|
SignedPrivateTransactionPacket
|
||||||
|
|
||||||
|
=> WARP_SYNC_PROTOCOL_ID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn id(&self) -> PacketId {
|
||||||
|
(*self) as PacketId
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
use enum_primitive::FromPrimitive;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn packet_ids_from_u8_when_from_primitive_zero_then_equals_status_packet() {
|
||||||
|
assert_eq!(SyncPacket::from_u8(0x00), Some(StatusPacket));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn packet_ids_from_u8_when_from_primitive_eleven_then_equals_get_snapshot_manifest_packet() {
|
||||||
|
assert_eq!(SyncPacket::from_u8(0x11), Some(GetSnapshotManifestPacket));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn packet_ids_from_u8_when_invalid_packet_id_then_none() {
|
||||||
|
assert!(SyncPacket::from_u8(0x99).is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn when_status_packet_then_id_and_protocol_match() {
|
||||||
|
assert_eq!(StatusPacket.id(), StatusPacket as PacketId);
|
||||||
|
assert_eq!(StatusPacket.protocol(), ETH_PROTOCOL);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn when_consensus_data_packet_then_id_and_protocol_match() {
|
||||||
|
assert_eq!(ConsensusDataPacket.id(), ConsensusDataPacket as PacketId);
|
||||||
|
assert_eq!(ConsensusDataPacket.protocol(), WARP_SYNC_PROTOCOL_ID);
|
||||||
|
}
|
||||||
|
}
|
@ -44,6 +44,8 @@ extern crate ethcore_light as light;
|
|||||||
#[cfg(test)] extern crate kvdb_memorydb;
|
#[cfg(test)] extern crate kvdb_memorydb;
|
||||||
#[cfg(test)] extern crate rustc_hex;
|
#[cfg(test)] extern crate rustc_hex;
|
||||||
|
|
||||||
|
#[macro_use]
|
||||||
|
extern crate enum_primitive;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate macros;
|
extern crate macros;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
|
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use chain::sync_packet::{PacketInfo, SyncPacket};
|
||||||
use network::{NetworkContext, PeerId, PacketId, Error, SessionInfo, ProtocolId};
|
use network::{NetworkContext, PeerId, PacketId, Error, SessionInfo, ProtocolId};
|
||||||
use network::client_version::ClientVersion;
|
use network::client_version::ClientVersion;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
@ -34,7 +35,7 @@ pub trait SyncIo {
|
|||||||
/// Respond to current request with a packet. Can be called from an IO handler for incoming packet.
|
/// Respond to current request with a packet. Can be called from an IO handler for incoming packet.
|
||||||
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;
|
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;
|
||||||
/// Send a packet to a peer using specified protocol.
|
/// Send a packet to a peer using specified protocol.
|
||||||
fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;
|
fn send(&mut self, peer_id: PeerId, packet_id: SyncPacket, data: Vec<u8>) -> Result<(), Error>;
|
||||||
/// Get the blockchain
|
/// Get the blockchain
|
||||||
fn chain(&self) -> &BlockChainClient;
|
fn chain(&self) -> &BlockChainClient;
|
||||||
/// Get the snapshot service.
|
/// Get the snapshot service.
|
||||||
@ -97,8 +98,8 @@ impl<'s> SyncIo for NetSyncIo<'s> {
|
|||||||
self.network.respond(packet_id, data)
|
self.network.respond(packet_id, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>{
|
fn send(&mut self, peer_id: PeerId, packet_id: SyncPacket, data: Vec<u8>) -> Result<(), Error>{
|
||||||
self.network.send_protocol(protocol, peer_id, packet_id, data)
|
self.network.send_protocol(packet_id.protocol(), peer_id, packet_id.id(), data)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn chain(&self) -> &BlockChainClient {
|
fn chain(&self) -> &BlockChainClient {
|
||||||
|
@ -30,8 +30,11 @@ use ethcore::miner::Miner;
|
|||||||
use ethcore::test_helpers;
|
use ethcore::test_helpers;
|
||||||
use sync_io::SyncIo;
|
use sync_io::SyncIo;
|
||||||
use io::{IoChannel, IoContext, IoHandler};
|
use io::{IoChannel, IoContext, IoHandler};
|
||||||
use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID};
|
use api::WARP_SYNC_PROTOCOL_ID;
|
||||||
use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SyncSupplier, STATUS_PACKET, RECEIPTS_PACKET, GET_SNAPSHOT_MANIFEST_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET};
|
use chain::{ChainSync, SyncSupplier, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3};
|
||||||
|
use chain::sync_packet::{PacketInfo, SyncPacket};
|
||||||
|
use chain::sync_packet::SyncPacket::{PrivateTransactionPacket, SignedPrivateTransactionPacket};
|
||||||
|
|
||||||
use SyncConfig;
|
use SyncConfig;
|
||||||
use private_tx::SimplePrivateTxHandler;
|
use private_tx::SimplePrivateTxHandler;
|
||||||
use types::BlockNumber;
|
use types::BlockNumber;
|
||||||
@ -80,16 +83,6 @@ impl<'p, C> Drop for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn assert_packet_id_matches_protocol(protocol: &ProtocolId, packet_id: &PacketId) {
|
|
||||||
match packet_id {
|
|
||||||
STATUS_PACKET ... RECEIPTS_PACKET => assert_eq!(*protocol, ETH_PROTOCOL),
|
|
||||||
GET_SNAPSHOT_MANIFEST_PACKET ... SIGNED_PRIVATE_TRANSACTION_PACKET => assert_eq!(*protocol, WARP_SYNC_PROTOCOL_ID),
|
|
||||||
// What about light?
|
|
||||||
_ => assert!(false)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
|
impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
|
||||||
fn disable_peer(&mut self, peer_id: PeerId) {
|
fn disable_peer(&mut self, peer_id: PeerId) {
|
||||||
self.disconnect_peer(peer_id);
|
self.disconnect_peer(peer_id);
|
||||||
@ -112,12 +105,10 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), network::Error> {
|
fn send(&mut self,peer_id: PeerId, packet_id: SyncPacket, data: Vec<u8>) -> Result<(), network::Error> {
|
||||||
assert_packet_id_matches_protocol(&protocol, &packet_id);
|
|
||||||
|
|
||||||
self.packets.push(TestPacket {
|
self.packets.push(TestPacket {
|
||||||
data: data,
|
data: data,
|
||||||
packet_id: packet_id,
|
packet_id: packet_id.id(),
|
||||||
recipient: peer_id,
|
recipient: peer_id,
|
||||||
});
|
});
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -244,9 +235,9 @@ impl<C> EthPeer<C> where C: FlushingBlockChainClient {
|
|||||||
match message {
|
match message {
|
||||||
ChainMessageType::Consensus(data) => self.sync.write().propagate_consensus_packet(&mut io, data),
|
ChainMessageType::Consensus(data) => self.sync.write().propagate_consensus_packet(&mut io, data),
|
||||||
ChainMessageType::PrivateTransaction(transaction_hash, data) =>
|
ChainMessageType::PrivateTransaction(transaction_hash, data) =>
|
||||||
self.sync.write().propagate_private_transaction(&mut io, transaction_hash, WARP_SYNC_PROTOCOL_ID, PRIVATE_TRANSACTION_PACKET, data),
|
self.sync.write().propagate_private_transaction(&mut io, transaction_hash, PrivateTransactionPacket, data),
|
||||||
ChainMessageType::SignedPrivateTransaction(transaction_hash, data) =>
|
ChainMessageType::SignedPrivateTransaction(transaction_hash, data) =>
|
||||||
self.sync.write().propagate_private_transaction(&mut io, transaction_hash, WARP_SYNC_PROTOCOL_ID, SIGNED_PRIVATE_TRANSACTION_PACKET, data),
|
self.sync.write().propagate_private_transaction(&mut io, transaction_hash, SignedPrivateTransactionPacket, data),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user