diff --git a/Cargo.lock b/Cargo.lock index 490c277ec..9d4e2cab6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1078,7 +1078,7 @@ dependencies = [ "ethcore-private-tx 1.0.0", "ethereum-types 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethkey 0.3.0", - "ethstore 0.2.0", + "ethstore 0.2.1", "fastmap 0.1.0", "hashdb 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2401,7 +2401,7 @@ dependencies = [ "ethcore-sync 1.12.0", "ethereum-types 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethkey 0.3.0", - "ethstore 0.2.0", + "ethstore 0.2.1", "fake-fetch 0.0.1", "fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 108d8bf0f..1afbd3960 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -268,7 +268,7 @@ pub struct Params { /// Snapshot service. pub snapshot_service: Arc, /// Private tx service. - pub private_tx_handler: Arc, + pub private_tx_handler: Option>, /// Light data provider. pub provider: Arc<::light::Provider>, /// Network layer configuration. @@ -349,7 +349,7 @@ impl EthSync { let sync = ChainSyncApi::new( params.config, &*params.chain, - params.private_tx_handler.clone(), + params.private_tx_handler.as_ref().cloned(), priority_tasks_rx, ); let service = NetworkService::new(params.network_config.clone().into_basic()?, connection_filter)?; diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 6a3a50bfd..17fd8c08d 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -557,7 +557,9 @@ impl SyncHandler { fn on_peer_status(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { sync.handshaking_peers.remove(&peer_id); let protocol_version: u8 = r.val_at(0)?; - let warp_protocol = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer_id) != 0; + let warp_protocol_version = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer_id); + let warp_protocol = warp_protocol_version != 0; + let private_tx_protocol = warp_protocol_version >= PAR_PROTOCOL_VERSION_3.0; let peer = PeerInfo { protocol_version: protocol_version, network_id: r.val_at(1)?, @@ -576,10 +578,26 @@ impl SyncHandler { snapshot_hash: if warp_protocol { Some(r.val_at(5)?) } else { None }, snapshot_number: if warp_protocol { Some(r.val_at(6)?) } else { None }, block_set: None, + private_tx_enabled: if private_tx_protocol { r.val_at(7).unwrap_or(false) } else { false }, }; - trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{}, snapshot:{:?})", - peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis, peer.snapshot_number); + trace!(target: "sync", "New peer {} (\ + protocol: {}, \ + network: {:?}, \ + difficulty: {:?}, \ + latest:{}, \ + genesis:{}, \ + snapshot:{:?}, \ + private_tx_enabled:{})", + peer_id, + peer.protocol_version, + peer.network_id, + peer.difficulty, + peer.latest_hash, + peer.genesis, + peer.snapshot_number, + peer.private_tx_enabled + ); if io.is_expired() { trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id)); return Ok(()); @@ -654,9 +672,15 @@ impl SyncHandler { trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id); return Ok(()); } - + let private_handler = match sync.private_tx_handler { + Some(ref handler) => handler, + None => { + trace!(target: "sync", "{} Ignoring private tx packet from peer", peer_id); + return Ok(()); + } + }; trace!(target: "sync", "Received signed private transaction packet from {:?}", peer_id); - match sync.private_tx_handler.import_signed_private_transaction(r.as_raw()) { + match private_handler.import_signed_private_transaction(r.as_raw()) { Ok(transaction_hash) => { //don't send the packet back if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { @@ -676,10 +700,15 @@ impl SyncHandler { trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id); return Ok(()); } - + let private_handler = match sync.private_tx_handler { + Some(ref handler) => handler, + None => { + trace!(target: "sync", "{} Ignoring private tx packet from peer", peer_id); + return Ok(()); + } + }; trace!(target: "sync", "Received private transaction packet from {:?}", peer_id); - - match sync.private_tx_handler.import_private_transaction(r.as_raw()) { + match private_handler.import_private_transaction(r.as_raw()) { Ok(transaction_hash) => { //don't send the packet back if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index ff976f668..63824752a 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -333,6 +333,8 @@ pub struct PeerInfo { last_sent_private_transactions: H256FastSet, /// Pending request is expired and result should be ignored expired: bool, + /// Private transactions enabled + private_tx_enabled: bool, /// Peer fork confirmation status confirmation: ForkConfirmation, /// Best snapshot hash @@ -395,7 +397,7 @@ impl ChainSyncApi { pub fn new( config: SyncConfig, chain: &BlockChainClient, - private_tx_handler: Arc, + private_tx_handler: Option>, priority_tasks: mpsc::Receiver, ) -> Self { ChainSyncApi { @@ -626,7 +628,7 @@ pub struct ChainSync { /// Enable ancient block downloading download_old_blocks: bool, /// Shared private tx service. - private_tx_handler: Arc, + private_tx_handler: Option>, /// Enable warp sync. warp_sync: WarpSync, } @@ -636,7 +638,7 @@ impl ChainSync { pub fn new( config: SyncConfig, chain: &BlockChainClient, - private_tx_handler: Arc, + private_tx_handler: Option>, ) -> Self { let chain_info = chain.chain_info(); let best_block = chain.chain_info().best_block_number; @@ -1120,9 +1122,11 @@ impl ChainSync { fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), network::Error> { let warp_protocol_version = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer); let warp_protocol = warp_protocol_version != 0; + let private_tx_protocol = warp_protocol_version >= PAR_PROTOCOL_VERSION_3.0; let protocol = if warp_protocol { warp_protocol_version } else { ETH_PROTOCOL_VERSION_63.0 }; trace!(target: "sync", "Sending status to {}, protocol version {}", peer, protocol); - let mut packet = RlpStream::new_list(if warp_protocol { 7 } else { 5 }); + let mut packet = RlpStream::new(); + packet.begin_unbounded_list(); let chain = io.chain().chain_info(); packet.append(&(protocol as u32)); packet.append(&self.network_id); @@ -1135,7 +1139,11 @@ impl ChainSync { let manifest_hash = manifest.map_or(H256::new(), |m| keccak(m.into_rlp())); packet.append(&manifest_hash); packet.append(&block_number); + if private_tx_protocol { + packet.append(&self.private_tx_handler.is_some()); + } } + packet.complete_unbounded_list(); io.respond(STATUS_PACKET, packet.out()) } @@ -1246,7 +1254,8 @@ impl ChainSync { fn get_private_transaction_peers(&self, transaction_hash: &H256) -> Vec { self.peers.iter().filter_map( |(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3.0 - && !p.last_sent_private_transactions.contains(transaction_hash) { + && !p.last_sent_private_transactions.contains(transaction_hash) + && p.private_tx_enabled { Some(*id) } else { None @@ -1342,7 +1351,6 @@ pub mod tests { use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient, ChainInfo, BlockInfo}; use ethcore::miner::{MinerService, PendingOrdering}; use types::header::Header; - use private_tx::NoopPrivateTxHandler; pub fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes { let mut header = Header::new(); @@ -1426,7 +1434,7 @@ pub mod tests { } pub fn dummy_sync_with_peer(peer_latest_hash: H256, client: &BlockChainClient) -> ChainSync { - let mut sync = ChainSync::new(SyncConfig::default(), client, Arc::new(NoopPrivateTxHandler)); + let mut sync = ChainSync::new(SyncConfig::default(), client, None); insert_dummy_peer(&mut sync, 0, peer_latest_hash); sync } @@ -1446,6 +1454,7 @@ pub mod tests { last_sent_transactions: Default::default(), last_sent_private_transactions: Default::default(), expired: false, + private_tx_enabled: false, confirmation: super::ForkConfirmation::Confirmed, snapshot_number: None, snapshot_hash: None, diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index de57ede09..06d6cf54a 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -313,12 +313,16 @@ impl SyncPropagator { /// Broadcast private transaction message to peers. pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: PacketId, packet: Bytes) { let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash)); - trace!(target: "sync", "Sending private transaction packet to {:?}", lucky_peers); - for peer_id in lucky_peers { - if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { - peer.last_sent_private_transactions.insert(transaction_hash); + if lucky_peers.is_empty() { + error!(target: "privatetx", "Cannot propagate the packet, no peers with private tx enabled connected"); + } else { + trace!(target: "privatetx", "Sending private transaction packet to {:?}", lucky_peers); + for peer_id in lucky_peers { + if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { + peer.last_sent_private_transactions.insert(transaction_hash); + } + SyncPropagator::send_packet(io, peer_id, packet_id, packet.clone()); } - SyncPropagator::send_packet(io, peer_id, packet_id, packet.clone()); } } @@ -350,7 +354,6 @@ impl SyncPropagator { mod tests { use ethcore::client::{BlockInfo, ChainInfo, EachBlockWith, TestBlockChainClient}; use parking_lot::RwLock; - use private_tx::NoopPrivateTxHandler; use rlp::{Rlp}; use std::collections::{VecDeque}; use tests::helpers::{TestIo}; @@ -426,7 +429,7 @@ mod tests { client.add_blocks(2, EachBlockWith::Uncle); let queue = RwLock::new(VecDeque::new()); let block = client.block(BlockId::Latest).unwrap().into_inner(); - let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler)); + let mut sync = ChainSync::new(SyncConfig::default(), &client, None); sync.peers.insert(0, PeerInfo { // Messaging protocol @@ -442,6 +445,7 @@ mod tests { last_sent_transactions: Default::default(), last_sent_private_transactions: Default::default(), expired: false, + private_tx_enabled: false, confirmation: ForkConfirmation::Confirmed, snapshot_number: None, snapshot_hash: None, @@ -514,7 +518,7 @@ mod tests { client.add_blocks(100, EachBlockWith::Uncle); client.insert_transaction_to_queue(); // Sync with no peers - let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler)); + let mut sync = ChainSync::new(SyncConfig::default(), &client, None); let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); @@ -584,7 +588,7 @@ mod tests { let mut client = TestBlockChainClient::new(); client.insert_transaction_with_gas_price_to_queue(U256::zero()); let block_hash = client.block_hash_delta_minus(1); - let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler)); + let mut sync = ChainSync::new(SyncConfig::default(), &client, None); let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); @@ -617,7 +621,7 @@ mod tests { let tx1_hash = client.insert_transaction_to_queue(); let tx2_hash = client.insert_transaction_with_gas_price_to_queue(U256::zero()); let block_hash = client.block_hash_delta_minus(1); - let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler)); + let mut sync = ChainSync::new(SyncConfig::default(), &client, None); let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index 915a3a26a..60f49cd2d 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -341,7 +341,7 @@ impl TestNet> { let chain = TestBlockChainClient::new(); let ss = Arc::new(TestSnapshotService::new()); let private_tx_handler = Arc::new(SimplePrivateTxHandler::default()); - let sync = ChainSync::new(config.clone(), &chain, private_tx_handler.clone()); + let sync = ChainSync::new(config.clone(), &chain, Some(private_tx_handler.clone())); net.peers.push(Arc::new(EthPeer { sync: RwLock::new(sync), snapshot_service: ss, @@ -395,7 +395,7 @@ impl TestNet> { let private_tx_handler = Arc::new(SimplePrivateTxHandler::default()); let ss = Arc::new(TestSnapshotService::new()); - let sync = ChainSync::new(config, &*client, private_tx_handler.clone()); + let sync = ChainSync::new(config, &*client, Some(private_tx_handler.clone())); let peer = Arc::new(EthPeer { sync: RwLock::new(sync), snapshot_service: ss, diff --git a/parity/modules.rs b/parity/modules.rs index ac84aea5f..3b5add612 100644 --- a/parity/modules.rs +++ b/parity/modules.rs @@ -37,7 +37,7 @@ pub fn sync( network_config: NetworkConfiguration, chain: Arc, snapshot_service: Arc, - private_tx_handler: Arc, + private_tx_handler: Option>, provider: Arc, _log_settings: &LogConfig, attached_protos: Vec, diff --git a/parity/run.rs b/parity/run.rs index 4be9539a5..1df495228 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -31,7 +31,7 @@ use ethcore::verification::queue::VerifierSettings; use ethcore_logger::{Config as LogConfig, RotatingLogger}; use ethcore_service::ClientService; use ethereum_types::Address; -use sync::{self, SyncConfig}; +use sync::{self, SyncConfig, PrivateTxHandler}; use miner::work_notify::WorkPoster; use futures::IntoFuture; use hash_fetch::{self, fetch}; @@ -666,13 +666,18 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: None }; + let private_tx_sync: Option> = match cmd.private_tx_enabled { + true => Some(private_tx_service.clone() as Arc), + false => None, + }; + // create sync object let (sync_provider, manage_network, chain_notify, priority_tasks) = modules::sync( sync_config, net_conf.clone().into(), client.clone(), snapshot_service.clone(), - private_tx_service.clone(), + private_tx_sync, client.clone(), &cmd.logger_config, attached_protos,