From da7b30628b310e4247481e03170bb9a46c4e0d8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Mon, 20 Jun 2016 15:20:55 +0200 Subject: [PATCH 01/15] Fixing warnings --- ethcore/src/account_provider.rs | 1 - parity/configuration.rs | 7 ++++--- parity/io_handler.rs | 5 ++--- parity/main.rs | 2 +- util/src/network/host.rs | 9 +++------ 5 files changed, 10 insertions(+), 14 deletions(-) diff --git a/ethcore/src/account_provider.rs b/ethcore/src/account_provider.rs index 3c79cfd52..6744c6bd2 100644 --- a/ethcore/src/account_provider.rs +++ b/ethcore/src/account_provider.rs @@ -236,7 +236,6 @@ impl AccountProvider { #[cfg(test)] mod tests { use super::AccountProvider; - use ethstore::SecretStore; use ethstore::ethkey::{Generator, Random}; #[test] diff --git a/parity/configuration.rs b/parity/configuration.rs index 7c42f06e1..967a07a07 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -263,9 +263,10 @@ impl Configuration { }).collect::>(); if !self.args.flag_no_import_keys { - let dir_type = match self.args.flag_testnet { - true => DirectoryType::Testnet, - false => DirectoryType::Main, + let dir_type = if self.args.flag_testnet { + DirectoryType::Testnet + } else { + DirectoryType::Main }; let from = GethDirectory::open(dir_type); diff --git a/parity/io_handler.rs b/parity/io_handler.rs index a94582b1d..3f0c04fbd 100644 --- a/parity/io_handler.rs +++ b/parity/io_handler.rs @@ -39,9 +39,8 @@ impl IoHandler for ClientIoHandler { } fn timeout(&self, _io: &IoContext, timer: TimerToken) { - match timer { - INFO_TIMER => { self.info.tick(&self.client, Some(&self.sync)); } - _ => {} + if let INFO_TIMER = timer { + self.info.tick(&self.client, Some(&self.sync)); } } diff --git a/parity/main.rs b/parity/main.rs index 9b3698c5c..1397de8c7 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -485,7 +485,7 @@ fn execute_signer(conf: Configuration) { } fn execute_account_cli(conf: Configuration) { - use ethcore::ethstore::{SecretStore, EthStore, import_accounts}; + use ethcore::ethstore::{EthStore, import_accounts}; use ethcore::ethstore::dir::DiskDirectory; use ethcore::account_provider::AccountProvider; use rpassword::read_password; diff --git a/util/src/network/host.rs b/util/src/network/host.rs index aef56fc09..92d6a77ae 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -663,13 +663,10 @@ impl Host where Message: Send + Sync + Clone { match s.readable(io, &self.info.read().unwrap()) { Err(e) => { trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e); - match e { - UtilError::Network(NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol)) => { - if let Some(id) = s.id() { - self.nodes.write().unwrap().mark_as_useless(id); - } + if let UtilError::Network(NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol)) = e { + if let Some(id) = s.id() { + self.nodes.write().unwrap().mark_as_useless(id); } - _ => (), } kill = true; break; From 7b9db37d84dfe34e209dd5c0489b7ace08b413d7 Mon Sep 17 00:00:00 2001 From: debris Date: Mon, 20 Jun 2016 16:29:04 +0200 Subject: [PATCH 02/15] removed unnecessary logs --- parity/configuration.rs | 5 ++--- parity/migration.rs | 12 ++++++++---- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/parity/configuration.rs b/parity/configuration.rs index 7c42f06e1..a20d6cdd2 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -270,9 +270,8 @@ impl Configuration { let from = GethDirectory::open(dir_type); let to = DiskDirectory::create(self.keys_path()).unwrap(); - if let Err(e) = import_accounts(&from, &to) { - warn!("Could not import accounts {}", e); - } + // ignore error, cause geth may not exist + let _ = import_accounts(&from, &to); } let dir = Box::new(DiskDirectory::create(self.keys_path()).unwrap()); diff --git a/parity/migration.rs b/parity/migration.rs index acfd32ffd..e2a25723f 100644 --- a/parity/migration.rs +++ b/parity/migration.rs @@ -151,8 +151,6 @@ fn migrate_database(version: u32, path: PathBuf, migrations: MigrationManager) - return Ok(()) } - println!("Migrating database {} from version {} to {}", path.to_string_lossy(), version, CURRENT_VERSION); - let temp_path = temp_database_path(&path); let backup_path = backup_database_path(&path); // remote the dir if it exists @@ -187,20 +185,26 @@ fn migrate_database(version: u32, path: PathBuf, migrations: MigrationManager) - // remove backup try!(fs::remove_dir_all(&backup_path)); - println!("Migration finished"); Ok(()) } +fn exists(path: &PathBuf) -> bool { + fs::metadata(path).is_ok() +} + /// Migrates the database. pub fn migrate(path: &PathBuf) -> Result<(), Error> { // read version file. let version = try!(current_version(path)); // migrate the databases. - if version != CURRENT_VERSION { + // main db directory may already exists, so let's check if we have blocks dir + if version != CURRENT_VERSION && exists(&blocks_database_path(path)) { + println!("Migrating database from version {} to {}", version, CURRENT_VERSION); try!(migrate_database(version, blocks_database_path(path), try!(blocks_database_migrations()))); try!(migrate_database(version, extras_database_path(path), try!(extras_database_migrations()))); + println!("Migration finished"); } // update version file. From 1ffe0c185cc3b2ac9a7cf158ef861ee87dcfda85 Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 20 Jun 2016 17:28:48 +0200 Subject: [PATCH 03/15] Reduce locking --- sync/src/chain.rs | 58 +++++++++++++++++++++++---------------- sync/src/lib.rs | 2 +- sync/src/tests/chain.rs | 6 ++-- sync/src/tests/helpers.rs | 16 +++++------ 4 files changed, 47 insertions(+), 35 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 6ec0884b5..3b608610d 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1097,7 +1097,7 @@ impl ChainSync { Ok(Some((RECEIPTS_PACKET, rlp_result))) } - fn return_rlp(&self, io: &mut SyncIo, rlp: &UntrustedRlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError> + fn return_rlp(io: &mut SyncIo, rlp: &UntrustedRlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError> where FRlp : Fn(&SyncIo, &UntrustedRlp, PeerId) -> RlpResponseResult, FError : FnOnce(UtilError) -> String { @@ -1114,13 +1114,41 @@ impl ChainSync { } /// Dispatch incoming requests and responses - pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { + pub fn dispatch_packet(sync: &RwLock, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { let rlp = UntrustedRlp::new(data); + let result = match packet_id { + GET_BLOCK_BODIES_PACKET => ChainSync::return_rlp(io, &rlp, peer, + ChainSync::return_block_bodies, + |e| format!("Error sending block bodies: {:?}", e)), + GET_BLOCK_HEADERS_PACKET => ChainSync::return_rlp(io, &rlp, peer, + ChainSync::return_block_headers, + |e| format!("Error sending block headers: {:?}", e)), + + GET_RECEIPTS_PACKET => ChainSync::return_rlp(io, &rlp, peer, + ChainSync::return_receipts, + |e| format!("Error sending receipts: {:?}", e)), + + GET_NODE_DATA_PACKET => ChainSync::return_rlp(io, &rlp, peer, + ChainSync::return_node_data, + |e| format!("Error sending nodes: {:?}", e)), + + _ => { + sync.write().unwrap().on_packet(io, peer, packet_id, data); + Ok(()) + } + }; + result.unwrap_or_else(|e| { + debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e); + }) + } + + pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { if packet_id != STATUS_PACKET && !self.peers.contains_key(&peer) { debug!(target:"sync", "Unexpected packet from unregistered peer: {}:{}", peer, io.peer_info(peer)); return; } + let rlp = UntrustedRlp::new(data); let result = match packet_id { STATUS_PACKET => self.on_peer_status(io, peer, &rlp), TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp), @@ -1128,23 +1156,6 @@ impl ChainSync { BLOCK_BODIES_PACKET => self.on_peer_block_bodies(io, peer, &rlp), NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp), NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp), - - GET_BLOCK_BODIES_PACKET => self.return_rlp(io, &rlp, peer, - ChainSync::return_block_bodies, - |e| format!("Error sending block bodies: {:?}", e)), - - GET_BLOCK_HEADERS_PACKET => self.return_rlp(io, &rlp, peer, - ChainSync::return_block_headers, - |e| format!("Error sending block headers: {:?}", e)), - - GET_RECEIPTS_PACKET => self.return_rlp(io, &rlp, peer, - ChainSync::return_receipts, - |e| format!("Error sending receipts: {:?}", e)), - - GET_NODE_DATA_PACKET => self.return_rlp(io, &rlp, peer, - ChainSync::return_node_data, - |e| format!("Error sending nodes: {:?}", e)), - _ => { debug!(target: "sync", "Unknown packet {}", packet_id); Ok(()) @@ -1424,7 +1435,7 @@ mod tests { fn return_receipts() { let mut client = TestBlockChainClient::new(); let mut queue = VecDeque::new(); - let mut sync = dummy_sync_with_peer(H256::new(), &client); + let sync = dummy_sync_with_peer(H256::new(), &client); let mut io = TestIo::new(&mut client, &mut queue, None); let mut receipt_list = RlpStream::new_list(4); @@ -1445,7 +1456,7 @@ mod tests { assert_eq!(603, rlp_result.unwrap().1.out().len()); io.sender = Some(2usize); - sync.on_packet(&mut io, 0usize, super::GET_RECEIPTS_PACKET, &receipts_request); + ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, super::GET_RECEIPTS_PACKET, &receipts_request); assert_eq!(1, io.queue.len()); } @@ -1517,7 +1528,7 @@ mod tests { fn return_nodes() { let mut client = TestBlockChainClient::new(); let mut queue = VecDeque::new(); - let mut sync = dummy_sync_with_peer(H256::new(), &client); + let sync = dummy_sync_with_peer(H256::new(), &client); let mut io = TestIo::new(&mut client, &mut queue, None); let mut node_list = RlpStream::new_list(3); @@ -1537,7 +1548,8 @@ mod tests { assert_eq!(34, rlp_result.unwrap().1.out().len()); io.sender = Some(2usize); - sync.on_packet(&mut io, 0usize, super::GET_NODE_DATA_PACKET, &node_request); + + ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, super::GET_NODE_DATA_PACKET, &node_request); assert_eq!(1, io.queue.len()); } diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 86f70ff0a..27961e867 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -169,7 +169,7 @@ impl NetworkProtocolHandler for EthSync { } fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { - self.sync.write().unwrap().on_packet(&mut NetSyncIo::new(io, self.chain.deref()) , *peer, packet_id, data); + ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, self.chain.deref()) , *peer, packet_id, data); } fn connected(&self, io: &NetworkContext, peer: &PeerId) { diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index 7c7d70dde..2f2bde171 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -47,7 +47,7 @@ fn status_after_sync() { net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.sync(); - let status = net.peer(0).sync.status(); + let status = net.peer(0).sync.read().unwrap().status(); assert_eq!(status.state, SyncState::Idle); } @@ -107,14 +107,14 @@ fn restart() { assert!(net.peer(0).chain.chain_info().best_block_number > 100); net.restart_peer(0); - let status = net.peer(0).sync.status(); + let status = net.peer(0).sync.read().unwrap().status(); assert_eq!(status.state, SyncState::ChainHead); } #[test] fn status_empty() { let net = TestNet::new(2); - assert_eq!(net.peer(0).sync.status().state, SyncState::Idle); + assert_eq!(net.peer(0).sync.read().unwrap().status().state, SyncState::Idle); } #[test] diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 6496a43d5..831976048 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -78,7 +78,7 @@ pub struct TestPacket { pub struct TestPeer { pub chain: TestBlockChainClient, - pub sync: ChainSync, + pub sync: RwLock, pub queue: VecDeque, } @@ -97,7 +97,7 @@ impl TestNet { let chain = TestBlockChainClient::new(); let sync = ChainSync::new(SyncConfig::default(), &chain); net.peers.push(TestPeer { - sync: sync, + sync: RwLock::new(sync), chain: chain, queue: VecDeque::new(), }); @@ -118,7 +118,7 @@ impl TestNet { for client in 0..self.peers.len() { if peer != client { let mut p = self.peers.get_mut(peer).unwrap(); - p.sync.on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId); + p.sync.write().unwrap().on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId); } } } @@ -129,22 +129,22 @@ impl TestNet { if let Some(packet) = self.peers[peer].queue.pop_front() { let mut p = self.peers.get_mut(packet.recipient).unwrap(); trace!("--- {} -> {} ---", peer, packet.recipient); - p.sync.on_packet(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data); + ChainSync::dispatch_packet(&p.sync, &mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data); trace!("----------------"); } let mut p = self.peers.get_mut(peer).unwrap(); - p.sync.maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None)); + p.sync.write().unwrap().maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None)); } } pub fn sync_step_peer(&mut self, peer_num: usize) { let mut peer = self.peer_mut(peer_num); - peer.sync.maintain_sync(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); + peer.sync.write().unwrap().maintain_sync(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); } pub fn restart_peer(&mut self, i: usize) { let peer = self.peer_mut(i); - peer.sync.restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); + peer.sync.write().unwrap().restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); } pub fn sync(&mut self) -> u32 { @@ -173,6 +173,6 @@ impl TestNet { pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) { let mut peer = self.peer_mut(peer_id); - peer.sync.chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[]); + peer.sync.write().unwrap().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[]); } } From 0716eaa036c451b01bcb936037518ce9865d62bd Mon Sep 17 00:00:00 2001 From: debris Date: Mon, 20 Jun 2016 17:50:38 +0200 Subject: [PATCH 04/15] docopt is an optional dependency of ethkey and ethstore --- Cargo.lock | 2 -- ethkey/Cargo.toml | 2 +- ethstore/Cargo.toml | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2ac272e7c..0b1e73cae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -426,7 +426,6 @@ dependencies = [ name = "ethkey" version = "0.2.0" dependencies = [ - "docopt 0.6.80 (registry+https://github.com/rust-lang/crates.io-index)", "eth-secp256k1 0.5.4 (git+https://github.com/ethcore/rust-secp256k1)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", @@ -438,7 +437,6 @@ dependencies = [ name = "ethstore" version = "0.1.0" dependencies = [ - "docopt 0.6.80 (registry+https://github.com/rust-lang/crates.io-index)", "ethkey 0.2.0", "libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethkey/Cargo.toml b/ethkey/Cargo.toml index 08461e865..c7b99e45c 100644 --- a/ethkey/Cargo.toml +++ b/ethkey/Cargo.toml @@ -12,7 +12,7 @@ rustc-serialize = "0.3" docopt = { version = "0.6", optional = true } [features] -default = ["cli"] +default = [] cli = ["docopt"] [[bin]] diff --git a/ethstore/Cargo.toml b/ethstore/Cargo.toml index 41504154d..cfb5b9fbc 100644 --- a/ethstore/Cargo.toml +++ b/ethstore/Cargo.toml @@ -21,7 +21,7 @@ serde_codegen = { version = "0.7", optional = true } syntex = "0.33.0" [features] -default = ["cli", "serde_codegen"] +default = ["serde_codegen"] nightly = ["serde_macros"] cli = ["docopt"] From 8fa9a240ccfe1e4b32ef611939f1bf8d1cfcf66e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Mon, 20 Jun 2016 18:33:08 +0200 Subject: [PATCH 05/15] Fixing last nonce values in case transaction is replaced --- ethcore/src/miner/transaction_queue.rs | 53 +++++++++++++++++++------- 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/ethcore/src/miner/transaction_queue.rs b/ethcore/src/miner/transaction_queue.rs index 30ebc133f..cc8430b42 100644 --- a/ethcore/src/miner/transaction_queue.rs +++ b/ethcore/src/miner/transaction_queue.rs @@ -236,8 +236,8 @@ impl TransactionSet { self.by_priority.insert(order.clone()); let r = self.by_address.insert(sender, nonce, order); // If transaction was replaced remove it from priority queue - if let Some(ref order) = r { - self.by_priority.remove(order); + if let Some(ref old_order) = r { + self.by_priority.remove(old_order); } r } @@ -517,16 +517,9 @@ impl TransactionQueue { // Remove from current let order = self.current.drop(&sender, &nonce); if order.is_some() { - // We will either move transaction to future or remove it completely - // so there will be no transactions from this sender in current - self.last_nonces.remove(&sender); - // First update height of transactions in future to avoid collisions - self.update_future(&sender, current_nonce); - // This should move all current transactions to future and remove old transactions - self.move_all_to_future(&sender, current_nonce); - // And now lets check if there is some chain of transactions in future - // that should be placed in current. It should also update last_nonces. - self.move_matching_future_to_current(sender, current_nonce, current_nonce); + // This will keep consistency in queue + // Moves all to future and then promotes a batch from current: + self.remove_all(sender, current_nonce); return; } } @@ -682,7 +675,8 @@ impl TransactionQueue { try!(check_too_cheap(Self::replace_transaction(tx, state_nonce, &mut self.current, &mut self.by_hash))); // Keep track of highest nonce stored in current - self.last_nonces.insert(address, nonce); + let new_max = self.last_nonces.get(&address).map_or(nonce, |n| cmp::max(nonce, *n)); + self.last_nonces.insert(address, new_max); // Update nonces of transactions in future self.update_future(&address, state_nonce); // Maybe there are some more items waiting in future? @@ -1597,4 +1591,37 @@ mod test { assert_eq!(txq.future.by_priority.iter().next().unwrap().hash, tx1.hash()); } + #[test] + fn should_return_correct_last_nonce() { + // given + let mut txq = TransactionQueue::new(); + let (tx1, tx2, tx2_2, tx3) = { + let keypair = KeyPair::create().unwrap(); + let secret = &keypair.secret(); + let nonce = U256::from(123); + let tx = new_unsigned_tx(nonce); + let tx2 = new_unsigned_tx(nonce + 1.into()); + let mut tx2_2 = new_unsigned_tx(nonce + 1.into()); + tx2_2.gas_price = U256::from(5); + let tx3 = new_unsigned_tx(nonce + 2.into()); + + + (tx.sign(secret), tx2.sign(secret), tx2_2.sign(secret), tx3.sign(secret)) + }; + let sender = tx1.sender().unwrap(); + txq.add(tx1, &default_nonce, TransactionOrigin::Local).unwrap(); + txq.add(tx2, &default_nonce, TransactionOrigin::Local).unwrap(); + txq.add(tx3, &default_nonce, TransactionOrigin::Local).unwrap(); + assert_eq!(txq.future.by_priority.len(), 0); + assert_eq!(txq.current.by_priority.len(), 3); + + // when + let res = txq.add(tx2_2, &default_nonce, TransactionOrigin::Local); + + // then + assert_eq!(txq.last_nonce(&sender).unwrap(), 125.into()); + assert_eq!(res.unwrap(), TransactionImportResult::Current); + assert_eq!(txq.current.by_priority.len(), 3); + } + } From 09b8116cde1610384b5bf3f05f1e6794ec4f5654 Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 19 Jun 2016 14:35:42 +0200 Subject: [PATCH 06/15] TX processing queue --- ethcore/src/client/client.rs | 39 ++++++++++++++++++++++- ethcore/src/client/mod.rs | 3 ++ ethcore/src/client/test_client.rs | 6 ++++ ethcore/src/service.rs | 5 +++ sync/src/chain.rs | 11 +++---- util/src/lib.rs | 2 ++ util/src/timer.rs | 51 +++++++++++++++++++++++++++++++ 7 files changed, 110 insertions(+), 7 deletions(-) create mode 100644 util/src/timer.rs diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 85a3d693d..afba6a893 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -18,6 +18,7 @@ use std::marker::PhantomData; use std::path::PathBuf; +use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering}; use util::*; use util::panics::*; use views::BlockView; @@ -50,6 +51,8 @@ pub use types::block_status::BlockStatus; use evm::Factory as EvmFactory; use miner::{Miner, MinerService, TransactionImportResult, AccountDetails}; +const MAX_TX_QUEUE_SIZE: usize = 4096; + impl fmt::Display for BlockChainInfo { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "#{}.{}", self.best_block_number, self.best_block_hash) @@ -92,6 +95,8 @@ pub struct Client where V: Verifier { verifier: PhantomData, vm_factory: Arc, miner: Arc, + io_channel: IoChannel, + queue_transactions: AtomicUsize, } const HISTORY: u64 = 1200; @@ -149,7 +154,7 @@ impl Client where V: Verifier { let engine = Arc::new(spec.engine); - let block_queue = BlockQueue::new(config.queue, engine.clone(), message_channel); + let block_queue = BlockQueue::new(config.queue, engine.clone(), message_channel.clone()); let panic_handler = PanicHandler::new_in_arc(); panic_handler.forward_from(&block_queue); @@ -165,6 +170,8 @@ impl Client where V: Verifier { verifier: PhantomData, vm_factory: Arc::new(EvmFactory::new(config.vm_type)), miner: miner, + io_channel: message_channel, + queue_transactions: AtomicUsize::new(0), }; Ok(Arc::new(client)) @@ -271,6 +278,7 @@ impl Client where V: Verifier { let mut import_results = Vec::with_capacity(max_blocks_to_import); let _import_lock = self.import_lock.lock(); + let _timer = PerfTimer::new("import_verified_blocks"); let blocks = self.block_queue.drain(max_blocks_to_import); let original_best = self.chain_info().best_block_hash; @@ -361,6 +369,19 @@ impl Client where V: Verifier { imported } + /// Import transactions from the IO queue + pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize { + let _timer = PerfTimer::new("import_queued_transactions"); + self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst); + let fetch_account = |a: &Address| AccountDetails { + nonce: self.latest_nonce(a), + balance: self.latest_balance(a), + }; + let tx = transactions.iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect(); + let results = self.miner.import_transactions(tx, fetch_account); + results.len() + } + /// Attempt to get a copy of a specific block's state. /// /// This will not fail if given BlockID::Latest. @@ -750,6 +771,22 @@ impl BlockChainClient for Client where V: Verifier { self.miner.import_transactions(transactions, fetch_account) } + fn queue_transactions(&self, transactions: Vec) { + if self.queue_transactions.load(AtomicOrdering::Relaxed) > MAX_TX_QUEUE_SIZE { + debug!("Ignoring {} transactions: queue is full", transactions.len()); + } else { + let len = transactions.len(); + match self.io_channel.send(NetworkIoMessage::User(SyncMessage::NewTransactions(transactions))) { + Ok(_) => { + self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst); + } + Err(e) => { + debug!("Ignoring {} transactions: error queueing: {}", len, e); + } + } + } + } + fn all_transactions(&self) -> Vec { self.miner.all_transactions() } diff --git a/ethcore/src/client/mod.rs b/ethcore/src/client/mod.rs index 3fec68815..9318e0185 100644 --- a/ethcore/src/client/mod.rs +++ b/ethcore/src/client/mod.rs @@ -193,6 +193,9 @@ pub trait BlockChainClient : Sync + Send { /// import transactions from network/other 3rd party fn import_transactions(&self, transactions: Vec) -> Vec>; + /// Queue transactions for importing. + fn queue_transactions(&self, transactions: Vec); + /// list all transactions fn all_transactions(&self) -> Vec; diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index a9f7f6300..b6cc946fc 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -493,6 +493,12 @@ impl BlockChainClient for TestBlockChainClient { self.miner.import_transactions(transactions, &fetch_account) } + fn queue_transactions(&self, transactions: Vec) { + // import right here + let tx = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect(); + self.import_transactions(tx); + } + fn all_transactions(&self) -> Vec { self.miner.all_transactions() } diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index d9040113f..03a85ce13 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -41,6 +41,8 @@ pub enum SyncMessage { NewChainHead, /// A block is ready BlockVerified, + /// New transaction RLPs are ready to be imported + NewTransactions(Vec), /// Start network command. StartNetwork, /// Stop network command. @@ -136,6 +138,9 @@ impl IoHandler for ClientIoHandler { SyncMessage::BlockVerified => { self.client.import_verified_blocks(&io.channel()); }, + SyncMessage::NewTransactions(ref transactions) => { + self.client.import_queued_transactions(&transactions); + }, _ => {}, // ignore other messages } } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 6ec0884b5..167b9df40 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -95,7 +95,6 @@ use ethcore::views::{HeaderView, BlockView}; use ethcore::header::{BlockNumber, Header as BlockHeader}; use ethcore::client::{BlockChainClient, BlockStatus, BlockID, BlockChainInfo}; use ethcore::error::*; -use ethcore::transaction::SignedTransaction; use ethcore::block::Block; use io::SyncIo; use time; @@ -940,15 +939,15 @@ impl ChainSync { return Ok(()); } - let item_count = r.item_count(); + let mut item_count = r.item_count(); trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count); - + item_count = min(item_count, MAX_TX_TO_IMPORT); let mut transactions = Vec::with_capacity(item_count); - for i in 0 .. min(item_count, MAX_TX_TO_IMPORT) { - let tx: SignedTransaction = try!(r.val_at(i)); + for i in 0 .. item_count { + let tx = try!(r.at(i)).as_raw().to_vec(); transactions.push(tx); } - let _ = io.chain().import_transactions(transactions); + let _ = io.chain().queue_transactions(transactions); Ok(()) } diff --git a/util/src/lib.rs b/util/src/lib.rs index e43bbbab0..adaf08e77 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -154,6 +154,7 @@ pub mod panics; pub mod table; pub mod network_settings; pub mod path; +mod timer; pub use common::*; pub use misc::*; @@ -175,6 +176,7 @@ pub use network::*; pub use io::*; pub use log::*; pub use kvdb::*; +pub use timer::*; #[cfg(test)] mod tests { diff --git a/util/src/timer.rs b/util/src/timer.rs new file mode 100644 index 000000000..5d95ff7de --- /dev/null +++ b/util/src/timer.rs @@ -0,0 +1,51 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Performance timer with logging +use time::precise_time_ns; + +/// Performance timer with logging. Starts measuring time in the constructor, prints +/// elapsed time in the destructor or when `stop` is called. +pub struct PerfTimer { + name: &'static str, + start: u64, + stopped: bool, +} + +impl PerfTimer { + /// Create an instance with given name. + pub fn new(name: &'static str) -> PerfTimer { + PerfTimer { + name: name, + start: precise_time_ns(), + stopped: false, + } + } + + /// Stop the timer and print elapsed time on trace level with `perf` target. + pub fn stop(&mut self) { + if !self.stopped { + trace!(target: "perf", "{}: {:.2}ms", self.name, (precise_time_ns() - self.start) as f32 / 1000_000.0); + self.stopped = true; + } + } +} + +impl Drop for PerfTimer { + fn drop(&mut self) { + self.stop() + } +} From 71bfda3534c76ffdb6429ee1eabce476c9d7c56f Mon Sep 17 00:00:00 2001 From: Marek Kotewicz Date: Mon, 20 Jun 2016 18:51:11 +0200 Subject: [PATCH 07/15] moved keystore tests files from util to ethstore (#1352) --- ethstore/tests/api.rs | 41 ++++++++++++++++++- ...--3f49624084b67849c7b4e805c5988c21a430f9d9 | 0 ...--5ba4dcf897e97c2bdf8315b9ef26c13c085988cf | 0 ...--63121b431a52f8043c16fcf0d1df9cb7b5f66649 | 2 +- {util => ethstore/tests}/res/pat/p1.json | 0 {util => ethstore/tests}/res/pat/p2.json | 0 6 files changed, 41 insertions(+), 2 deletions(-) rename {util => ethstore/tests}/res/geth_keystore/UTC--2016-02-17T09-20-45.721400158Z--3f49624084b67849c7b4e805c5988c21a430f9d9 (100%) rename {util => ethstore/tests}/res/geth_keystore/UTC--2016-02-20T09-33-03.984382741Z--5ba4dcf897e97c2bdf8315b9ef26c13c085988cf (100%) rename {util => ethstore/tests}/res/geth_keystore/UTC--2016-04-03T08-58-49.834202900Z--63121b431a52f8043c16fcf0d1df9cb7b5f66649 (91%) rename {util => ethstore/tests}/res/pat/p1.json (100%) rename {util => ethstore/tests}/res/pat/p2.json (100%) diff --git a/ethstore/tests/api.rs b/ethstore/tests/api.rs index 037c5c46c..3b6a9f784 100644 --- a/ethstore/tests/api.rs +++ b/ethstore/tests/api.rs @@ -19,8 +19,10 @@ extern crate ethstore; mod util; +use std::str::FromStr; use ethstore::{SecretStore, EthStore}; -use ethstore::ethkey::{Random, Generator, Secret}; +use ethstore::ethkey::{Random, Generator, Secret, Address}; +use ethstore::dir::DiskDirectory; use util::TransientDir; #[test] @@ -86,3 +88,40 @@ fn secret_store_remove_account() { assert_eq!(store.accounts().len(), 0); assert!(store.remove_account(&accounts[0], "").is_err()); } + +fn test_path() -> &'static str { + match ::std::fs::metadata("ethstore") { + Ok(_) => "ethstore/tests/res/geth_keystore", + Err(_) => "tests/res/geth_keystore", + } +} + +fn pat_path() -> &'static str { + match ::std::fs::metadata("ethstore") { + Ok(_) => "ethstore/tests/res/pat", + Err(_) => "tests/res/pat", + } +} + +#[test] +fn secret_store_laod_geth_files() { + let dir = DiskDirectory::at(test_path()); + let store = EthStore::open(Box::new(dir)).unwrap(); + assert_eq!(store.accounts(), vec![ + Address::from_str("3f49624084b67849c7b4e805c5988c21a430f9d9").unwrap(), + Address::from_str("5ba4dcf897e97c2bdf8315b9ef26c13c085988cf").unwrap(), + Address::from_str("63121b431a52f8043c16fcf0d1df9cb7b5f66649").unwrap(), + ]); +} + +#[test] +fn secret_store_load_pat_files() { + let dir = DiskDirectory::at(pat_path()); + let store = EthStore::open(Box::new(dir)).unwrap(); + assert_eq!(store.accounts(), vec![ + Address::from_str("3f49624084b67849c7b4e805c5988c21a430f9d9").unwrap(), + Address::from_str("5ba4dcf897e97c2bdf8315b9ef26c13c085988cf").unwrap(), + ]); +} + + diff --git a/util/res/geth_keystore/UTC--2016-02-17T09-20-45.721400158Z--3f49624084b67849c7b4e805c5988c21a430f9d9 b/ethstore/tests/res/geth_keystore/UTC--2016-02-17T09-20-45.721400158Z--3f49624084b67849c7b4e805c5988c21a430f9d9 similarity index 100% rename from util/res/geth_keystore/UTC--2016-02-17T09-20-45.721400158Z--3f49624084b67849c7b4e805c5988c21a430f9d9 rename to ethstore/tests/res/geth_keystore/UTC--2016-02-17T09-20-45.721400158Z--3f49624084b67849c7b4e805c5988c21a430f9d9 diff --git a/util/res/geth_keystore/UTC--2016-02-20T09-33-03.984382741Z--5ba4dcf897e97c2bdf8315b9ef26c13c085988cf b/ethstore/tests/res/geth_keystore/UTC--2016-02-20T09-33-03.984382741Z--5ba4dcf897e97c2bdf8315b9ef26c13c085988cf similarity index 100% rename from util/res/geth_keystore/UTC--2016-02-20T09-33-03.984382741Z--5ba4dcf897e97c2bdf8315b9ef26c13c085988cf rename to ethstore/tests/res/geth_keystore/UTC--2016-02-20T09-33-03.984382741Z--5ba4dcf897e97c2bdf8315b9ef26c13c085988cf diff --git a/util/res/geth_keystore/UTC--2016-04-03T08-58-49.834202900Z--63121b431a52f8043c16fcf0d1df9cb7b5f66649 b/ethstore/tests/res/geth_keystore/UTC--2016-04-03T08-58-49.834202900Z--63121b431a52f8043c16fcf0d1df9cb7b5f66649 similarity index 91% rename from util/res/geth_keystore/UTC--2016-04-03T08-58-49.834202900Z--63121b431a52f8043c16fcf0d1df9cb7b5f66649 rename to ethstore/tests/res/geth_keystore/UTC--2016-04-03T08-58-49.834202900Z--63121b431a52f8043c16fcf0d1df9cb7b5f66649 index 08272d43b..a3253e29a 100644 --- a/util/res/geth_keystore/UTC--2016-04-03T08-58-49.834202900Z--63121b431a52f8043c16fcf0d1df9cb7b5f66649 +++ b/ethstore/tests/res/geth_keystore/UTC--2016-04-03T08-58-49.834202900Z--63121b431a52f8043c16fcf0d1df9cb7b5f66649 @@ -1 +1 @@ -{"address":"63121b431a52f8043c16fcf0d1df9cb7b5f66649","crypto":{"cipher":"aes-128-ctr","ciphertext":"1dd21926c644b9983916d646f3a4f2c7f9362f7e1c9fb1abcb42494dae06fa01","cipherparams":{"iv":"c52c6ee66d89a7aa8c6839f4b6ed29c8"},"kdf":"scrypt","kdfparams":{"dklen":32,"n":262144,"p":1,"r":8,"salt":"96f17c17bbf48db2dc4da00b3e7decce8e21f44a5d7963dadeeff70e1d38ad75"},"mac":"f279f3444585c2817701225e2196c1176386ad549ebaec2bcc4f94f309727fe6"},"id":"15e49cd2-51fb-4316-ba46-c3cf8db4ae44","version":3} \ No newline at end of file +{"address":"63121b431a52f8043c16fcf0d1df9cb7b5f66649","crypto":{"cipher":"aes-128-ctr","ciphertext":"1dd21926c644b9983916d646f3a4f2c7f9362f7e1c9fb1abcb42494dae06fa01","cipherparams":{"iv":"c52c6ee66d89a7aa8c6839f4b6ed29c8"},"kdf":"scrypt","kdfparams":{"dklen":32,"n":262144,"p":1,"r":8,"salt":"96f17c17bbf48db2dc4da00b3e7decce8e21f44a5d7963dadeeff70e1d38ad75"},"mac":"f279f3444585c2817701225e2196c1176386ad549ebaec2bcc4f94f309727fe6"},"id":"15e49cd2-51fb-4316-ba46-c3cf8db4ae44","version":3} diff --git a/util/res/pat/p1.json b/ethstore/tests/res/pat/p1.json similarity index 100% rename from util/res/pat/p1.json rename to ethstore/tests/res/pat/p1.json diff --git a/util/res/pat/p2.json b/ethstore/tests/res/pat/p2.json similarity index 100% rename from util/res/pat/p2.json rename to ethstore/tests/res/pat/p2.json From 69c29fce84c13d2298588fc18464573aa7cf317c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Mon, 20 Jun 2016 18:51:36 +0200 Subject: [PATCH 08/15] Updating parity-dapps (#1353) --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 03ea96029..f62ed5cae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -924,7 +924,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "parity-dapps" version = "0.3.0" -source = "git+https://github.com/ethcore/parity-dapps-rs.git#1f065d93aa49338e4a453c77c839957f2db78895" +source = "git+https://github.com/ethcore/parity-dapps-rs.git#8cc812c26c903cf5764ce0f4cc3f2a7c3ddb0dc2" dependencies = [ "aster 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", "glob 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", From 7aa73f300a5265ab8384b7e9969aa55b072a1ed3 Mon Sep 17 00:00:00 2001 From: debris Date: Mon, 20 Jun 2016 22:35:59 +0200 Subject: [PATCH 09/15] fixed migration of empty pruning dir --- parity/migration.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/parity/migration.rs b/parity/migration.rs index 9f1c4c52e..4198f7ed0 100644 --- a/parity/migration.rs +++ b/parity/migration.rs @@ -89,6 +89,7 @@ fn current_version(path: &PathBuf) -> Result { /// Writes current database version to the file. /// Creates a new file if the version file does not exist yet. fn update_version(path: &PathBuf) -> Result<(), Error> { + try!(fs::create_dir_all(path)); let mut file = try!(File::create(version_file_path(path))); try!(file.write_all(format!("{}", CURRENT_VERSION).as_bytes())); Ok(()) From e0b4eab8191c1215af98e893490694f07ce153eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 21 Jun 2016 13:55:26 +0200 Subject: [PATCH 10/15] Fixing replacing transaction with lower gas_price in one of the edge cases (#1343) --- ethcore/src/miner/transaction_queue.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ethcore/src/miner/transaction_queue.rs b/ethcore/src/miner/transaction_queue.rs index cc8430b42..06aa07880 100644 --- a/ethcore/src/miner/transaction_queue.rs +++ b/ethcore/src/miner/transaction_queue.rs @@ -686,14 +686,15 @@ impl TransactionQueue { if let Some(order) = self.future.drop(&address, &nonce) { // Let's insert that transaction to current (if it has higher gas_price) let future_tx = self.by_hash.remove(&order.hash).unwrap(); - try!(check_too_cheap(Self::replace_transaction(future_tx, state_nonce, &mut self.current, &mut self.by_hash))); + // if transaction in `current` (then one we are importing) is replaced it means that it has to low gas_price + try!(check_too_cheap(!Self::replace_transaction(future_tx, state_nonce, &mut self.current, &mut self.by_hash))); } // Also enforce the limit let removed = self.current.enforce_limit(&mut self.by_hash); // If some transaction were removed because of limit we need to update last_nonces also. self.update_last_nonces(&removed); - // Trigger error if we were removed. + // Trigger error if the transaction we are importing was removed. try!(check_if_removed(&address, &nonce, removed)); trace!(target: "miner", "status: {:?}", self.status()); @@ -937,7 +938,7 @@ mod test { let res = txq.add(tx2.clone(), &default_nonce, TransactionOrigin::External); // and then there should be only one transaction in current (the one with higher gas_price) - assert_eq!(unwrap_tx_err(res), TransactionError::TooCheapToReplace); + assert_eq!(res.unwrap(), TransactionImportResult::Current); assert_eq!(txq.status().pending, 1); assert_eq!(txq.status().future, 0); assert_eq!(txq.current.by_priority.len(), 1); From 951512f9c9f5fd5cabd2594c6cf2b6b4e2b5760b Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 21 Jun 2016 13:56:33 +0200 Subject: [PATCH 11/15] Reserved peers rpc API (#1360) * reserved peers: lock them, use a hashset, and provide to networkcontext * adding and removing reserved peer service API * add NonReservedPeerMode, and setters in host * setting non reserved mode, restriction accepted connections * implement RPC apis * fix deadlock * fix rpc tests --- parity/configuration.rs | 5 +- parity/main.rs | 5 +- parity/rpc_apis.rs | 4 +- rpc/src/v1/impls/ethcore_set.rs | 34 ++++++- rpc/src/v1/tests/mocked/ethcore.rs | 61 +++++++---- rpc/src/v1/traits/ethcore_set.rs | 16 +++ util/src/network/host.rs | 158 ++++++++++++++++++++++------- util/src/network/mod.rs | 23 ++++- util/src/network/service.rs | 33 +++++- 9 files changed, 272 insertions(+), 67 deletions(-) diff --git a/parity/configuration.rs b/parity/configuration.rs index 3f8202021..e6e6e36a1 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -200,7 +200,10 @@ impl Configuration { net_path.push("network"); ret.config_path = Some(net_path.to_str().unwrap().to_owned()); ret.reserved_nodes = self.init_reserved_nodes(); - ret.reserved_only = self.args.flag_reserved_only; + + if self.args.flag_reserved_only { + ret.non_reserved_mode = ::util::network::NonReservedPeerMode::Deny; + } ret } diff --git a/parity/main.rs b/parity/main.rs index dc5457aa1..a44cdf6d3 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -230,6 +230,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) logger: logger.clone(), settings: network_settings.clone(), allow_pending_receipt_query: !conf.args.flag_geth, + net_service: service.network(), }); let dependencies = rpc::Dependencies { @@ -315,11 +316,11 @@ fn execute_export(conf: Configuration) { udp_port: None, nat_enabled: false, discovery_enabled: false, - reserved_only: true, boot_nodes: Vec::new(), use_secret: None, ideal_peers: 0, reserved_nodes: Vec::new(), + non_reserved_mode: ::util::network::NonReservedPeerMode::Accept, }; let client_config = conf.client_config(&spec); @@ -387,11 +388,11 @@ fn execute_import(conf: Configuration) { udp_port: None, nat_enabled: false, discovery_enabled: false, - reserved_only: true, boot_nodes: Vec::new(), use_secret: None, ideal_peers: 0, reserved_nodes: Vec::new(), + non_reserved_mode: ::util::network::NonReservedPeerMode::Accept, }; let client_config = conf.client_config(&spec); diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index bf21181a1..251c24d85 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -25,6 +25,7 @@ use ethcore::client::Client; use util::RotatingLogger; use ethcore::account_provider::AccountProvider; use util::network_settings::NetworkSettings; +use util::network::NetworkService; #[cfg(feature="rpc")] pub use ethcore_rpc::ConfirmationsQueue; @@ -89,6 +90,7 @@ pub struct Dependencies { pub logger: Arc, pub settings: Arc, pub allow_pending_receipt_query: bool, + pub net_service: Arc>, } fn to_modules(apis: &[Api]) -> BTreeMap { @@ -163,7 +165,7 @@ pub fn setup_rpc(server: T, deps: Arc, apis: ApiSet server.add_delegate(EthcoreClient::new(&deps.client, &deps.miner, deps.logger.clone(), deps.settings.clone()).to_delegate()) }, Api::EthcoreSet => { - server.add_delegate(EthcoreSetClient::new(&deps.miner).to_delegate()) + server.add_delegate(EthcoreSetClient::new(&deps.miner, &deps.net_service).to_delegate()) }, Api::Traces => { server.add_delegate(TracesClient::new(&deps.client, &deps.miner).to_delegate()) diff --git a/rpc/src/v1/impls/ethcore_set.rs b/rpc/src/v1/impls/ethcore_set.rs index cbd9c4309..66f1a34aa 100644 --- a/rpc/src/v1/impls/ethcore_set.rs +++ b/rpc/src/v1/impls/ethcore_set.rs @@ -16,9 +16,11 @@ /// Ethcore-specific rpc interface for operations altering the settings. use util::{U256, Address}; +use util::network::{NetworkService, NonReservedPeerMode}; use std::sync::{Arc, Weak}; use jsonrpc_core::*; use ethcore::miner::MinerService; +use ethcore::service::SyncMessage; use v1::traits::EthcoreSet; use v1::types::{Bytes}; @@ -27,13 +29,15 @@ pub struct EthcoreSetClient where M: MinerService { miner: Weak, + net: Weak>, } impl EthcoreSetClient where M: MinerService { /// Creates new `EthcoreSetClient`. - pub fn new(miner: &Arc) -> Self { + pub fn new(miner: &Arc, net: &Arc>) -> Self { EthcoreSetClient { miner: Arc::downgrade(miner), + net: Arc::downgrade(net), } } } @@ -74,4 +78,32 @@ impl EthcoreSet for EthcoreSetClient where M: MinerService + 'static { to_value(&true) }) } + + fn add_reserved_peer(&self, params: Params) -> Result { + from_params::<(String,)>(params).and_then(|(peer,)| { + match take_weak!(self.net).add_reserved_peer(&peer) { + Ok(()) => to_value(&true), + Err(_) => Err(Error::invalid_params()), + } + }) + } + + fn remove_reserved_peer(&self, params: Params) -> Result { + from_params::<(String,)>(params).and_then(|(peer,)| { + match take_weak!(self.net).remove_reserved_peer(&peer) { + Ok(()) => to_value(&true), + Err(_) => Err(Error::invalid_params()), + } + }) + } + + fn drop_non_reserved_peers(&self, _: Params) -> Result { + take_weak!(self.net).set_non_reserved_mode(NonReservedPeerMode::Deny); + to_value(&true) + } + + fn accept_non_reserved_peers(&self, _: Params) -> Result { + take_weak!(self.net).set_non_reserved_mode(NonReservedPeerMode::Accept); + to_value(&true) + } } diff --git a/rpc/src/v1/tests/mocked/ethcore.rs b/rpc/src/v1/tests/mocked/ethcore.rs index 68c33ecce..a096543e3 100644 --- a/rpc/src/v1/tests/mocked/ethcore.rs +++ b/rpc/src/v1/tests/mocked/ethcore.rs @@ -19,11 +19,13 @@ use std::str::FromStr; use jsonrpc_core::IoHandler; use v1::{Ethcore, EthcoreClient, EthcoreSet, EthcoreSetClient}; use ethcore::miner::MinerService; +use ethcore::service::SyncMessage; use v1::tests::helpers::TestMinerService; use ethcore::client::{TestBlockChainClient}; use util::numbers::*; use rustc_serialize::hex::FromHex; use util::log::RotatingLogger; +use util::network::{NetworkConfiguration, NetworkService}; use util::network_settings::NetworkSettings; fn miner_service() -> Arc { @@ -50,21 +52,26 @@ fn settings() -> Arc { }) } +fn network_service() -> Arc> { + Arc::new(NetworkService::new(NetworkConfiguration::new()).unwrap()) +} + fn ethcore_client(client: &Arc, miner: &Arc) -> EthcoreClient { EthcoreClient::new(client, miner, logger(), settings()) } -fn ethcore_set_client(miner: &Arc) -> EthcoreSetClient { - EthcoreSetClient::new(miner) +fn ethcore_set_client(miner: &Arc, net: &Arc>) -> EthcoreSetClient { + EthcoreSetClient::new(miner, net) } #[test] fn rpc_ethcore_extra_data() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_extraData", "params": [], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"0x01020304","id":1}"#; @@ -79,9 +86,10 @@ fn rpc_ethcore_default_extra_data() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_defaultExtraData", "params": [], "id": 1}"#; let response = format!(r#"{{"jsonrpc":"2.0","result":"0x{}","id":1}}"#, misc::version_data().to_hex()); @@ -93,9 +101,10 @@ fn rpc_ethcore_default_extra_data() { fn rpc_ethcore_gas_floor_target() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_gasFloorTarget", "params": [], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"0x3039","id":1}"#; @@ -107,9 +116,10 @@ fn rpc_ethcore_gas_floor_target() { fn rpc_ethcore_min_gas_price() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_minGasPrice", "params": [], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"0x01312d00","id":1}"#; @@ -121,9 +131,10 @@ fn rpc_ethcore_min_gas_price() { fn rpc_ethcore_set_min_gas_price() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setMinGasPrice", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -136,9 +147,10 @@ fn rpc_ethcore_set_min_gas_price() { fn rpc_ethcore_set_gas_floor_target() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setGasFloorTarget", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -151,9 +163,10 @@ fn rpc_ethcore_set_gas_floor_target() { fn rpc_ethcore_set_extra_data() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setExtraData", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -166,9 +179,10 @@ fn rpc_ethcore_set_extra_data() { fn rpc_ethcore_set_author() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setAuthor", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -181,13 +195,14 @@ fn rpc_ethcore_set_author() { fn rpc_ethcore_dev_logs() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let logger = logger(); logger.append("a".to_owned()); logger.append("b".to_owned()); let ethcore = EthcoreClient::new(&client, &miner, logger.clone(), settings()).to_delegate(); let io = IoHandler::new(); io.add_delegate(ethcore); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_devLogs", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":["b","a"],"id":1}"#; @@ -199,9 +214,10 @@ fn rpc_ethcore_dev_logs() { fn rpc_ethcore_dev_logs_levels() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_devLogsLevels", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"rpc=trace","id":1}"#; @@ -212,9 +228,10 @@ fn rpc_ethcore_dev_logs_levels() { fn rpc_ethcore_set_transactions_limit() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setTransactionsLimit", "params":[10240240], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -227,9 +244,10 @@ fn rpc_ethcore_set_transactions_limit() { fn rpc_ethcore_transactions_limit() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_transactionsLimit", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":1024,"id":1}"#; @@ -241,9 +259,10 @@ fn rpc_ethcore_transactions_limit() { fn rpc_ethcore_net_chain() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_netChain", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"testchain","id":1}"#; @@ -255,9 +274,10 @@ fn rpc_ethcore_net_chain() { fn rpc_ethcore_net_max_peers() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_netMaxPeers", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":25,"id":1}"#; @@ -269,9 +289,10 @@ fn rpc_ethcore_net_max_peers() { fn rpc_ethcore_net_port() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_netPort", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":30303,"id":1}"#; @@ -283,9 +304,10 @@ fn rpc_ethcore_net_port() { fn rpc_ethcore_rpc_settings() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_rpcSettings", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":{"enabled":true,"interface":"all","port":8545},"id":1}"#; @@ -297,9 +319,10 @@ fn rpc_ethcore_rpc_settings() { fn rpc_ethcore_node_name() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_nodeName", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"mynode","id":1}"#; diff --git a/rpc/src/v1/traits/ethcore_set.rs b/rpc/src/v1/traits/ethcore_set.rs index 332c505b6..ed4303be1 100644 --- a/rpc/src/v1/traits/ethcore_set.rs +++ b/rpc/src/v1/traits/ethcore_set.rs @@ -37,6 +37,18 @@ pub trait EthcoreSet: Sized + Send + Sync + 'static { /// Sets the limits for transaction queue. fn set_transactions_limit(&self, _: Params) -> Result; + /// Add a reserved peer. + fn add_reserved_peer(&self, _: Params) -> Result; + + /// Remove a reserved peer. + fn remove_reserved_peer(&self, _: Params) -> Result; + + /// Drop all non-reserved peers. + fn drop_non_reserved_peers(&self, _: Params) -> Result; + + /// Accept non-reserved peers (default behavior) + fn accept_non_reserved_peers(&self, _: Params) -> Result; + /// Should be used to convert object to io delegate. fn to_delegate(self) -> IoDelegate { let mut delegate = IoDelegate::new(Arc::new(self)); @@ -45,6 +57,10 @@ pub trait EthcoreSet: Sized + Send + Sync + 'static { delegate.add_method("ethcore_setExtraData", EthcoreSet::set_extra_data); delegate.add_method("ethcore_setAuthor", EthcoreSet::set_author); delegate.add_method("ethcore_setTransactionsLimit", EthcoreSet::set_transactions_limit); + delegate.add_method("ethcore_addReservedPeer", EthcoreSet::add_reserved_peer); + delegate.add_method("ethcore_removeReservedPeer", EthcoreSet::remove_reserved_peer); + delegate.add_method("ethcore_dropNonReservedPeers", EthcoreSet::drop_non_reserved_peers); + delegate.add_method("ethcore_acceptNonReservedPeers", EthcoreSet::accept_non_reserved_peers); delegate } diff --git a/util/src/network/host.rs b/util/src/network/host.rs index aacfc3fcb..03ba07544 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -14,9 +14,9 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::net::{SocketAddr}; -use std::collections::{HashMap}; -use std::str::{FromStr}; +use std::net::SocketAddr; +use std::collections::{HashMap, HashSet}; +use std::str::FromStr; use std::sync::*; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::ops::*; @@ -35,7 +35,7 @@ use rlp::*; use network::session::{Session, SessionData}; use error::*; use io::*; -use network::{NetworkProtocolHandler, PROTOCOL_VERSION}; +use network::{NetworkProtocolHandler, NonReservedPeerMode, PROTOCOL_VERSION}; use network::node_table::*; use network::stats::NetworkStats; use network::error::{NetworkError, DisconnectReason}; @@ -65,8 +65,6 @@ pub struct NetworkConfiguration { pub nat_enabled: bool, /// Enable discovery pub discovery_enabled: bool, - /// Pin to reserved nodes only - pub reserved_only: bool, /// List of initial node addresses pub boot_nodes: Vec, /// Use provided node key instead of default @@ -75,6 +73,8 @@ pub struct NetworkConfiguration { pub ideal_peers: u32, /// List of reserved node addresses. pub reserved_nodes: Vec, + /// The non-reserved peer mode. + pub non_reserved_mode: NonReservedPeerMode, } impl Default for NetworkConfiguration { @@ -93,11 +93,11 @@ impl NetworkConfiguration { udp_port: None, nat_enabled: true, discovery_enabled: true, - reserved_only: false, boot_nodes: Vec::new(), use_secret: None, ideal_peers: 25, reserved_nodes: Vec::new(), + non_reserved_mode: NonReservedPeerMode::Accept, } } @@ -191,13 +191,15 @@ pub struct NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'sta sessions: Arc>>, session: Option, session_id: Option, + reserved_peers: &'s HashSet, } impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, { /// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler. fn new(io: &'s IoContext>, protocol: ProtocolId, - session: Option, sessions: Arc>>) -> NetworkContext<'s, Message> { + session: Option, sessions: Arc>>, + reserved_peers: &'s HashSet) -> NetworkContext<'s, Message> { let id = session.as_ref().map(|s| s.lock().unwrap().token()); NetworkContext { io: io, @@ -205,6 +207,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone session_id: id, session: session, sessions: sessions, + reserved_peers: reserved_peers, } } @@ -237,7 +240,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone self.io.message(NetworkIoMessage::User(msg)); } - /// Send an IO message + /// Get an IoChannel. pub fn io_channel(&self) -> IoChannel> { self.io.channel() } @@ -335,7 +338,7 @@ pub struct Host where Message: Send + Sync + Clone { timers: RwLock>, timer_counter: RwLock, stats: Arc, - pinned_nodes: Vec, + reserved_nodes: RwLock>, num_sessions: AtomicUsize, stopping: AtomicBool, } @@ -390,28 +393,28 @@ impl Host where Message: Send + Sync + Clone { timers: RwLock::new(HashMap::new()), timer_counter: RwLock::new(USER_TIMER), stats: stats, - pinned_nodes: Vec::new(), + reserved_nodes: RwLock::new(HashSet::new()), num_sessions: AtomicUsize::new(0), stopping: AtomicBool::new(false), }; for n in boot_nodes { - // don't pin boot nodes. - host.add_node(&n, false); + host.add_node(&n); } for n in reserved_nodes { - host.add_node(&n, true); + if let Err(e) = host.add_reserved_node(&n) { + debug!(target: "network", "Error parsing node id: {}: {:?}", n, e); + } } Ok(host) } - pub fn add_node(&mut self, id: &str, pin: bool) { + pub fn add_node(&mut self, id: &str) { match Node::from_str(id) { Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); }, Ok(n) => { let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; - if pin { self.pinned_nodes.push(n.id.clone()) } self.nodes.write().unwrap().add_node(n); if let Some(ref mut discovery) = *self.discovery.lock().unwrap() { @@ -421,6 +424,56 @@ impl Host where Message: Send + Sync + Clone { } } + pub fn add_reserved_node(&self, id: &str) -> Result<(), UtilError> { + let n = try!(Node::from_str(id)); + + let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; + self.reserved_nodes.write().unwrap().insert(n.id.clone()); + + if let Some(ref mut discovery) = *self.discovery.lock().unwrap() { + discovery.add_node(entry); + } + + Ok(()) + } + + pub fn set_non_reserved_mode(&self, mode: NonReservedPeerMode, io: &IoContext>) { + let mut info = self.info.write().unwrap(); + + if info.config.non_reserved_mode != mode { + info.config.non_reserved_mode = mode.clone(); + drop(info); + if let NonReservedPeerMode::Deny = mode { + // disconnect all non-reserved peers here. + let reserved: HashSet = self.reserved_nodes.read().unwrap().clone(); + let mut to_kill = Vec::new(); + for e in self.sessions.write().unwrap().iter_mut() { + let mut s = e.lock().unwrap(); + { + let id = s.id(); + if id.is_some() && reserved.contains(id.unwrap()) { + continue; + } + } + + s.disconnect(io, DisconnectReason::ClientQuit); + to_kill.push(s.token()); + } + for p in to_kill { + trace!(target: "network", "Disconnecting on reserved-only mode: {}", p); + self.kill_connection(p, io, false); + } + } + } + } + + pub fn remove_reserved_node(&self, id: &str) -> Result<(), UtilError> { + let n = try!(Node::from_str(id)); + self.reserved_nodes.write().unwrap().remove(&n.id); + + Ok(()) + } + pub fn client_version() -> String { version() } @@ -483,7 +536,7 @@ impl Host where Message: Send + Sync + Clone { // Initialize discovery. let discovery = { let info = self.info.read().unwrap(); - if info.config.discovery_enabled && !info.config.reserved_only { + if info.config.discovery_enabled && info.config.non_reserved_mode == NonReservedPeerMode::Accept { Some(Discovery::new(&info.keys, public_endpoint.address.clone(), public_endpoint, DISCOVERY)) } else { None } }; @@ -540,17 +593,26 @@ impl Host where Message: Send + Sync + Clone { } fn connect_peers(&self, io: &IoContext>) { - if self.info.read().unwrap().capabilities.is_empty() { - return; - } - let ideal_peers = { self.info.read().unwrap().config.ideal_peers }; - let pin = { self.info.read().unwrap().config.reserved_only }; - let session_count = self.session_count(); - if session_count >= ideal_peers as usize + self.pinned_nodes.len() { - // check if all pinned nodes are connected. - if self.pinned_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) { + let (ideal_peers, mut pin) = { + let info = self.info.read().unwrap(); + if info.capabilities.is_empty() { return; } + let config = &info.config; + + (config.ideal_peers, config.non_reserved_mode == NonReservedPeerMode::Deny) + }; + + let session_count = self.session_count(); + let reserved_nodes = self.reserved_nodes.read().unwrap(); + if session_count >= ideal_peers as usize + reserved_nodes.len() { + // check if all pinned nodes are connected. + if reserved_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) { + return; + } + + // if not, only attempt connect to reserved peers + pin = true; } let handshake_count = self.handshake_count(); @@ -562,7 +624,7 @@ impl Host where Message: Send + Sync + Clone { // iterate over all nodes, reserved ones coming first. // if we are pinned to only reserved nodes, ignore all others. - let nodes = self.pinned_nodes.iter().cloned().chain(if !pin { + let nodes = reserved_nodes.iter().cloned().chain(if !pin { self.nodes.read().unwrap().nodes() } else { Vec::new() @@ -696,11 +758,20 @@ impl Host where Message: Send + Sync + Clone { self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst); if !s.info.originated { let session_count = self.session_count(); - let ideal_peers = { self.info.read().unwrap().config.ideal_peers }; - if session_count >= ideal_peers as usize { - s.disconnect(io, DisconnectReason::TooManyPeers); - return; + let reserved_nodes = self.reserved_nodes.read().unwrap(); + let (ideal_peers, reserved_only) = { + let info = self.info.read().unwrap(); + (info.config.ideal_peers, info.config.non_reserved_mode == NonReservedPeerMode::Deny) + }; + + if session_count >= ideal_peers as usize || reserved_only { + // only proceed if the connecting peer is reserved. + if !reserved_nodes.contains(s.id().unwrap()) { + s.disconnect(io, DisconnectReason::TooManyPeers); + return; + } } + // Add it no node table if let Ok(address) = s.remote_addr() { let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } }; @@ -735,14 +806,17 @@ impl Host where Message: Send + Sync + Clone { if kill { self.kill_connection(token, io, true); } + let handlers = self.handlers.read().unwrap(); for p in ready_data { - let h = self.handlers.read().unwrap().get(p).unwrap().clone(); + let h = handlers.get(p).unwrap().clone(); self.stats.inc_sessions(); - h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token); + let reserved = self.reserved_nodes.read().unwrap(); + h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token); } for (p, packet_id, data) in packet_data { - let h = self.handlers.read().unwrap().get(p).unwrap().clone(); - h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token, packet_id, &data[1..]); + let h = handlers.get(p).unwrap().clone(); + let reserved = self.reserved_nodes.read().unwrap(); + h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]); } } @@ -783,7 +857,8 @@ impl Host where Message: Send + Sync + Clone { } for p in to_disconnect { let h = self.handlers.read().unwrap().get(p).unwrap().clone(); - h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone()), &token); + let reserved = self.reserved_nodes.read().unwrap(); + h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token); } if deregister { io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e)); @@ -886,7 +961,10 @@ impl IoHandler> for Host where Messa _ => match self.timers.read().unwrap().get(&token).cloned() { Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() { None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) }, - Some(h) => { h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone()), timer.token); } + Some(h) => { + let reserved = self.reserved_nodes.read().unwrap(); + h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone(), &reserved), timer.token); + } }, None => { warn!("Unknown timer token: {}", token); } // timer is not registerd through us } @@ -904,7 +982,8 @@ impl IoHandler> for Host where Messa ref versions } => { let h = handler.clone(); - h.initialize(&NetworkContext::new(io, protocol, None, self.sessions.clone())); + let reserved = self.reserved_nodes.read().unwrap(); + h.initialize(&NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved)); self.handlers.write().unwrap().insert(protocol, h); let mut info = self.info.write().unwrap(); for v in versions { @@ -946,8 +1025,9 @@ impl IoHandler> for Host where Messa self.kill_connection(*peer, io, false); }, NetworkIoMessage::User(ref message) => { + let reserved = self.reserved_nodes.read().unwrap(); for (p, h) in self.handlers.read().unwrap().iter() { - h.message(&NetworkContext::new(io, p, None, self.sessions.clone()), &message); + h.message(&NetworkContext::new(io, p, None, self.sessions.clone(), &reserved), &message); } } } diff --git a/util/src/network/mod.rs b/util/src/network/mod.rs index d074e6631..d59ab63b1 100644 --- a/util/src/network/mod.rs +++ b/util/src/network/mod.rs @@ -14,8 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -//! Network and general IO module. -//! +//! Network and general IO module. +//! //! Example usage for craeting a network service and adding an IO handler: //! //! ```rust @@ -112,3 +112,22 @@ pub trait NetworkProtocolHandler: Sync + Send where Message: Send + Syn fn message(&self, _io: &NetworkContext, _message: &Message) {} } +/// Non-reserved peer modes. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum NonReservedPeerMode { + /// Accept them. This is the default. + Accept, + /// Deny them. + Deny, +} + +impl NonReservedPeerMode { + /// Attempt to parse the peer mode from a string. + pub fn parse(s: &str) -> Option { + match s { + "accept" => Some(NonReservedPeerMode::Accept), + "deny" => Some(NonReservedPeerMode::Deny), + _ => None, + } + } +} \ No newline at end of file diff --git a/util/src/network/service.rs b/util/src/network/service.rs index e8db354d4..353a24bbe 100644 --- a/util/src/network/service.rs +++ b/util/src/network/service.rs @@ -18,9 +18,9 @@ use std::sync::*; use error::*; use panics::*; use network::{NetworkProtocolHandler, NetworkConfiguration}; -use network::error::{NetworkError}; +use network::error::NetworkError; use network::host::{Host, NetworkIoMessage, ProtocolId}; -use network::stats::{NetworkStats}; +use network::stats::NetworkStats; use io::*; /// IO Service with networking @@ -111,6 +111,35 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat *host = None; Ok(()) } + + /// Try to add a reserved peer. + pub fn add_reserved_peer(&self, peer: &str) -> Result<(), UtilError> { + let host = self.host.read().unwrap(); + if let Some(ref host) = *host { + host.add_reserved_node(peer) + } else { + Ok(()) + } + } + + /// Try to remove a reserved peer. + pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), UtilError> { + let host = self.host.read().unwrap(); + if let Some(ref host) = *host { + host.remove_reserved_node(peer) + } else { + Ok(()) + } + } + + /// Set the non-reserved peer mode. + pub fn set_non_reserved_mode(&self, mode: ::network::NonReservedPeerMode) { + let host = self.host.read().unwrap(); + if let Some(ref host) = *host { + let io_ctxt = IoContext::new(self.io_service.channel(), 0); + host.set_non_reserved_mode(mode, &io_ctxt); + } + } } impl MayPanic for NetworkService where Message: Send + Sync + Clone + 'static { From c5f6250668540dbd474540eb3b1abc77334c504c Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Tue, 21 Jun 2016 14:57:06 +0300 Subject: [PATCH 12/15] Set default database file size large enough (#1363) * make default 100mb file size * update again * fix type * little less extreme file sizes --- Cargo.lock | 4 ++-- util/src/kvdb.rs | 9 +++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dc97ce0ca..1f41ea021 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1103,7 +1103,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "rocksdb" version = "0.4.5" -source = "git+https://github.com/ethcore/rust-rocksdb#9140e37ce0fdb748097f85653c01b0f7e3736ea9" +source = "git+https://github.com/ethcore/rust-rocksdb#e0e6c099d8cd156fe446009fce241d57b00cd8f4" dependencies = [ "libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", "rocksdb-sys 0.3.0 (git+https://github.com/ethcore/rust-rocksdb)", @@ -1112,7 +1112,7 @@ dependencies = [ [[package]] name = "rocksdb-sys" version = "0.3.0" -source = "git+https://github.com/ethcore/rust-rocksdb#9140e37ce0fdb748097f85653c01b0f7e3736ea9" +source = "git+https://github.com/ethcore/rust-rocksdb#e0e6c099d8cd156fe446009fce241d57b00cd8f4" dependencies = [ "gcc 0.3.28 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/util/src/kvdb.rs b/util/src/kvdb.rs index 7d581fdbc..0911b3471 100644 --- a/util/src/kvdb.rs +++ b/util/src/kvdb.rs @@ -20,6 +20,9 @@ use std::default::Default; use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBVector, DBIterator, IndexType, Options, DBCompactionStyle, BlockBasedOptions, Direction}; +const DB_FILE_SIZE_BASE: u64 = 10 * 1024 * 1024; +const DB_FILE_SIZE_MULTIPLIER: i32 = 5; + /// Write transaction. Batches a sequence of put/delete operations for efficiency. pub struct DBTransaction { batch: WriteBatch, @@ -64,7 +67,7 @@ impl DatabaseConfig { DatabaseConfig { cache_size: Some(cache_size), prefix_size: None, - max_open_files: 256 + max_open_files: -1, } } } @@ -74,7 +77,7 @@ impl Default for DatabaseConfig { DatabaseConfig { cache_size: None, prefix_size: None, - max_open_files: 256 + max_open_files: -1, } } } @@ -110,6 +113,8 @@ impl Database { opts.create_if_missing(true); opts.set_use_fsync(false); opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction); + opts.set_target_file_size_base(DB_FILE_SIZE_BASE); + opts.set_target_file_size_multiplier(DB_FILE_SIZE_MULTIPLIER); if let Some(cache_size) = config.cache_size { // half goes to read cache opts.set_block_cache_size_mb(cache_size as u64 / 2); From 57e9ed3f085eafdc753c43cfe7022bd3b94ecf99 Mon Sep 17 00:00:00 2001 From: Marek Kotewicz Date: Tue, 21 Jun 2016 14:42:27 +0200 Subject: [PATCH 13/15] importing presale wallet (#1368) * importing presale wallet in progress * PresaleWallet data structure --- ethstore/src/crypto.rs | 11 ++++- ethstore/src/ethkey.rs | 8 ++++ ethstore/src/json/hash.rs | 27 +++++++++++- ethstore/src/json/mod.rs.in | 4 +- ethstore/src/json/presale.rs | 42 +++++++++++++++++++ ethstore/src/lib.rs | 2 + ethstore/src/presale.rs | 80 ++++++++++++++++++++++++++++++++++++ 7 files changed, 170 insertions(+), 4 deletions(-) create mode 100644 ethstore/src/json/presale.rs create mode 100644 ethstore/src/presale.rs diff --git a/ethstore/src/crypto.rs b/ethstore/src/crypto.rs index 2858808d7..2733fa720 100644 --- a/ethstore/src/crypto.rs +++ b/ethstore/src/crypto.rs @@ -65,8 +65,8 @@ impl Keccak256<[u8; 32]> for [u8] { /// AES encryption pub mod aes { - use rcrypto::blockmodes::CtrMode; - use rcrypto::aessafe::AesSafe128Encryptor; + use rcrypto::blockmodes::{CtrMode, CbcDecryptor, PkcsPadding}; + use rcrypto::aessafe::{AesSafe128Encryptor, AesSafe128Decryptor}; use rcrypto::symmetriccipher::{Encryptor, Decryptor}; use rcrypto::buffer::{RefReadBuffer, RefWriteBuffer}; @@ -81,5 +81,12 @@ pub mod aes { let mut encryptor = CtrMode::new(AesSafe128Encryptor::new(k), iv.to_vec()); encryptor.decrypt(&mut RefReadBuffer::new(encrypted), &mut RefWriteBuffer::new(dest), true).expect("Invalid length or padding"); } + + /// Decrypt a message using cbc mode + pub fn decrypt_cbc(k: &[u8], iv: &[u8], encrypted: &[u8], dest: &mut [u8]) { + let mut encryptor = CbcDecryptor::new(AesSafe128Decryptor::new(k), PkcsPadding, iv.to_vec()); + encryptor.decrypt(&mut RefReadBuffer::new(encrypted), &mut RefWriteBuffer::new(dest), true).expect("Invalid length or padding"); + } + } diff --git a/ethstore/src/ethkey.rs b/ethstore/src/ethkey.rs index eba877397..9d8858b79 100644 --- a/ethstore/src/ethkey.rs +++ b/ethstore/src/ethkey.rs @@ -31,3 +31,11 @@ impl From for Address { From::from(a) } } + +impl<'a> From<&'a json::H160> for Address { + fn from(json: &'a json::H160) -> Self { + let mut a = [0u8; 20]; + a.copy_from_slice(json); + From::from(a) + } +} diff --git a/ethstore/src/json/hash.rs b/ethstore/src/json/hash.rs index f0fb91e7a..2edc7b80b 100644 --- a/ethstore/src/json/hash.rs +++ b/ethstore/src/json/hash.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +use std::fmt; +use std::ops; use std::str::FromStr; use rustc_serialize::hex::{FromHex, ToHex}; use serde::{Serialize, Serializer, Deserialize, Deserializer, Error as SerdeError}; @@ -22,9 +24,31 @@ use super::Error; macro_rules! impl_hash { ($name: ident, $size: expr) => { - #[derive(Debug, PartialEq)] pub struct $name([u8; $size]); + impl fmt::Debug for $name { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + let self_ref: &[u8] = &self.0; + write!(f, "{:?}", self_ref) + } + } + + impl PartialEq for $name { + fn eq(&self, other: &Self) -> bool { + let self_ref: &[u8] = &self.0; + let other_ref: &[u8] = &other.0; + self_ref == other_ref + } + } + + impl ops::Deref for $name { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + &self.0 + } + } + impl Serialize for $name { fn serialize(&self, serializer: &mut S) -> Result<(), S::Error> where S: Serializer { @@ -85,3 +109,4 @@ macro_rules! impl_hash { impl_hash!(H128, 16); impl_hash!(H160, 20); impl_hash!(H256, 32); +impl_hash!(H768, 96); diff --git a/ethstore/src/json/mod.rs.in b/ethstore/src/json/mod.rs.in index c4a67a287..7272d7e2e 100644 --- a/ethstore/src/json/mod.rs.in +++ b/ethstore/src/json/mod.rs.in @@ -5,14 +5,16 @@ mod hash; mod id; mod kdf; mod key_file; +mod presale; mod version; pub use self::cipher::{Cipher, CipherSer, CipherSerParams, Aes128Ctr}; pub use self::crypto::Crypto; pub use self::error::Error; -pub use self::hash::{H128, H160, H256}; +pub use self::hash::{H128, H160, H256, H768}; pub use self::id::UUID; pub use self::kdf::{Kdf, KdfSer, Prf, Pbkdf2, Scrypt, KdfSerParams}; pub use self::key_file::KeyFile; +pub use self::presale::PresaleWallet; pub use self::version::Version; diff --git a/ethstore/src/json/presale.rs b/ethstore/src/json/presale.rs new file mode 100644 index 000000000..cba50695f --- /dev/null +++ b/ethstore/src/json/presale.rs @@ -0,0 +1,42 @@ +use std::io::Read; +use serde_json; +use super::{H160, H768}; + +#[derive(Debug, PartialEq, Deserialize)] +pub struct PresaleWallet { + pub encseed: H768, + #[serde(rename = "ethaddr")] + pub address: H160, +} + +impl PresaleWallet { + pub fn load(reader: R) -> Result where R: Read { + serde_json::from_reader(reader) + } +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + use serde_json; + use json::{PresaleWallet, H160, H768}; + + #[test] + fn presale_wallet() { + let json = r#" + { + "encseed": "137103c28caeebbcea5d7f95edb97a289ded151b72159137cb7b2671f394f54cff8c121589dcb373e267225547b3c71cbdb54f6e48ec85cd549f96cf0dedb3bc0a9ac6c79b9c426c5878ca2c9d06ff42a23cb648312fc32ba83649de0928e066", + "ethaddr": "ede84640d1a1d3e06902048e67aa7db8d52c2ce1", + "email": "123@gmail.com", + "btcaddr": "1JvqEc6WLhg6GnyrLBe2ztPAU28KRfuseH" + } "#; + + let expected = PresaleWallet { + encseed: H768::from_str("137103c28caeebbcea5d7f95edb97a289ded151b72159137cb7b2671f394f54cff8c121589dcb373e267225547b3c71cbdb54f6e48ec85cd549f96cf0dedb3bc0a9ac6c79b9c426c5878ca2c9d06ff42a23cb648312fc32ba83649de0928e066").unwrap(), + address: H160::from_str("ede84640d1a1d3e06902048e67aa7db8d52c2ce1").unwrap(), + }; + + let wallet: PresaleWallet = serde_json::from_str(json).unwrap(); + assert_eq!(expected, wallet); + } +} diff --git a/ethstore/src/lib.rs b/ethstore/src/lib.rs index 19f1c4806..96d860db4 100644 --- a/ethstore/src/lib.rs +++ b/ethstore/src/lib.rs @@ -37,6 +37,7 @@ mod crypto; mod error; mod ethstore; mod import; +mod presale; mod random; mod secret_store; @@ -44,5 +45,6 @@ pub use self::account::SafeAccount; pub use self::error::Error; pub use self::ethstore::EthStore; pub use self::import::import_accounts; +pub use self::presale::PresaleWallet; pub use self::secret_store::SecretStore; diff --git a/ethstore/src/presale.rs b/ethstore/src/presale.rs new file mode 100644 index 000000000..5ba57b8d4 --- /dev/null +++ b/ethstore/src/presale.rs @@ -0,0 +1,80 @@ +use std::fs; +use std::path::Path; +use rcrypto::pbkdf2::pbkdf2; +use rcrypto::sha2::Sha256; +use rcrypto::hmac::Hmac; +use json; +use ethkey::{Address, Secret, KeyPair}; +use crypto::Keccak256; +use {crypto, Error}; + +pub struct PresaleWallet { + iv: [u8; 16], + ciphertext: [u8; 80], + address: Address, +} + +impl From for PresaleWallet { + fn from(wallet: json::PresaleWallet) -> Self { + let mut iv = [0u8; 16]; + iv.copy_from_slice(&wallet.encseed[..16]); + + let mut ciphertext = [0u8; 80]; + ciphertext.copy_from_slice(&wallet.encseed[16..]); + + PresaleWallet { + iv: iv, + ciphertext: ciphertext, + address: Address::from(wallet.address), + } + } +} + +impl PresaleWallet { + pub fn open

(path: P) -> Result where P: AsRef { + let file = try!(fs::File::open(path)); + let presale = json::PresaleWallet::load(file).unwrap(); + Ok(PresaleWallet::from(presale)) + } + + pub fn decrypt(&self, password: &str) -> Result { + let mut h_mac = Hmac::new(Sha256::new(), password.as_bytes()); + let mut derived_key = vec![0u8; 16]; + pbkdf2(&mut h_mac, password.as_bytes(), 2000, &mut derived_key); + + let mut key = [0u8; 64]; + crypto::aes::decrypt_cbc(&derived_key, &self.iv, &self.ciphertext, &mut key); + + let secret = Secret::from(key.keccak256()); + if let Ok(kp) = KeyPair::from_secret(secret) { + if kp.address() == self.address { + return Ok(kp) + } + } + + Err(Error::InvalidPassword) + } +} + +#[cfg(test)] +mod tests { + use ethkey::Address; + use super::PresaleWallet; + use json; + + #[test] + fn test() { + let json = r#" + { + "encseed": "137103c28caeebbcea5d7f95edb97a289ded151b72159137cb7b2671f394f54cff8c121589dcb373e267225547b3c71cbdb54f6e48ec85cd549f96cf0dedb3bc0a9ac6c79b9c426c5878ca2c9d06ff42a23cb648312fc32ba83649de0928e066", + "ethaddr": "ede84640d1a1d3e06902048e67aa7db8d52c2ce1", + "email": "123@gmail.com", + "btcaddr": "1JvqEc6WLhg6GnyrLBe2ztPAU28KRfuseH" + } "#; + + let wallet = json::PresaleWallet::load(json.as_bytes()).unwrap(); + let wallet = PresaleWallet::from(wallet); + let kp = wallet.decrypt("123").unwrap(); + assert_eq!(kp.address(), Address::from(wallet.address)); + } +} From bca4e23df691e0284539efceda98d66cc5611f0d Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Tue, 21 Jun 2016 15:56:00 +0200 Subject: [PATCH 14/15] Fixed panic on aborted connection (#1370) --- ethcore/src/block_queue.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index 5e89641c0..ce99dcccd 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -130,7 +130,9 @@ impl QueueSignal { } if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false { - self.message_channel.send(UserMessage(SyncMessage::BlockVerified)).expect("Error sending BlockVerified message"); + if let Err(e) = self.message_channel.send(UserMessage(SyncMessage::BlockVerified)) { + debug!("Error sending BlockVerified message: {:?}", e); + } } } From b2891fcdda0de9128772f10c684f90d9fb5df310 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Tue, 21 Jun 2016 16:00:34 +0200 Subject: [PATCH 15/15] Update sealing on new transactions (#1365) --- ethcore/src/client/client.rs | 4 ++-- ethcore/src/client/test_client.rs | 2 +- ethcore/src/miner/miner.rs | 18 ++++++++++++------ ethcore/src/miner/mod.rs | 2 +- rpc/src/v1/tests/helpers/miner_service.rs | 2 +- 5 files changed, 17 insertions(+), 11 deletions(-) diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index bf67a8772..25e61a986 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -381,7 +381,7 @@ impl Client where V: Verifier { balance: self.latest_balance(a), }; let tx = transactions.iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect(); - let results = self.miner.import_transactions(tx, fetch_account); + let results = self.miner.import_transactions(self, tx, fetch_account); results.len() } @@ -771,7 +771,7 @@ impl BlockChainClient for Client where V: Verifier { nonce: self.latest_nonce(a), balance: self.latest_balance(a), }; - self.miner.import_transactions(transactions, fetch_account) + self.miner.import_transactions(self, transactions, fetch_account) } fn queue_transactions(&self, transactions: Vec) { diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index b6cc946fc..29f91a1d6 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -490,7 +490,7 @@ impl BlockChainClient for TestBlockChainClient { balance: balances[a], }; - self.miner.import_transactions(transactions, &fetch_account) + self.miner.import_transactions(self, transactions, &fetch_account) } fn queue_transactions(&self, transactions: Vec) { diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 735ad5cf4..9840a682e 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -376,13 +376,19 @@ impl MinerService for Miner { *self.gas_floor_target.read().unwrap() } - fn import_transactions(&self, transactions: Vec, fetch_account: T) -> + fn import_transactions(&self, chain: &MiningBlockChainClient, transactions: Vec, fetch_account: T) -> Vec> where T: Fn(&Address) -> AccountDetails { - let mut transaction_queue = self.transaction_queue.lock().unwrap(); - transactions.into_iter() - .map(|tx| transaction_queue.add(tx, &fetch_account, TransactionOrigin::External)) - .collect() + let results: Vec> = { + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + transactions.into_iter() + .map(|tx| transaction_queue.add(tx, &fetch_account, TransactionOrigin::External)) + .collect() + }; + if !results.is_empty() { + self.update_sealing(chain); + } + results } fn import_own_transaction( @@ -564,7 +570,7 @@ impl MinerService for Miner { for tx in &txs { let _sender = tx.sender(); } - let _ = self.import_transactions(txs, |a| AccountDetails { + let _ = self.import_transactions(chain, txs, |a| AccountDetails { nonce: chain.latest_nonce(a), balance: chain.latest_balance(a), }); diff --git a/ethcore/src/miner/mod.rs b/ethcore/src/miner/mod.rs index 71727f51d..697f7513f 100644 --- a/ethcore/src/miner/mod.rs +++ b/ethcore/src/miner/mod.rs @@ -94,7 +94,7 @@ pub trait MinerService : Send + Sync { fn set_transactions_limit(&self, limit: usize); /// Imports transactions to transaction queue. - fn import_transactions(&self, transactions: Vec, fetch_account: T) -> + fn import_transactions(&self, chain: &MiningBlockChainClient, transactions: Vec, fetch_account: T) -> Vec> where T: Fn(&Address) -> AccountDetails, Self: Sized; diff --git a/rpc/src/v1/tests/helpers/miner_service.rs b/rpc/src/v1/tests/helpers/miner_service.rs index daa5c9497..7ec7fc1cc 100644 --- a/rpc/src/v1/tests/helpers/miner_service.rs +++ b/rpc/src/v1/tests/helpers/miner_service.rs @@ -115,7 +115,7 @@ impl MinerService for TestMinerService { } /// Imports transactions to transaction queue. - fn import_transactions(&self, transactions: Vec, fetch_account: T) -> + fn import_transactions(&self, _chain: &MiningBlockChainClient, transactions: Vec, fetch_account: T) -> Vec> where T: Fn(&Address) -> AccountDetails { // lets assume that all txs are valid