Private tx enabled flag added into STATUS packet (#9999)

* Add private tx enabled flag into status packet

* Error log added for the case with no peers available

* Add flag only for supported protocol versions

* Work with private handler refactored

* Log target changed

* Cargo.lock updated
This commit is contained in:
Anton Gavrilov 2019-01-04 19:57:01 +01:00 committed by Afri Schoedon
parent 90fb473d87
commit b180be7526
8 changed files with 81 additions and 34 deletions

4
Cargo.lock generated
View File

@ -1078,7 +1078,7 @@ dependencies = [
"ethcore-private-tx 1.0.0", "ethcore-private-tx 1.0.0",
"ethereum-types 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethereum-types 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ethkey 0.3.0", "ethkey 0.3.0",
"ethstore 0.2.0", "ethstore 0.2.1",
"fastmap 0.1.0", "fastmap 0.1.0",
"hashdb 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "hashdb 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2401,7 +2401,7 @@ dependencies = [
"ethcore-sync 1.12.0", "ethcore-sync 1.12.0",
"ethereum-types 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethereum-types 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ethkey 0.3.0", "ethkey 0.3.0",
"ethstore 0.2.0", "ethstore 0.2.1",
"fake-fetch 0.0.1", "fake-fetch 0.0.1",
"fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -268,7 +268,7 @@ pub struct Params {
/// Snapshot service. /// Snapshot service.
pub snapshot_service: Arc<SnapshotService>, pub snapshot_service: Arc<SnapshotService>,
/// Private tx service. /// Private tx service.
pub private_tx_handler: Arc<PrivateTxHandler>, pub private_tx_handler: Option<Arc<PrivateTxHandler>>,
/// Light data provider. /// Light data provider.
pub provider: Arc<::light::Provider>, pub provider: Arc<::light::Provider>,
/// Network layer configuration. /// Network layer configuration.
@ -349,7 +349,7 @@ impl EthSync {
let sync = ChainSyncApi::new( let sync = ChainSyncApi::new(
params.config, params.config,
&*params.chain, &*params.chain,
params.private_tx_handler.clone(), params.private_tx_handler.as_ref().cloned(),
priority_tasks_rx, priority_tasks_rx,
); );
let service = NetworkService::new(params.network_config.clone().into_basic()?, connection_filter)?; let service = NetworkService::new(params.network_config.clone().into_basic()?, connection_filter)?;

View File

@ -557,7 +557,9 @@ impl SyncHandler {
fn on_peer_status(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { fn on_peer_status(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
sync.handshaking_peers.remove(&peer_id); sync.handshaking_peers.remove(&peer_id);
let protocol_version: u8 = r.val_at(0)?; let protocol_version: u8 = r.val_at(0)?;
let warp_protocol = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer_id) != 0; let warp_protocol_version = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer_id);
let warp_protocol = warp_protocol_version != 0;
let private_tx_protocol = warp_protocol_version >= PAR_PROTOCOL_VERSION_3.0;
let peer = PeerInfo { let peer = PeerInfo {
protocol_version: protocol_version, protocol_version: protocol_version,
network_id: r.val_at(1)?, network_id: r.val_at(1)?,
@ -576,10 +578,26 @@ impl SyncHandler {
snapshot_hash: if warp_protocol { Some(r.val_at(5)?) } else { None }, snapshot_hash: if warp_protocol { Some(r.val_at(5)?) } else { None },
snapshot_number: if warp_protocol { Some(r.val_at(6)?) } else { None }, snapshot_number: if warp_protocol { Some(r.val_at(6)?) } else { None },
block_set: None, block_set: None,
private_tx_enabled: if private_tx_protocol { r.val_at(7).unwrap_or(false) } else { false },
}; };
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{}, snapshot:{:?})", trace!(target: "sync", "New peer {} (\
peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis, peer.snapshot_number); protocol: {}, \
network: {:?}, \
difficulty: {:?}, \
latest:{}, \
genesis:{}, \
snapshot:{:?}, \
private_tx_enabled:{})",
peer_id,
peer.protocol_version,
peer.network_id,
peer.difficulty,
peer.latest_hash,
peer.genesis,
peer.snapshot_number,
peer.private_tx_enabled
);
if io.is_expired() { if io.is_expired() {
trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id)); trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
return Ok(()); return Ok(());
@ -654,9 +672,15 @@ impl SyncHandler {
trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id); trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id);
return Ok(()); return Ok(());
} }
let private_handler = match sync.private_tx_handler {
Some(ref handler) => handler,
None => {
trace!(target: "sync", "{} Ignoring private tx packet from peer", peer_id);
return Ok(());
}
};
trace!(target: "sync", "Received signed private transaction packet from {:?}", peer_id); trace!(target: "sync", "Received signed private transaction packet from {:?}", peer_id);
match sync.private_tx_handler.import_signed_private_transaction(r.as_raw()) { match private_handler.import_signed_private_transaction(r.as_raw()) {
Ok(transaction_hash) => { Ok(transaction_hash) => {
//don't send the packet back //don't send the packet back
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
@ -676,10 +700,15 @@ impl SyncHandler {
trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id); trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id);
return Ok(()); return Ok(());
} }
let private_handler = match sync.private_tx_handler {
Some(ref handler) => handler,
None => {
trace!(target: "sync", "{} Ignoring private tx packet from peer", peer_id);
return Ok(());
}
};
trace!(target: "sync", "Received private transaction packet from {:?}", peer_id); trace!(target: "sync", "Received private transaction packet from {:?}", peer_id);
match private_handler.import_private_transaction(r.as_raw()) {
match sync.private_tx_handler.import_private_transaction(r.as_raw()) {
Ok(transaction_hash) => { Ok(transaction_hash) => {
//don't send the packet back //don't send the packet back
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {

View File

@ -333,6 +333,8 @@ pub struct PeerInfo {
last_sent_private_transactions: H256FastSet, last_sent_private_transactions: H256FastSet,
/// Pending request is expired and result should be ignored /// Pending request is expired and result should be ignored
expired: bool, expired: bool,
/// Private transactions enabled
private_tx_enabled: bool,
/// Peer fork confirmation status /// Peer fork confirmation status
confirmation: ForkConfirmation, confirmation: ForkConfirmation,
/// Best snapshot hash /// Best snapshot hash
@ -395,7 +397,7 @@ impl ChainSyncApi {
pub fn new( pub fn new(
config: SyncConfig, config: SyncConfig,
chain: &BlockChainClient, chain: &BlockChainClient,
private_tx_handler: Arc<PrivateTxHandler>, private_tx_handler: Option<Arc<PrivateTxHandler>>,
priority_tasks: mpsc::Receiver<PriorityTask>, priority_tasks: mpsc::Receiver<PriorityTask>,
) -> Self { ) -> Self {
ChainSyncApi { ChainSyncApi {
@ -626,7 +628,7 @@ pub struct ChainSync {
/// Enable ancient block downloading /// Enable ancient block downloading
download_old_blocks: bool, download_old_blocks: bool,
/// Shared private tx service. /// Shared private tx service.
private_tx_handler: Arc<PrivateTxHandler>, private_tx_handler: Option<Arc<PrivateTxHandler>>,
/// Enable warp sync. /// Enable warp sync.
warp_sync: WarpSync, warp_sync: WarpSync,
} }
@ -636,7 +638,7 @@ impl ChainSync {
pub fn new( pub fn new(
config: SyncConfig, config: SyncConfig,
chain: &BlockChainClient, chain: &BlockChainClient,
private_tx_handler: Arc<PrivateTxHandler>, private_tx_handler: Option<Arc<PrivateTxHandler>>,
) -> Self { ) -> Self {
let chain_info = chain.chain_info(); let chain_info = chain.chain_info();
let best_block = chain.chain_info().best_block_number; let best_block = chain.chain_info().best_block_number;
@ -1120,9 +1122,11 @@ impl ChainSync {
fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), network::Error> { fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), network::Error> {
let warp_protocol_version = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer); let warp_protocol_version = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer);
let warp_protocol = warp_protocol_version != 0; let warp_protocol = warp_protocol_version != 0;
let private_tx_protocol = warp_protocol_version >= PAR_PROTOCOL_VERSION_3.0;
let protocol = if warp_protocol { warp_protocol_version } else { ETH_PROTOCOL_VERSION_63.0 }; let protocol = if warp_protocol { warp_protocol_version } else { ETH_PROTOCOL_VERSION_63.0 };
trace!(target: "sync", "Sending status to {}, protocol version {}", peer, protocol); trace!(target: "sync", "Sending status to {}, protocol version {}", peer, protocol);
let mut packet = RlpStream::new_list(if warp_protocol { 7 } else { 5 }); let mut packet = RlpStream::new();
packet.begin_unbounded_list();
let chain = io.chain().chain_info(); let chain = io.chain().chain_info();
packet.append(&(protocol as u32)); packet.append(&(protocol as u32));
packet.append(&self.network_id); packet.append(&self.network_id);
@ -1135,7 +1139,11 @@ impl ChainSync {
let manifest_hash = manifest.map_or(H256::new(), |m| keccak(m.into_rlp())); let manifest_hash = manifest.map_or(H256::new(), |m| keccak(m.into_rlp()));
packet.append(&manifest_hash); packet.append(&manifest_hash);
packet.append(&block_number); packet.append(&block_number);
if private_tx_protocol {
packet.append(&self.private_tx_handler.is_some());
}
} }
packet.complete_unbounded_list();
io.respond(STATUS_PACKET, packet.out()) io.respond(STATUS_PACKET, packet.out())
} }
@ -1246,7 +1254,8 @@ impl ChainSync {
fn get_private_transaction_peers(&self, transaction_hash: &H256) -> Vec<PeerId> { fn get_private_transaction_peers(&self, transaction_hash: &H256) -> Vec<PeerId> {
self.peers.iter().filter_map( self.peers.iter().filter_map(
|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3.0 |(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3.0
&& !p.last_sent_private_transactions.contains(transaction_hash) { && !p.last_sent_private_transactions.contains(transaction_hash)
&& p.private_tx_enabled {
Some(*id) Some(*id)
} else { } else {
None None
@ -1342,7 +1351,6 @@ pub mod tests {
use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient, ChainInfo, BlockInfo}; use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient, ChainInfo, BlockInfo};
use ethcore::miner::{MinerService, PendingOrdering}; use ethcore::miner::{MinerService, PendingOrdering};
use types::header::Header; use types::header::Header;
use private_tx::NoopPrivateTxHandler;
pub fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes { pub fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes {
let mut header = Header::new(); let mut header = Header::new();
@ -1426,7 +1434,7 @@ pub mod tests {
} }
pub fn dummy_sync_with_peer(peer_latest_hash: H256, client: &BlockChainClient) -> ChainSync { pub fn dummy_sync_with_peer(peer_latest_hash: H256, client: &BlockChainClient) -> ChainSync {
let mut sync = ChainSync::new(SyncConfig::default(), client, Arc::new(NoopPrivateTxHandler)); let mut sync = ChainSync::new(SyncConfig::default(), client, None);
insert_dummy_peer(&mut sync, 0, peer_latest_hash); insert_dummy_peer(&mut sync, 0, peer_latest_hash);
sync sync
} }
@ -1446,6 +1454,7 @@ pub mod tests {
last_sent_transactions: Default::default(), last_sent_transactions: Default::default(),
last_sent_private_transactions: Default::default(), last_sent_private_transactions: Default::default(),
expired: false, expired: false,
private_tx_enabled: false,
confirmation: super::ForkConfirmation::Confirmed, confirmation: super::ForkConfirmation::Confirmed,
snapshot_number: None, snapshot_number: None,
snapshot_hash: None, snapshot_hash: None,

View File

@ -313,12 +313,16 @@ impl SyncPropagator {
/// Broadcast private transaction message to peers. /// Broadcast private transaction message to peers.
pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: PacketId, packet: Bytes) { pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: PacketId, packet: Bytes) {
let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash)); let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash));
trace!(target: "sync", "Sending private transaction packet to {:?}", lucky_peers); if lucky_peers.is_empty() {
for peer_id in lucky_peers { error!(target: "privatetx", "Cannot propagate the packet, no peers with private tx enabled connected");
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { } else {
peer.last_sent_private_transactions.insert(transaction_hash); trace!(target: "privatetx", "Sending private transaction packet to {:?}", lucky_peers);
for peer_id in lucky_peers {
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
peer.last_sent_private_transactions.insert(transaction_hash);
}
SyncPropagator::send_packet(io, peer_id, packet_id, packet.clone());
} }
SyncPropagator::send_packet(io, peer_id, packet_id, packet.clone());
} }
} }
@ -350,7 +354,6 @@ impl SyncPropagator {
mod tests { mod tests {
use ethcore::client::{BlockInfo, ChainInfo, EachBlockWith, TestBlockChainClient}; use ethcore::client::{BlockInfo, ChainInfo, EachBlockWith, TestBlockChainClient};
use parking_lot::RwLock; use parking_lot::RwLock;
use private_tx::NoopPrivateTxHandler;
use rlp::{Rlp}; use rlp::{Rlp};
use std::collections::{VecDeque}; use std::collections::{VecDeque};
use tests::helpers::{TestIo}; use tests::helpers::{TestIo};
@ -426,7 +429,7 @@ mod tests {
client.add_blocks(2, EachBlockWith::Uncle); client.add_blocks(2, EachBlockWith::Uncle);
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let block = client.block(BlockId::Latest).unwrap().into_inner(); let block = client.block(BlockId::Latest).unwrap().into_inner();
let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler)); let mut sync = ChainSync::new(SyncConfig::default(), &client, None);
sync.peers.insert(0, sync.peers.insert(0,
PeerInfo { PeerInfo {
// Messaging protocol // Messaging protocol
@ -442,6 +445,7 @@ mod tests {
last_sent_transactions: Default::default(), last_sent_transactions: Default::default(),
last_sent_private_transactions: Default::default(), last_sent_private_transactions: Default::default(),
expired: false, expired: false,
private_tx_enabled: false,
confirmation: ForkConfirmation::Confirmed, confirmation: ForkConfirmation::Confirmed,
snapshot_number: None, snapshot_number: None,
snapshot_hash: None, snapshot_hash: None,
@ -514,7 +518,7 @@ mod tests {
client.add_blocks(100, EachBlockWith::Uncle); client.add_blocks(100, EachBlockWith::Uncle);
client.insert_transaction_to_queue(); client.insert_transaction_to_queue();
// Sync with no peers // Sync with no peers
let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler)); let mut sync = ChainSync::new(SyncConfig::default(), &client, None);
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None);
@ -584,7 +588,7 @@ mod tests {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
client.insert_transaction_with_gas_price_to_queue(U256::zero()); client.insert_transaction_with_gas_price_to_queue(U256::zero());
let block_hash = client.block_hash_delta_minus(1); let block_hash = client.block_hash_delta_minus(1);
let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler)); let mut sync = ChainSync::new(SyncConfig::default(), &client, None);
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None);
@ -617,7 +621,7 @@ mod tests {
let tx1_hash = client.insert_transaction_to_queue(); let tx1_hash = client.insert_transaction_to_queue();
let tx2_hash = client.insert_transaction_with_gas_price_to_queue(U256::zero()); let tx2_hash = client.insert_transaction_with_gas_price_to_queue(U256::zero());
let block_hash = client.block_hash_delta_minus(1); let block_hash = client.block_hash_delta_minus(1);
let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler)); let mut sync = ChainSync::new(SyncConfig::default(), &client, None);
let queue = RwLock::new(VecDeque::new()); let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new(); let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None); let mut io = TestIo::new(&mut client, &ss, &queue, None);

View File

@ -341,7 +341,7 @@ impl TestNet<EthPeer<TestBlockChainClient>> {
let chain = TestBlockChainClient::new(); let chain = TestBlockChainClient::new();
let ss = Arc::new(TestSnapshotService::new()); let ss = Arc::new(TestSnapshotService::new());
let private_tx_handler = Arc::new(SimplePrivateTxHandler::default()); let private_tx_handler = Arc::new(SimplePrivateTxHandler::default());
let sync = ChainSync::new(config.clone(), &chain, private_tx_handler.clone()); let sync = ChainSync::new(config.clone(), &chain, Some(private_tx_handler.clone()));
net.peers.push(Arc::new(EthPeer { net.peers.push(Arc::new(EthPeer {
sync: RwLock::new(sync), sync: RwLock::new(sync),
snapshot_service: ss, snapshot_service: ss,
@ -395,7 +395,7 @@ impl TestNet<EthPeer<EthcoreClient>> {
let private_tx_handler = Arc::new(SimplePrivateTxHandler::default()); let private_tx_handler = Arc::new(SimplePrivateTxHandler::default());
let ss = Arc::new(TestSnapshotService::new()); let ss = Arc::new(TestSnapshotService::new());
let sync = ChainSync::new(config, &*client, private_tx_handler.clone()); let sync = ChainSync::new(config, &*client, Some(private_tx_handler.clone()));
let peer = Arc::new(EthPeer { let peer = Arc::new(EthPeer {
sync: RwLock::new(sync), sync: RwLock::new(sync),
snapshot_service: ss, snapshot_service: ss,

View File

@ -37,7 +37,7 @@ pub fn sync(
network_config: NetworkConfiguration, network_config: NetworkConfiguration,
chain: Arc<BlockChainClient>, chain: Arc<BlockChainClient>,
snapshot_service: Arc<SnapshotService>, snapshot_service: Arc<SnapshotService>,
private_tx_handler: Arc<PrivateTxHandler>, private_tx_handler: Option<Arc<PrivateTxHandler>>,
provider: Arc<Provider>, provider: Arc<Provider>,
_log_settings: &LogConfig, _log_settings: &LogConfig,
attached_protos: Vec<AttachedProtocol>, attached_protos: Vec<AttachedProtocol>,

View File

@ -31,7 +31,7 @@ use ethcore::verification::queue::VerifierSettings;
use ethcore_logger::{Config as LogConfig, RotatingLogger}; use ethcore_logger::{Config as LogConfig, RotatingLogger};
use ethcore_service::ClientService; use ethcore_service::ClientService;
use ethereum_types::Address; use ethereum_types::Address;
use sync::{self, SyncConfig}; use sync::{self, SyncConfig, PrivateTxHandler};
use miner::work_notify::WorkPoster; use miner::work_notify::WorkPoster;
use futures::IntoFuture; use futures::IntoFuture;
use hash_fetch::{self, fetch}; use hash_fetch::{self, fetch};
@ -666,13 +666,18 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
None None
}; };
let private_tx_sync: Option<Arc<PrivateTxHandler>> = match cmd.private_tx_enabled {
true => Some(private_tx_service.clone() as Arc<PrivateTxHandler>),
false => None,
};
// create sync object // create sync object
let (sync_provider, manage_network, chain_notify, priority_tasks) = modules::sync( let (sync_provider, manage_network, chain_notify, priority_tasks) = modules::sync(
sync_config, sync_config,
net_conf.clone().into(), net_conf.clone().into(),
client.clone(), client.clone(),
snapshot_service.clone(), snapshot_service.clone(),
private_tx_service.clone(), private_tx_sync,
client.clone(), client.clone(),
&cmd.logger_config, &cmd.logger_config,
attached_protos, attached_protos,