Private transactions integration pr (#6422)
* Private transaction message added * Empty line removed * Private transactions logic removed from client into the separate module * Fixed compilation after merge with head * Signed private transaction message added as well * Comments after the review fixed * Private tx execution * Test update * Renamed some methods * Fixed some tests * Reverted submodules * Fixed build * Private transaction message added * Empty line removed * Private transactions logic removed from client into the separate module * Fixed compilation after merge with head * Signed private transaction message added as well * Comments after the review fixed * Encrypted private transaction message and signed reply added * Private tx execution * Test update * Main scenario completed * Merged with the latest head * Private transactions API * Comments after review fixed * Parameters for private transactions added to parity arguments * New files added * New API methods added * Do not process packets from unconfirmed peers * Merge with ptm_ss branch * Encryption and permissioning with key server added * Fixed compilation after merge * Version of Parity protocol incremented in order to support private transactions * Doc strings for constants added * Proper format for doc string added * fixed some encryptor.rs grumbles * Private transactions functionality moved to the separate crate * Refactoring in order to remove late initialisation * Tests fixed after moving to the separate crate * Fetch method removed * Sync test helpers refactored * Interaction with encryptor refactored * Contract address retrieving via substate removed * Sensible gas limit for private transactions implemented * New private contract with nonces added * Parsing of the response from key server fixed * Build fixed after the merge, native contracts removed * Crate renamed * Tests moved to the separate directory * Handling of errors reworked in order to use error chain * Encodable macro added, new constructor replaced with default * Native ethabi usage removed * Couple conversions optimized * Interactions with client reworked * Errors omitting removed * Fix after merge * Fix after the merge * private transactions improvements in progress * private_transactions -> ethcore/private-tx * making private transactions more idiomatic * private-tx encryptor uses shared FetchClient and is more idiomatic * removed redundant tests, moved integration tests to tests/ dir * fixed failing service test * reenable add_notify on private tx provider * removed private_tx tests from sync module * removed commented out code * Use plain password instead of unlocking account manager * remove dead code * Link to the contract changed * Transaction signature chain replay protection module created * Redundant type conversion removed * Contract address returned by private provider * Test fixed * Addressing grumbles in PrivateTransactions (#8249) * Tiny fixes part 1. * A bunch of additional comments and todos. * Fix ethsync tests. * resolved merge conflicts * final private tx pr (#8318) * added cli option that enables private transactions * fixed failing test * fixed failing test * fixed failing test * fixed failing test
This commit is contained in:
committed by
Marek Kotewicz
parent
c039ab79b5
commit
e6f75bccfe
@@ -18,6 +18,7 @@ ethcore = { path = "../ethcore" }
|
||||
ethereum-types = "0.3"
|
||||
plain_hasher = { path = "../util/plain_hasher" }
|
||||
rlp = { path = "../util/rlp" }
|
||||
rustc-hex = "1.0"
|
||||
keccak-hash = { path = "../util/hash" }
|
||||
triehash = { path = "../util/triehash" }
|
||||
kvdb = { path = "../util/kvdb" }
|
||||
|
||||
@@ -24,7 +24,7 @@ use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, Protocol
|
||||
use ethereum_types::{H256, H512, U256};
|
||||
use io::{TimerToken};
|
||||
use ethcore::ethstore::ethkey::Secret;
|
||||
use ethcore::client::{BlockChainClient, ChainNotify};
|
||||
use ethcore::client::{BlockChainClient, ChainNotify, ChainMessageType};
|
||||
use ethcore::snapshot::SnapshotService;
|
||||
use ethcore::header::BlockNumber;
|
||||
use sync_io::NetSyncIo;
|
||||
@@ -32,11 +32,13 @@ use chain::{ChainSync, SyncStatus as EthSyncStatus};
|
||||
use std::net::{SocketAddr, AddrParseError};
|
||||
use std::str::FromStr;
|
||||
use parking_lot::RwLock;
|
||||
use chain::{ETH_PACKET_COUNT, SNAPSHOT_SYNC_PACKET_COUNT};
|
||||
use chain::{ETH_PACKET_COUNT, SNAPSHOT_SYNC_PACKET_COUNT, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
|
||||
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3};
|
||||
use light::client::AsLightClient;
|
||||
use light::Provider;
|
||||
use light::net::{self as light_net, LightProtocol, Params as LightParams, Capabilities, Handler as LightHandler, EventContext};
|
||||
use network::IpFilter;
|
||||
use private_tx::PrivateTxHandler;
|
||||
|
||||
/// Parity sync protocol
|
||||
pub const WARP_SYNC_PROTOCOL_ID: ProtocolId = *b"par";
|
||||
@@ -227,6 +229,8 @@ pub struct Params {
|
||||
pub chain: Arc<BlockChainClient>,
|
||||
/// Snapshot service.
|
||||
pub snapshot_service: Arc<SnapshotService>,
|
||||
/// Private tx service.
|
||||
pub private_tx_handler: Arc<PrivateTxHandler>,
|
||||
/// Light data provider.
|
||||
pub provider: Arc<::light::Provider>,
|
||||
/// Network layer configuration.
|
||||
@@ -288,7 +292,7 @@ impl EthSync {
|
||||
})
|
||||
};
|
||||
|
||||
let chain_sync = ChainSync::new(params.config, &*params.chain);
|
||||
let chain_sync = ChainSync::new(params.config, &*params.chain, params.private_tx_handler.clone());
|
||||
let service = NetworkService::new(params.network_config.clone().into_basic()?, connection_filter)?;
|
||||
|
||||
let sync = Arc::new(EthSync {
|
||||
@@ -392,9 +396,10 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
|
||||
}
|
||||
|
||||
fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
|
||||
self.sync.write().maintain_peers(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay));
|
||||
self.sync.write().maintain_sync(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay));
|
||||
self.sync.write().propagate_new_transactions(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay));
|
||||
let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay);
|
||||
self.sync.write().maintain_peers(&mut io);
|
||||
self.sync.write().maintain_sync(&mut io);
|
||||
self.sync.write().propagate_new_transactions(&mut io);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -411,7 +416,8 @@ impl ChainNotify for EthSync {
|
||||
use light::net::Announcement;
|
||||
|
||||
self.network.with_context(self.subprotocol_name, |context| {
|
||||
let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain, &*self.eth_handler.snapshot_service, &self.eth_handler.overlay);
|
||||
let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain, &*self.eth_handler.snapshot_service,
|
||||
&self.eth_handler.overlay);
|
||||
self.eth_handler.sync.write().chain_new_blocks(
|
||||
&mut sync_io,
|
||||
&imported,
|
||||
@@ -448,10 +454,10 @@ impl ChainNotify for EthSync {
|
||||
Err(err) => warn!("Error starting network: {}", err),
|
||||
_ => {},
|
||||
}
|
||||
self.network.register_protocol(self.eth_handler.clone(), self.subprotocol_name, ETH_PACKET_COUNT, &[62u8, 63u8])
|
||||
self.network.register_protocol(self.eth_handler.clone(), self.subprotocol_name, ETH_PACKET_COUNT, &[ETH_PROTOCOL_VERSION_62, ETH_PROTOCOL_VERSION_63])
|
||||
.unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e));
|
||||
// register the warp sync subprotocol
|
||||
self.network.register_protocol(self.eth_handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[1u8, 2u8])
|
||||
self.network.register_protocol(self.eth_handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3])
|
||||
.unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e));
|
||||
|
||||
// register the light protocol.
|
||||
@@ -469,10 +475,14 @@ impl ChainNotify for EthSync {
|
||||
self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e));
|
||||
}
|
||||
|
||||
fn broadcast(&self, message: Vec<u8>) {
|
||||
fn broadcast(&self, message_type: ChainMessageType) {
|
||||
self.network.with_context(WARP_SYNC_PROTOCOL_ID, |context| {
|
||||
let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain, &*self.eth_handler.snapshot_service, &self.eth_handler.overlay);
|
||||
self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message.clone());
|
||||
match message_type {
|
||||
ChainMessageType::Consensus(message) => self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message),
|
||||
ChainMessageType::PrivateTransaction(message) => self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, message),
|
||||
ChainMessageType::SignedPrivateTransaction(message) => self.eth_handler.sync.write().propagate_signed_private_transaction(&mut sync_io, message),
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -88,6 +88,7 @@
|
||||
/// All other messages are ignored.
|
||||
///
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::collections::{HashSet, HashMap};
|
||||
use std::cmp;
|
||||
use std::time::Instant;
|
||||
@@ -111,15 +112,23 @@ use rand::Rng;
|
||||
use snapshot::{Snapshot, ChunkType};
|
||||
use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID};
|
||||
use transactions_stats::{TransactionsStats, Stats as TransactionStats};
|
||||
use private_tx::PrivateTxHandler;
|
||||
|
||||
known_heap_size!(0, PeerInfo);
|
||||
|
||||
type PacketDecodeError = DecoderError;
|
||||
|
||||
const PROTOCOL_VERSION_63: u8 = 63;
|
||||
const PROTOCOL_VERSION_62: u8 = 62;
|
||||
const PROTOCOL_VERSION_1: u8 = 1;
|
||||
const PROTOCOL_VERSION_2: u8 = 2;
|
||||
/// 63 version of Ethereum protocol.
|
||||
pub const ETH_PROTOCOL_VERSION_63: u8 = 63;
|
||||
/// 62 version of Ethereum protocol.
|
||||
pub const ETH_PROTOCOL_VERSION_62: u8 = 62;
|
||||
/// 1 version of Parity protocol.
|
||||
pub const PAR_PROTOCOL_VERSION_1: u8 = 1;
|
||||
/// 2 version of Parity protocol (consensus messages added).
|
||||
pub const PAR_PROTOCOL_VERSION_2: u8 = 2;
|
||||
/// 3 version of Parity protocol (private transactions messages added).
|
||||
pub const PAR_PROTOCOL_VERSION_3: u8 = 3;
|
||||
|
||||
const MAX_BODIES_TO_SEND: usize = 256;
|
||||
const MAX_HEADERS_TO_SEND: usize = 512;
|
||||
const MAX_NODE_DATA_TO_SEND: usize = 1024;
|
||||
@@ -160,8 +169,10 @@ const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12;
|
||||
const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13;
|
||||
const SNAPSHOT_DATA_PACKET: u8 = 0x14;
|
||||
const CONSENSUS_DATA_PACKET: u8 = 0x15;
|
||||
const PRIVATE_TRANSACTION_PACKET: u8 = 0x16;
|
||||
const SIGNED_PRIVATE_TRANSACTION_PACKET: u8 = 0x17;
|
||||
|
||||
pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x16;
|
||||
pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x18;
|
||||
|
||||
const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;
|
||||
|
||||
@@ -384,6 +395,8 @@ pub struct ChainSync {
|
||||
transactions_stats: TransactionsStats,
|
||||
/// Enable ancient block downloading
|
||||
download_old_blocks: bool,
|
||||
/// Shared private tx service.
|
||||
private_tx_handler: Arc<PrivateTxHandler>,
|
||||
/// Enable warp sync.
|
||||
warp_sync: WarpSync,
|
||||
}
|
||||
@@ -392,7 +405,7 @@ type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError
|
||||
|
||||
impl ChainSync {
|
||||
/// Create a new instance of syncing strategy.
|
||||
pub fn new(config: SyncConfig, chain: &BlockChainClient) -> ChainSync {
|
||||
pub fn new(config: SyncConfig, chain: &BlockChainClient, private_tx_handler: Arc<PrivateTxHandler>) -> ChainSync {
|
||||
let chain_info = chain.chain_info();
|
||||
let best_block = chain.chain_info().best_block_number;
|
||||
let state = match config.warp_sync {
|
||||
@@ -417,6 +430,7 @@ impl ChainSync {
|
||||
snapshot: Snapshot::new(),
|
||||
sync_start_time: None,
|
||||
transactions_stats: TransactionsStats::default(),
|
||||
private_tx_handler,
|
||||
warp_sync: config.warp_sync,
|
||||
};
|
||||
sync.update_targets(chain);
|
||||
@@ -428,7 +442,7 @@ impl ChainSync {
|
||||
let last_imported_number = self.new_blocks.last_imported_block_number();
|
||||
SyncStatus {
|
||||
state: self.state.clone(),
|
||||
protocol_version: PROTOCOL_VERSION_63,
|
||||
protocol_version: ETH_PROTOCOL_VERSION_63,
|
||||
network_id: self.network_id,
|
||||
start_block_number: self.starting_block,
|
||||
last_imported_block_number: Some(last_imported_number),
|
||||
@@ -662,7 +676,8 @@ impl ChainSync {
|
||||
trace!(target: "sync", "Peer {} network id mismatch (ours: {}, theirs: {})", peer_id, self.network_id, peer.network_id);
|
||||
return Ok(());
|
||||
}
|
||||
if (warp_protocol && peer.protocol_version != PROTOCOL_VERSION_1 && peer.protocol_version != PROTOCOL_VERSION_2) || (!warp_protocol && peer.protocol_version != PROTOCOL_VERSION_63 && peer.protocol_version != PROTOCOL_VERSION_62) {
|
||||
if (warp_protocol && peer.protocol_version != PAR_PROTOCOL_VERSION_1 && peer.protocol_version != PAR_PROTOCOL_VERSION_2 && peer.protocol_version != PAR_PROTOCOL_VERSION_3)
|
||||
|| (!warp_protocol && peer.protocol_version != ETH_PROTOCOL_VERSION_63 && peer.protocol_version != ETH_PROTOCOL_VERSION_62) {
|
||||
io.disable_peer(peer_id);
|
||||
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version);
|
||||
return Ok(());
|
||||
@@ -1493,6 +1508,7 @@ impl ChainSync {
|
||||
}
|
||||
if !self.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "{} Ignoring transactions from unconfirmed/unknown peer", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let item_count = r.item_count()?;
|
||||
@@ -1515,7 +1531,7 @@ impl ChainSync {
|
||||
fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), network::Error> {
|
||||
let warp_protocol_version = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer);
|
||||
let warp_protocol = warp_protocol_version != 0;
|
||||
let protocol = if warp_protocol { warp_protocol_version } else { PROTOCOL_VERSION_63 };
|
||||
let protocol = if warp_protocol { warp_protocol_version } else { ETH_PROTOCOL_VERSION_63 };
|
||||
trace!(target: "sync", "Sending status to {}, protocol version {}", peer, protocol);
|
||||
let mut packet = RlpStream::new_list(if warp_protocol { 7 } else { 5 });
|
||||
let chain = io.chain().chain_info();
|
||||
@@ -1792,6 +1808,8 @@ impl ChainSync {
|
||||
NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp),
|
||||
SNAPSHOT_MANIFEST_PACKET => self.on_snapshot_manifest(io, peer, &rlp),
|
||||
SNAPSHOT_DATA_PACKET => self.on_snapshot_data(io, peer, &rlp),
|
||||
PRIVATE_TRANSACTION_PACKET => self.on_private_transaction(io, peer, &rlp),
|
||||
SIGNED_PRIVATE_TRANSACTION_PACKET => self.on_signed_private_transaction(io, peer, &rlp),
|
||||
_ => {
|
||||
debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id);
|
||||
Ok(())
|
||||
@@ -1945,7 +1963,11 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
fn get_consensus_peers(&self) -> Vec<PeerId> {
|
||||
self.peers.iter().filter_map(|(id, p)| if p.protocol_version == PROTOCOL_VERSION_2 { Some(*id) } else { None }).collect()
|
||||
self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_2 { Some(*id) } else { None }).collect()
|
||||
}
|
||||
|
||||
fn get_private_transaction_peers(&self) -> Vec<PeerId> {
|
||||
self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3 { Some(*id) } else { None }).collect()
|
||||
}
|
||||
|
||||
/// propagates latest block to a set of peers
|
||||
@@ -2223,6 +2245,54 @@ impl ChainSync {
|
||||
self.send_packet(io, peer_id, CONSENSUS_DATA_PACKET, packet.clone());
|
||||
}
|
||||
}
|
||||
|
||||
/// Called when peer sends us new private transaction packet
|
||||
fn on_private_transaction(&self, _io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
||||
if !self.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
trace!(target: "sync", "Received private transaction packet from {:?}", peer_id);
|
||||
|
||||
if let Err(e) = self.private_tx_handler.import_private_transaction(r.as_raw()) {
|
||||
trace!(target: "sync", "Ignoring the message, error queueing: {}", e);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Broadcast private transaction message to peers.
|
||||
pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, packet: Bytes) {
|
||||
let lucky_peers = ChainSync::select_random_peers(&self.get_private_transaction_peers());
|
||||
trace!(target: "sync", "Sending private transaction packet to {:?}", lucky_peers);
|
||||
for peer_id in lucky_peers {
|
||||
self.send_packet(io, peer_id, PRIVATE_TRANSACTION_PACKET, packet.clone());
|
||||
}
|
||||
}
|
||||
|
||||
/// Called when peer sends us signed private transaction packet
|
||||
fn on_signed_private_transaction(&self, _io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
||||
if !self.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
trace!(target: "sync", "Received signed private transaction packet from {:?}", peer_id);
|
||||
if let Err(e) = self.private_tx_handler.import_signed_private_transaction(r.as_raw()) {
|
||||
trace!(target: "sync", "Ignoring the message, error queueing: {}", e);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Broadcast signed private transaction message to peers.
|
||||
pub fn propagate_signed_private_transaction(&mut self, io: &mut SyncIo, packet: Bytes) {
|
||||
let lucky_peers = ChainSync::select_random_peers(&self.get_private_transaction_peers());
|
||||
trace!(target: "sync", "Sending signed private transaction packet to {:?}", lucky_peers);
|
||||
for peer_id in lucky_peers {
|
||||
self.send_packet(io, peer_id, SIGNED_PRIVATE_TRANSACTION_PACKET, packet.clone());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// Checks if peer is able to process service transactions
|
||||
@@ -2247,7 +2317,7 @@ mod tests {
|
||||
use std::collections::{HashSet, VecDeque};
|
||||
use ethkey;
|
||||
use network::PeerId;
|
||||
use tests::helpers::*;
|
||||
use tests::helpers::{TestIo};
|
||||
use tests::snapshot::TestSnapshotService;
|
||||
use ethereum_types::{H256, U256, Address};
|
||||
use parking_lot::RwLock;
|
||||
@@ -2260,6 +2330,7 @@ mod tests {
|
||||
use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient, ChainInfo, BlockInfo};
|
||||
use ethcore::miner::MinerService;
|
||||
use transaction::UnverifiedTransaction;
|
||||
use private_tx::NoopPrivateTxHandler;
|
||||
|
||||
fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes {
|
||||
let mut header = Header::new();
|
||||
@@ -2483,7 +2554,7 @@ mod tests {
|
||||
}
|
||||
|
||||
fn dummy_sync_with_peer(peer_latest_hash: H256, client: &BlockChainClient) -> ChainSync {
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), client);
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), client, Arc::new(NoopPrivateTxHandler));
|
||||
insert_dummy_peer(&mut sync, 0, peer_latest_hash);
|
||||
sync
|
||||
}
|
||||
@@ -2608,7 +2679,7 @@ mod tests {
|
||||
client.add_blocks(2, EachBlockWith::Uncle);
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let block = client.block(BlockId::Latest).unwrap().into_inner();
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), &client);
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler));
|
||||
sync.peers.insert(0,
|
||||
PeerInfo {
|
||||
// Messaging protocol
|
||||
@@ -2695,7 +2766,7 @@ mod tests {
|
||||
client.add_blocks(100, EachBlockWith::Uncle);
|
||||
client.insert_transaction_to_queue();
|
||||
// Sync with no peers
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), &client);
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler));
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
@@ -2765,7 +2836,7 @@ mod tests {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.insert_transaction_with_gas_price_to_queue(U256::zero());
|
||||
let block_hash = client.block_hash_delta_minus(1);
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), &client);
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler));
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
@@ -2798,7 +2869,7 @@ mod tests {
|
||||
let tx1_hash = client.insert_transaction_to_queue();
|
||||
let tx2_hash = client.insert_transaction_with_gas_price_to_queue(U256::zero());
|
||||
let block_hash = client.block_hash_delta_minus(1);
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), &client);
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler));
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
|
||||
@@ -44,6 +44,7 @@ extern crate ethcore_light as light;
|
||||
|
||||
#[cfg(test)] extern crate ethkey;
|
||||
#[cfg(test)] extern crate kvdb_memorydb;
|
||||
#[cfg(test)] extern crate rustc_hex;
|
||||
|
||||
#[macro_use]
|
||||
extern crate macros;
|
||||
@@ -56,6 +57,7 @@ mod chain;
|
||||
mod blocks;
|
||||
mod block_sync;
|
||||
mod sync_io;
|
||||
mod private_tx;
|
||||
mod snapshot;
|
||||
mod transactions_stats;
|
||||
|
||||
@@ -70,3 +72,4 @@ pub use api::*;
|
||||
pub use chain::{SyncStatus, SyncState};
|
||||
pub use devp2p::{validate_node_url, ConnectionFilter, ConnectionDirection};
|
||||
pub use network::{NonReservedPeerMode, Error, ErrorKind};
|
||||
pub use private_tx::{PrivateTxHandler, NoopPrivateTxHandler, SimplePrivateTxHandler};
|
||||
|
||||
@@ -205,6 +205,10 @@ impl PeerLike for Peer {
|
||||
}
|
||||
|
||||
fn restart_sync(&self) { }
|
||||
|
||||
fn process_all_io_messages(&self) { }
|
||||
|
||||
fn process_all_new_block_messages(&self) { }
|
||||
}
|
||||
|
||||
impl TestNet<Peer> {
|
||||
|
||||
60
sync/src/private_tx.rs
Normal file
60
sync/src/private_tx.rs
Normal file
@@ -0,0 +1,60 @@
|
||||
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use parking_lot::Mutex;
|
||||
|
||||
/// Trait which should be implemented by a private transaction handler.
|
||||
pub trait PrivateTxHandler: Send + Sync + 'static {
|
||||
/// Function called on new private transaction received.
|
||||
fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), String>;
|
||||
|
||||
/// Function called on new signed private transaction received.
|
||||
fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), String>;
|
||||
}
|
||||
|
||||
/// Nonoperative private transaction handler.
|
||||
pub struct NoopPrivateTxHandler;
|
||||
|
||||
impl PrivateTxHandler for NoopPrivateTxHandler {
|
||||
fn import_private_transaction(&self, _rlp: &[u8]) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Simple private transaction handler. Used for tests.
|
||||
#[derive(Default)]
|
||||
pub struct SimplePrivateTxHandler {
|
||||
/// imported private transactions
|
||||
pub txs: Mutex<Vec<Vec<u8>>>,
|
||||
/// imported signed private transactions
|
||||
pub signed_txs: Mutex<Vec<Vec<u8>>>,
|
||||
}
|
||||
|
||||
impl PrivateTxHandler for SimplePrivateTxHandler {
|
||||
fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), String> {
|
||||
self.txs.lock().push(rlp.to_vec());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), String> {
|
||||
self.signed_txs.lock().push(rlp.to_vec());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -17,8 +17,8 @@
|
||||
use std::sync::Arc;
|
||||
use hash::keccak;
|
||||
use ethereum_types::{U256, Address};
|
||||
use io::{IoHandler, IoContext, IoChannel};
|
||||
use ethcore::client::{Client, ChainInfo, ClientIoMessage};
|
||||
use io::{IoHandler, IoChannel};
|
||||
use ethcore::client::{ChainInfo, ClientIoMessage};
|
||||
use ethcore::spec::Spec;
|
||||
use ethcore::miner::MinerService;
|
||||
use ethcore::account_provider::AccountProvider;
|
||||
@@ -27,21 +27,6 @@ use transaction::{Action, PendingTransaction, Transaction};
|
||||
use super::helpers::*;
|
||||
use SyncConfig;
|
||||
|
||||
struct TestIoHandler {
|
||||
client: Arc<Client>,
|
||||
}
|
||||
|
||||
impl IoHandler<ClientIoMessage> for TestIoHandler {
|
||||
fn message(&self, _io: &IoContext<ClientIoMessage>, net_message: &ClientIoMessage) {
|
||||
match *net_message {
|
||||
ClientIoMessage::NewMessage(ref message) => if let Err(e) = self.client.engine().handle_message(message) {
|
||||
panic!("Invalid message received: {}", e);
|
||||
},
|
||||
_ => {} // ignore other messages
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn new_tx(secret: &Secret, nonce: U256, chain_id: u64) -> PendingTransaction {
|
||||
let signed = Transaction {
|
||||
nonce: nonce.into(),
|
||||
@@ -63,9 +48,9 @@ fn authority_round() {
|
||||
ap.insert_account(s1.secret().clone(), "").unwrap();
|
||||
|
||||
let chain_id = Spec::new_test_round().chain_id();
|
||||
let mut net = TestNet::with_spec_and_accounts(2, SyncConfig::default(), Spec::new_test_round, Some(ap));
|
||||
let io_handler0: Arc<IoHandler<ClientIoMessage>> = Arc::new(TestIoHandler { client: net.peer(0).chain.clone() });
|
||||
let io_handler1: Arc<IoHandler<ClientIoMessage>> = Arc::new(TestIoHandler { client: net.peer(1).chain.clone() });
|
||||
let mut net = TestNet::with_spec_and_accounts(2, SyncConfig::default(), Spec::new_test_round, Some(ap), false);
|
||||
let io_handler0: Arc<IoHandler<ClientIoMessage>> = Arc::new(TestIoHandler::new(net.peer(0).chain.clone()));
|
||||
let io_handler1: Arc<IoHandler<ClientIoMessage>> = Arc::new(TestIoHandler::new(net.peer(1).chain.clone()));
|
||||
// Push transaction to both clients. Only one of them gets lucky to produce a block.
|
||||
net.peer(0).chain.miner().set_engine_signer(s0.address(), "".to_owned()).unwrap();
|
||||
net.peer(1).chain.miner().set_engine_signer(s1.address(), "".to_owned()).unwrap();
|
||||
@@ -150,9 +135,9 @@ fn tendermint() {
|
||||
ap.insert_account(s1.secret().clone(), "").unwrap();
|
||||
|
||||
let chain_id = Spec::new_test_tendermint().chain_id();
|
||||
let mut net = TestNet::with_spec_and_accounts(2, SyncConfig::default(), Spec::new_test_tendermint, Some(ap));
|
||||
let io_handler0: Arc<IoHandler<ClientIoMessage>> = Arc::new(TestIoHandler { client: net.peer(0).chain.clone() });
|
||||
let io_handler1: Arc<IoHandler<ClientIoMessage>> = Arc::new(TestIoHandler { client: net.peer(1).chain.clone() });
|
||||
let mut net = TestNet::with_spec_and_accounts(2, SyncConfig::default(), Spec::new_test_tendermint, Some(ap), false);
|
||||
let io_handler0: Arc<IoHandler<ClientIoMessage>> = Arc::new(TestIoHandler::new(net.peer(0).chain.clone()));
|
||||
let io_handler1: Arc<IoHandler<ClientIoMessage>> = Arc::new(TestIoHandler::new(net.peer(1).chain.clone()));
|
||||
// Push transaction to both clients. Only one of them issues a proposal.
|
||||
net.peer(0).chain.miner().set_engine_signer(s0.address(), "".to_owned()).unwrap();
|
||||
trace!(target: "poa", "Peer 0 is {}.", s0.address());
|
||||
|
||||
@@ -17,21 +17,23 @@
|
||||
use std::collections::{VecDeque, HashSet, HashMap};
|
||||
use std::sync::Arc;
|
||||
use ethereum_types::H256;
|
||||
use parking_lot::RwLock;
|
||||
use parking_lot::{RwLock, Mutex};
|
||||
use bytes::Bytes;
|
||||
use network::{self, PeerId, ProtocolId, PacketId, SessionInfo};
|
||||
use tests::snapshot::*;
|
||||
use ethcore::client::{TestBlockChainClient, BlockChainClient, Client as EthcoreClient, ClientConfig, ChainNotify};
|
||||
use ethcore::client::{TestBlockChainClient, BlockChainClient, Client as EthcoreClient,
|
||||
ClientConfig, ChainNotify, ChainMessageType, ClientIoMessage};
|
||||
use ethcore::header::BlockNumber;
|
||||
use ethcore::snapshot::SnapshotService;
|
||||
use ethcore::spec::Spec;
|
||||
use ethcore::account_provider::AccountProvider;
|
||||
use ethcore::miner::Miner;
|
||||
use sync_io::SyncIo;
|
||||
use io::IoChannel;
|
||||
use io::{IoChannel, IoContext, IoHandler};
|
||||
use api::WARP_SYNC_PROTOCOL_ID;
|
||||
use chain::ChainSync;
|
||||
use ::SyncConfig;
|
||||
use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3};
|
||||
use SyncConfig;
|
||||
use private_tx::{NoopPrivateTxHandler, PrivateTxHandler, SimplePrivateTxHandler};
|
||||
|
||||
pub trait FlushingBlockChainClient: BlockChainClient {
|
||||
fn flush(&self) {}
|
||||
@@ -131,11 +133,11 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
|
||||
}
|
||||
|
||||
fn eth_protocol_version(&self, _peer: PeerId) -> u8 {
|
||||
63
|
||||
ETH_PROTOCOL_VERSION_63
|
||||
}
|
||||
|
||||
fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8 {
|
||||
if protocol == &WARP_SYNC_PROTOCOL_ID { 2 } else { self.eth_protocol_version(peer_id) }
|
||||
if protocol == &WARP_SYNC_PROTOCOL_ID { PAR_PROTOCOL_VERSION_3 } else { self.eth_protocol_version(peer_id) }
|
||||
}
|
||||
|
||||
fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> {
|
||||
@@ -143,6 +145,16 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
|
||||
}
|
||||
}
|
||||
|
||||
/// Mock for emulution of async run of new blocks
|
||||
struct NewBlockMessage {
|
||||
imported: Vec<H256>,
|
||||
invalid: Vec<H256>,
|
||||
enacted: Vec<H256>,
|
||||
retracted: Vec<H256>,
|
||||
sealed: Vec<H256>,
|
||||
proposed: Vec<Bytes>,
|
||||
}
|
||||
|
||||
/// Abstract messages between peers.
|
||||
pub trait Message {
|
||||
/// The intended recipient of this message.
|
||||
@@ -184,6 +196,12 @@ pub trait Peer {
|
||||
|
||||
/// Restart sync for a peer.
|
||||
fn restart_sync(&self);
|
||||
|
||||
/// Process the queue of pending io messages
|
||||
fn process_all_io_messages(&self);
|
||||
|
||||
/// Process the queue of new block messages
|
||||
fn process_all_new_block_messages(&self);
|
||||
}
|
||||
|
||||
pub struct EthPeer<C> where C: FlushingBlockChainClient {
|
||||
@@ -191,6 +209,41 @@ pub struct EthPeer<C> where C: FlushingBlockChainClient {
|
||||
pub snapshot_service: Arc<TestSnapshotService>,
|
||||
pub sync: RwLock<ChainSync>,
|
||||
pub queue: RwLock<VecDeque<TestPacket>>,
|
||||
pub private_tx_handler: Arc<PrivateTxHandler>,
|
||||
pub io_queue: RwLock<VecDeque<ChainMessageType>>,
|
||||
new_blocks_queue: RwLock<VecDeque<NewBlockMessage>>,
|
||||
}
|
||||
|
||||
impl<C> EthPeer<C> where C: FlushingBlockChainClient {
|
||||
fn is_io_queue_empty(&self) -> bool {
|
||||
self.io_queue.read().is_empty()
|
||||
}
|
||||
|
||||
fn is_new_blocks_queue_empty(&self) -> bool {
|
||||
self.new_blocks_queue.read().is_empty()
|
||||
}
|
||||
|
||||
fn process_io_message(&self, message: ChainMessageType) {
|
||||
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None);
|
||||
match message {
|
||||
ChainMessageType::Consensus(data) => self.sync.write().propagate_consensus_packet(&mut io, data),
|
||||
ChainMessageType::PrivateTransaction(data) => self.sync.write().propagate_private_transaction(&mut io, data),
|
||||
ChainMessageType::SignedPrivateTransaction(data) => self.sync.write().propagate_signed_private_transaction(&mut io, data),
|
||||
}
|
||||
}
|
||||
|
||||
fn process_new_block_message(&self, message: NewBlockMessage) {
|
||||
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None);
|
||||
self.sync.write().chain_new_blocks(
|
||||
&mut io,
|
||||
&message.imported,
|
||||
&message.invalid,
|
||||
&message.enacted,
|
||||
&message.retracted,
|
||||
&message.sealed,
|
||||
&message.proposed
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: FlushingBlockChainClient> Peer for EthPeer<C> {
|
||||
@@ -198,7 +251,12 @@ impl<C: FlushingBlockChainClient> Peer for EthPeer<C> {
|
||||
|
||||
fn on_connect(&self, other: PeerId) {
|
||||
self.sync.write().update_targets(&*self.chain);
|
||||
self.sync.write().on_peer_connected(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(other)), other);
|
||||
self.sync.write().on_peer_connected(&mut TestIo::new(
|
||||
&*self.chain,
|
||||
&self.snapshot_service,
|
||||
&self.queue,
|
||||
Some(other)),
|
||||
other);
|
||||
}
|
||||
|
||||
fn on_disconnect(&self, other: PeerId) {
|
||||
@@ -219,7 +277,7 @@ impl<C: FlushingBlockChainClient> Peer for EthPeer<C> {
|
||||
}
|
||||
|
||||
fn is_done(&self) -> bool {
|
||||
self.queue.read().is_empty()
|
||||
self.queue.read().is_empty() && self.is_io_queue_empty() && self.is_new_blocks_queue_empty()
|
||||
}
|
||||
|
||||
fn sync_step(&self) {
|
||||
@@ -232,6 +290,22 @@ impl<C: FlushingBlockChainClient> Peer for EthPeer<C> {
|
||||
fn restart_sync(&self) {
|
||||
self.sync.write().restart(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None));
|
||||
}
|
||||
|
||||
fn process_all_io_messages(&self) {
|
||||
if !self.is_io_queue_empty() {
|
||||
while let Some(message) = self.io_queue.write().pop_front() {
|
||||
self.process_io_message(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn process_all_new_block_messages(&self) {
|
||||
if !self.is_new_blocks_queue_empty() {
|
||||
while let Some(message) = self.new_blocks_queue.write().pop_front() {
|
||||
self.process_new_block_message(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TestNet<P> {
|
||||
@@ -260,20 +334,35 @@ impl TestNet<EthPeer<TestBlockChainClient>> {
|
||||
for _ in 0..n {
|
||||
let chain = TestBlockChainClient::new();
|
||||
let ss = Arc::new(TestSnapshotService::new());
|
||||
let sync = ChainSync::new(config.clone(), &chain);
|
||||
let private_tx_handler = Arc::new(NoopPrivateTxHandler);
|
||||
let sync = ChainSync::new(config.clone(), &chain, private_tx_handler.clone());
|
||||
net.peers.push(Arc::new(EthPeer {
|
||||
sync: RwLock::new(sync),
|
||||
snapshot_service: ss,
|
||||
chain: Arc::new(chain),
|
||||
queue: RwLock::new(VecDeque::new()),
|
||||
private_tx_handler,
|
||||
io_queue: RwLock::new(VecDeque::new()),
|
||||
new_blocks_queue: RwLock::new(VecDeque::new()),
|
||||
}));
|
||||
}
|
||||
net
|
||||
}
|
||||
|
||||
// relies on Arc uniqueness, which is only true when we haven't registered a ChainNotify.
|
||||
pub fn peer_mut(&mut self, i: usize) -> &mut EthPeer<TestBlockChainClient> {
|
||||
Arc::get_mut(&mut self.peers[i]).expect("Arc never exposed externally")
|
||||
}
|
||||
}
|
||||
|
||||
impl TestNet<EthPeer<EthcoreClient>> {
|
||||
pub fn with_spec_and_accounts<F>(n: usize, config: SyncConfig, spec_factory: F, accounts: Option<Arc<AccountProvider>>) -> Self
|
||||
pub fn with_spec_and_accounts<F>(
|
||||
n: usize,
|
||||
config: SyncConfig,
|
||||
spec_factory: F,
|
||||
accounts: Option<Arc<AccountProvider>>,
|
||||
private_tx_handler: bool,
|
||||
) -> Self
|
||||
where F: Fn() -> Spec
|
||||
{
|
||||
let mut net = TestNet {
|
||||
@@ -282,11 +371,42 @@ impl TestNet<EthPeer<EthcoreClient>> {
|
||||
disconnect_events: Vec::new(),
|
||||
};
|
||||
for _ in 0..n {
|
||||
net.add_peer(config.clone(), spec_factory(), accounts.clone());
|
||||
if private_tx_handler {
|
||||
net.add_peer_with_private_config(config.clone(), spec_factory(), accounts.clone());
|
||||
} else {
|
||||
net.add_peer(config.clone(), spec_factory(), accounts.clone());
|
||||
}
|
||||
}
|
||||
net
|
||||
}
|
||||
|
||||
pub fn add_peer_with_private_config(&mut self, config: SyncConfig, spec: Spec, accounts: Option<Arc<AccountProvider>>) {
|
||||
let channel = IoChannel::disconnected();
|
||||
let client = EthcoreClient::new(
|
||||
ClientConfig::default(),
|
||||
&spec,
|
||||
Arc::new(::kvdb_memorydb::create(::ethcore::db::NUM_COLUMNS.unwrap_or(0))),
|
||||
Arc::new(Miner::with_spec_and_accounts(&spec, accounts.clone())),
|
||||
channel.clone()
|
||||
).unwrap();
|
||||
|
||||
let private_tx_handler = Arc::new(SimplePrivateTxHandler::default());
|
||||
let ss = Arc::new(TestSnapshotService::new());
|
||||
let sync = ChainSync::new(config, &*client, private_tx_handler.clone());
|
||||
let peer = Arc::new(EthPeer {
|
||||
sync: RwLock::new(sync),
|
||||
snapshot_service: ss,
|
||||
chain: client,
|
||||
queue: RwLock::new(VecDeque::new()),
|
||||
private_tx_handler,
|
||||
io_queue: RwLock::new(VecDeque::new()),
|
||||
new_blocks_queue: RwLock::new(VecDeque::new()),
|
||||
});
|
||||
peer.chain.add_notify(peer.clone());
|
||||
//private_provider.add_notify(peer.clone());
|
||||
self.peers.push(peer);
|
||||
}
|
||||
|
||||
pub fn add_peer(&mut self, config: SyncConfig, spec: Spec, accounts: Option<Arc<AccountProvider>>) {
|
||||
let client = EthcoreClient::new(
|
||||
ClientConfig::default(),
|
||||
@@ -297,12 +417,16 @@ impl TestNet<EthPeer<EthcoreClient>> {
|
||||
).unwrap();
|
||||
|
||||
let ss = Arc::new(TestSnapshotService::new());
|
||||
let sync = ChainSync::new(config, &*client);
|
||||
let private_tx_handler = Arc::new(NoopPrivateTxHandler);
|
||||
let sync = ChainSync::new(config, &*client, private_tx_handler.clone());
|
||||
let peer = Arc::new(EthPeer {
|
||||
sync: RwLock::new(sync),
|
||||
snapshot_service: ss,
|
||||
chain: client,
|
||||
queue: RwLock::new(VecDeque::new()),
|
||||
private_tx_handler,
|
||||
io_queue: RwLock::new(VecDeque::new()),
|
||||
new_blocks_queue: RwLock::new(VecDeque::new()),
|
||||
});
|
||||
peer.chain.add_notify(peer.clone());
|
||||
self.peers.push(peer);
|
||||
@@ -366,6 +490,8 @@ impl<P> TestNet<P> where P: Peer {
|
||||
let mut total_steps = 0;
|
||||
while !self.done() {
|
||||
self.sync_step();
|
||||
self.deliver_io_messages();
|
||||
self.deliver_new_block_messages();
|
||||
total_steps += 1;
|
||||
}
|
||||
total_steps
|
||||
@@ -378,18 +504,23 @@ impl<P> TestNet<P> where P: Peer {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deliver_io_messages(&mut self) {
|
||||
for peer in self.peers.iter() {
|
||||
peer.process_all_io_messages();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deliver_new_block_messages(&mut self) {
|
||||
for peer in self.peers.iter() {
|
||||
peer.process_all_new_block_messages();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn done(&self) -> bool {
|
||||
self.peers.iter().all(|p| p.is_done())
|
||||
}
|
||||
}
|
||||
|
||||
impl TestNet<EthPeer<TestBlockChainClient>> {
|
||||
// relies on Arc uniqueness, which is only true when we haven't registered a ChainNotify.
|
||||
pub fn peer_mut(&mut self, i: usize) -> &mut EthPeer<TestBlockChainClient> {
|
||||
Arc::get_mut(&mut self.peers[i]).expect("Arc never exposed externally")
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: FlushingBlockChainClient> TestNet<EthPeer<C>> {
|
||||
pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) {
|
||||
let peer = &mut self.peers[peer_id];
|
||||
@@ -397,6 +528,34 @@ impl<C: FlushingBlockChainClient> TestNet<EthPeer<C>> {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TestIoHandler {
|
||||
pub client: Arc<EthcoreClient>,
|
||||
pub private_tx_queued: Mutex<usize>,
|
||||
}
|
||||
|
||||
impl TestIoHandler {
|
||||
pub fn new(client: Arc<EthcoreClient>) -> Self {
|
||||
TestIoHandler {
|
||||
client,
|
||||
private_tx_queued: Mutex::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IoHandler<ClientIoMessage> for TestIoHandler {
|
||||
fn message(&self, _io: &IoContext<ClientIoMessage>, net_message: &ClientIoMessage) {
|
||||
match *net_message {
|
||||
ClientIoMessage::NewMessage(ref message) => if let Err(e) = self.client.engine().handle_message(message) {
|
||||
panic!("Invalid message received: {}", e);
|
||||
},
|
||||
ClientIoMessage::NewPrivateTransaction => {
|
||||
*self.private_tx_queued.lock() += 1;
|
||||
},
|
||||
_ => {} // ignore other messages
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ChainNotify for EthPeer<EthcoreClient> {
|
||||
fn new_blocks(&self,
|
||||
imported: Vec<H256>,
|
||||
@@ -407,23 +566,21 @@ impl ChainNotify for EthPeer<EthcoreClient> {
|
||||
proposed: Vec<Bytes>,
|
||||
_duration: u64)
|
||||
{
|
||||
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None);
|
||||
self.sync.write().chain_new_blocks(
|
||||
&mut io,
|
||||
&imported,
|
||||
&invalid,
|
||||
&enacted,
|
||||
&retracted,
|
||||
&sealed,
|
||||
&proposed);
|
||||
self.new_blocks_queue.write().push_back(NewBlockMessage {
|
||||
imported,
|
||||
invalid,
|
||||
enacted,
|
||||
retracted,
|
||||
sealed,
|
||||
proposed,
|
||||
});
|
||||
}
|
||||
|
||||
fn start(&self) {}
|
||||
|
||||
fn stop(&self) {}
|
||||
|
||||
fn broadcast(&self, message: Vec<u8>) {
|
||||
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None);
|
||||
self.sync.write().propagate_consensus_packet(&mut io, message.clone());
|
||||
fn broadcast(&self, message_type: ChainMessageType) {
|
||||
self.io_queue.write().push_back(message_type)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user