diff --git a/ethcore/src/blockchain/block_info.rs b/ethcore/src/blockchain/block_info.rs index ce639bfed..cf16a8834 100644 --- a/ethcore/src/blockchain/block_info.rs +++ b/ethcore/src/blockchain/block_info.rs @@ -18,6 +18,7 @@ use util::numbers::{U256,H256}; use header::BlockNumber; /// Brief info about inserted block. +#[derive(Clone)] pub struct BlockInfo { /// Block hash. pub hash: H256, @@ -30,6 +31,7 @@ pub struct BlockInfo { } /// Describes location of newly inserted block. +#[derive(Clone)] pub enum BlockLocation { /// It's part of the canon chain. CanonChain, @@ -42,6 +44,8 @@ pub enum BlockLocation { /// Hash of the newest common ancestor with old canon chain. ancestor: H256, /// Hashes of the blocks between ancestor and this block. - route: Vec + enacted: Vec, + /// Hashes of the blocks which were invalidated. + retracted: Vec, } } diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs index d7c9d7975..8c21532c8 100644 --- a/ethcore/src/blockchain/blockchain.rs +++ b/ethcore/src/blockchain/blockchain.rs @@ -28,7 +28,7 @@ use blockchain::best_block::BestBlock; use blockchain::bloom_indexer::BloomIndexer; use blockchain::tree_route::TreeRoute; use blockchain::update::ExtrasUpdate; -use blockchain::CacheSize; +use blockchain::{CacheSize, ImportRoute}; const BLOOM_INDEX_SIZE: usize = 16; const BLOOM_LEVELS: u8 = 3; @@ -414,14 +414,14 @@ impl BlockChain { /// Inserts the block into backing cache database. /// Expects the block to be valid and already verified. /// If the block is already known, does nothing. - pub fn insert_block(&self, bytes: &[u8], receipts: Vec) { + pub fn insert_block(&self, bytes: &[u8], receipts: Vec) -> ImportRoute { // create views onto rlp let block = BlockView::new(bytes); let header = block.header_view(); let hash = header.sha3(); if self.is_known(&hash) { - return; + return ImportRoute::none(); } // store block in db @@ -435,8 +435,10 @@ impl BlockChain { block_receipts: self.prepare_block_receipts_update(receipts, &info), transactions_addresses: self.prepare_transaction_addresses_update(bytes, &info), blocks_blooms: self.prepare_block_blooms_update(bytes, &info), - info: info + info: info.clone(), }); + + ImportRoute::from(info) } /// Applies extras update. @@ -549,9 +551,14 @@ impl BlockChain { match route.blocks.len() { 0 => BlockLocation::CanonChain, - _ => BlockLocation::BranchBecomingCanonChain { - ancestor: route.ancestor, - route: route.blocks.into_iter().skip(route.index).collect() + _ => { + let retracted = route.blocks.iter().take(route.index).cloned().collect::>(); + + BlockLocation::BranchBecomingCanonChain { + ancestor: route.ancestor, + enacted: route.blocks.into_iter().skip(route.index).collect(), + retracted: retracted.into_iter().rev().collect(), + } } } } else { @@ -572,11 +579,11 @@ impl BlockChain { BlockLocation::CanonChain => { block_hashes.insert(number, info.hash.clone()); }, - BlockLocation::BranchBecomingCanonChain { ref ancestor, ref route } => { + BlockLocation::BranchBecomingCanonChain { ref ancestor, ref enacted, .. } => { let ancestor_number = self.block_number(ancestor).unwrap(); let start_number = ancestor_number + 1; - for (index, hash) in route.iter().cloned().enumerate() { + for (index, hash) in enacted.iter().cloned().enumerate() { block_hashes.insert(start_number + index as BlockNumber, hash); } @@ -661,11 +668,11 @@ impl BlockChain { ChainFilter::new(self, self.bloom_indexer.index_size(), self.bloom_indexer.levels()) .add_bloom(&header.log_bloom(), header.number() as usize) }, - BlockLocation::BranchBecomingCanonChain { ref ancestor, ref route } => { + BlockLocation::BranchBecomingCanonChain { ref ancestor, ref enacted, .. } => { let ancestor_number = self.block_number(ancestor).unwrap(); let start_number = ancestor_number + 1; - let mut blooms: Vec = route.iter() + let mut blooms: Vec = enacted.iter() .map(|hash| self.block(hash).unwrap()) .map(|bytes| BlockView::new(&bytes).header_view().log_bloom()) .collect(); @@ -825,7 +832,7 @@ mod tests { use rustc_serialize::hex::FromHex; use util::hash::*; use util::sha3::Hashable; - use blockchain::{BlockProvider, BlockChain, BlockChainConfig}; + use blockchain::{BlockProvider, BlockChain, BlockChainConfig, ImportRoute}; use tests::helpers::*; use devtools::*; use blockchain::generator::{ChainGenerator, ChainIterator, BlockFinalizer}; @@ -943,10 +950,30 @@ mod tests { let temp = RandomTempPath::new(); let bc = BlockChain::new(BlockChainConfig::default(), &genesis, temp.as_path()); - bc.insert_block(&b1, vec![]); - bc.insert_block(&b2, vec![]); - bc.insert_block(&b3a, vec![]); - bc.insert_block(&b3b, vec![]); + let ir1 = bc.insert_block(&b1, vec![]); + let ir2 = bc.insert_block(&b2, vec![]); + let ir3b = bc.insert_block(&b3b, vec![]); + let ir3a = bc.insert_block(&b3a, vec![]); + + assert_eq!(ir1, ImportRoute { + enacted: vec![b1_hash], + retracted: vec![], + }); + + assert_eq!(ir2, ImportRoute { + enacted: vec![b2_hash], + retracted: vec![], + }); + + assert_eq!(ir3b, ImportRoute { + enacted: vec![b3b_hash], + retracted: vec![], + }); + + assert_eq!(ir3a, ImportRoute { + enacted: vec![b3a_hash], + retracted: vec![b3b_hash], + }); assert_eq!(bc.best_block_hash(), best_block_hash); assert_eq!(bc.block_number(&genesis_hash).unwrap(), 0); diff --git a/ethcore/src/blockchain/import_route.rs b/ethcore/src/blockchain/import_route.rs new file mode 100644 index 000000000..262b70899 --- /dev/null +++ b/ethcore/src/blockchain/import_route.rs @@ -0,0 +1,119 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Import route. + +use util::hash::H256; +use blockchain::block_info::{BlockInfo, BlockLocation}; + +/// Import route for newly inserted block. +#[derive(Debug, PartialEq)] +pub struct ImportRoute { + /// Blocks that were invalidated by new block. + pub retracted: Vec, + /// Blocks that were validated by new block. + pub enacted: Vec, +} + +impl ImportRoute { + pub fn none() -> Self { + ImportRoute { + retracted: vec![], + enacted: vec![], + } + } +} + +impl From for ImportRoute { + fn from(info: BlockInfo) -> ImportRoute { + match info.location { + BlockLocation::CanonChain => ImportRoute { + retracted: vec![], + enacted: vec![info.hash], + }, + BlockLocation::Branch => ImportRoute::none(), + BlockLocation::BranchBecomingCanonChain { mut enacted, retracted, .. } => { + enacted.push(info.hash); + ImportRoute { + retracted: retracted, + enacted: enacted, + } + } + } + } +} + +#[cfg(test)] +mod tests { + use util::hash::H256; + use util::numbers::U256; + use blockchain::block_info::{BlockInfo, BlockLocation}; + use blockchain::ImportRoute; + + #[test] + fn import_route_none() { + assert_eq!(ImportRoute::none(), ImportRoute { + enacted: vec![], + retracted: vec![], + }); + } + + #[test] + fn import_route_branch() { + let info = BlockInfo { + hash: H256::from(U256::from(1)), + number: 0, + total_difficulty: U256::from(0), + location: BlockLocation::Branch, + }; + + assert_eq!(ImportRoute::from(info), ImportRoute::none()); + } + + #[test] + fn import_route_canon_chain() { + let info = BlockInfo { + hash: H256::from(U256::from(1)), + number: 0, + total_difficulty: U256::from(0), + location: BlockLocation::CanonChain, + }; + + assert_eq!(ImportRoute::from(info), ImportRoute { + retracted: vec![], + enacted: vec![H256::from(U256::from(1))], + }); + } + + #[test] + fn import_route_branch_becoming_canon_chain() { + let info = BlockInfo { + hash: H256::from(U256::from(2)), + number: 0, + total_difficulty: U256::from(0), + location: BlockLocation::BranchBecomingCanonChain { + ancestor: H256::from(U256::from(0)), + enacted: vec![H256::from(U256::from(1))], + retracted: vec![H256::from(U256::from(3)), H256::from(U256::from(4))], + } + }; + + assert_eq!(ImportRoute::from(info), ImportRoute { + retracted: vec![H256::from(U256::from(3)), H256::from(U256::from(4))], + enacted: vec![H256::from(U256::from(1)), H256::from(U256::from(2))], + }); + } +} diff --git a/ethcore/src/blockchain/mod.rs b/ethcore/src/blockchain/mod.rs index b0679b563..6559d8364 100644 --- a/ethcore/src/blockchain/mod.rs +++ b/ethcore/src/blockchain/mod.rs @@ -25,7 +25,9 @@ mod tree_route; mod update; #[cfg(test)] mod generator; +mod import_route; pub use self::blockchain::{BlockProvider, BlockChain, BlockChainConfig}; pub use self::cache::CacheSize; pub use self::tree_route::TreeRoute; +pub use self::import_route::ImportRoute; diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 2f9536b2e..4ef8bb029 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -283,7 +283,8 @@ impl Client where V: Verifier { .commit(header.number(), &header.hash(), ancient) .expect("State DB commit failed."); - // And update the chain + // And update the chain after commit to prevent race conditions + // (when something is in chain but you are not able to fetch details) self.chain.write().unwrap() .insert_block(&block.bytes, receipts); @@ -409,39 +410,6 @@ impl Client where V: Verifier { trace!("Sealing: number={}, hash={}, diff={}", b.hash(), b.block().header().difficulty(), b.block().header().number()); *self.sealing_block.lock().unwrap() = Some(b); } - - /// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock. - pub fn sealing_block(&self) -> &Mutex> { - if self.sealing_block.lock().unwrap().is_none() { - self.sealing_enabled.store(true, atomic::Ordering::Relaxed); - // TODO: Above should be on a timer that resets after two blocks have arrived without being asked for. - self.prepare_sealing(); - } - &self.sealing_block - } - - /// Submit `seal` as a valid solution for the header of `pow_hash`. - /// Will check the seal, but not actually insert the block into the chain. - pub fn submit_seal(&self, pow_hash: H256, seal: Vec) -> Result<(), Error> { - let mut maybe_b = self.sealing_block.lock().unwrap(); - match *maybe_b { - Some(ref b) if b.hash() == pow_hash => {} - _ => { return Err(Error::PowHashInvalid); } - } - - let b = maybe_b.take(); - match b.unwrap().try_seal(self.engine.deref().deref(), seal) { - Err(old) => { - *maybe_b = Some(old); - Err(Error::PowInvalid) - } - Ok(sealed) => { - // TODO: commit DB from `sealed.drain` and make a VerifiedBlock to skip running the transactions twice. - try!(self.import_block(sealed.rlp_bytes())); - Ok(()) - } - } - } } // TODO: need MinerService MinerIoHandler @@ -606,6 +574,39 @@ impl BlockChainClient for Client where V: Verifier { }) .collect() } + + /// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock. + fn sealing_block(&self) -> &Mutex> { + if self.sealing_block.lock().unwrap().is_none() { + self.sealing_enabled.store(true, atomic::Ordering::Relaxed); + // TODO: Above should be on a timer that resets after two blocks have arrived without being asked for. + self.prepare_sealing(); + } + &self.sealing_block + } + + /// Submit `seal` as a valid solution for the header of `pow_hash`. + /// Will check the seal, but not actually insert the block into the chain. + fn submit_seal(&self, pow_hash: H256, seal: Vec) -> Result<(), Error> { + let mut maybe_b = self.sealing_block.lock().unwrap(); + match *maybe_b { + Some(ref b) if b.hash() == pow_hash => {} + _ => { return Err(Error::PowHashInvalid); } + } + + let b = maybe_b.take(); + match b.unwrap().try_seal(self.engine.deref().deref(), seal) { + Err(old) => { + *maybe_b = Some(old); + Err(Error::PowInvalid) + } + Ok(sealed) => { + // TODO: commit DB from `sealed.drain` and make a VerifiedBlock to skip running the transactions twice. + try!(self.import_block(sealed.rlp_bytes())); + Ok(()) + } + } + } } impl MayPanic for Client { diff --git a/ethcore/src/client/mod.rs b/ethcore/src/client/mod.rs index 58a21f151..afdfb200a 100644 --- a/ethcore/src/client/mod.rs +++ b/ethcore/src/client/mod.rs @@ -26,16 +26,18 @@ pub use self::config::{ClientConfig, BlockQueueConfig, BlockChainConfig}; pub use self::ids::{BlockId, TransactionId}; pub use self::test_client::{TestBlockChainClient, EachBlockWith}; +use std::sync::Mutex; use util::bytes::Bytes; use util::hash::{Address, H256, H2048}; use util::numbers::U256; use blockchain::TreeRoute; use block_queue::BlockQueueInfo; +use block::ClosedBlock; use header::BlockNumber; use transaction::LocalizedTransaction; use log_entry::LocalizedLogEntry; use filter::Filter; -use error::ImportResult; +use error::{ImportResult, Error}; /// Blockchain database client. Owns and manages a blockchain and a block queue. pub trait BlockChainClient : Sync + Send { @@ -100,5 +102,12 @@ pub trait BlockChainClient : Sync + Send { /// Returns logs matching given filter. fn logs(&self, filter: Filter) -> Vec; + + /// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock. + fn sealing_block(&self) -> &Mutex>; + + /// Submit `seal` as a valid solution for the header of `pow_hash`. + /// Will check the seal, but not actually insert the block into the chain. + fn submit_seal(&self, pow_hash: H256, seal: Vec) -> Result<(), Error>; } diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index 4ca30dcd5..207f1090f 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -14,19 +14,9 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -//use std::mem; -//use std::ops::{Deref, DerefMut}; -//use std::collections::HashMap; -//use rustc_serialize::hex::FromHex; -//use util::rlp; -//use util::rlp::*; -//use util::bytes::Bytes; -//use util::hash::{FixedHash, Address, H256, H2048}; -//use util::numbers::{Uint, U256}; -//use util::crypto::KeyPair; -//use util::sha3::Hashable; +//! Test client. + use util::*; -//use std::sync::RwLock; use transaction::{Transaction, LocalizedTransaction, Action}; use blockchain::TreeRoute; use client::{BlockChainClient, BlockChainInfo, BlockStatus, BlockId, TransactionId}; @@ -34,26 +24,39 @@ use header::{Header as BlockHeader, BlockNumber}; use filter::Filter; use log_entry::LocalizedLogEntry; use receipt::Receipt; -use error::ImportResult; +use error::{ImportResult, Error}; use block_queue::BlockQueueInfo; +use block::ClosedBlock; +/// Test client. pub struct TestBlockChainClient { + /// Blocks. pub blocks: RwLock>, + /// Mapping of numbers to hashes. pub numbers: RwLock>, + /// Genesis block hash. pub genesis_hash: H256, + /// Last block hash. pub last_hash: RwLock, + /// Difficulty. pub difficulty: RwLock, } #[derive(Clone)] +/// Used for generating test client blocks. pub enum EachBlockWith { + /// Plain block. Nothing, + /// Block with an uncle. Uncle, + /// Block with a transaction. Transaction, + /// Block with an uncle and transaction. UncleAndTransaction } impl TestBlockChainClient { + /// Creates new test client. pub fn new() -> TestBlockChainClient { let mut client = TestBlockChainClient { @@ -68,6 +71,7 @@ impl TestBlockChainClient { client } + /// Add blocks to test client. pub fn add_blocks(&mut self, count: usize, with: EachBlockWith) { let len = self.numbers.read().unwrap().len(); for n in len..(len + count) { @@ -115,6 +119,7 @@ impl TestBlockChainClient { } } + /// TODO: pub fn corrupt_block(&mut self, n: BlockNumber) { let hash = self.block_hash(BlockId::Number(n)).unwrap(); let mut header: BlockHeader = decode(&self.block_header(BlockId::Number(n)).unwrap()); @@ -126,6 +131,7 @@ impl TestBlockChainClient { self.blocks.write().unwrap().insert(hash, rlp.out()); } + /// TODO: pub fn block_hash_delta_minus(&mut self, delta: usize) -> H256 { let blocks_read = self.numbers.read().unwrap(); let index = blocks_read.len() - delta; @@ -171,6 +177,14 @@ impl BlockChainClient for TestBlockChainClient { unimplemented!(); } + fn sealing_block(&self) -> &Mutex> { + unimplemented!(); + } + + fn submit_seal(&self, _pow_hash: H256, _seal: Vec) -> Result<(), Error> { + unimplemented!(); + } + fn block_header(&self, id: BlockId) -> Option { self.block_hash(id).and_then(|hash| self.blocks.read().unwrap().get(&hash).map(|r| Rlp::new(r).at(0).as_raw().to_vec())) } diff --git a/parity/main.rs b/parity/main.rs index 68d45bc04..efff52e4e 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -49,7 +49,7 @@ use ethcore::spec::*; use ethcore::client::*; use ethcore::service::{ClientService, NetSyncMessage}; use ethcore::ethereum; -use ethsync::{EthSync, SyncConfig}; +use ethsync::{EthSync, SyncConfig, SyncProvider}; use docopt::Docopt; use daemonize::Daemonize; use number_prefix::{binary_prefix, Standalone, Prefixed}; @@ -79,7 +79,7 @@ Protocol Options: or olympic, frontier, homestead, mainnet, morden, or testnet [default: homestead]. --testnet Equivalent to --chain testnet (geth-compatible). --networkid INDEX Override the network identifier from the chain we are on. - --archive Client should not prune the state/storage trie. + --pruning Client should prune the state/storage trie. -d --datadir PATH Specify the database & configuration directory path [default: $HOME/.parity] --keys-path PATH Specify the path for JSON key files to be found [default: $HOME/.web3/keys] --identity NAME Specify your node's name. @@ -140,7 +140,7 @@ struct Args { flag_identity: String, flag_cache: Option, flag_keys_path: String, - flag_archive: bool, + flag_pruning: bool, flag_no_bootstrap: bool, flag_listen_address: String, flag_public_address: Option, @@ -402,7 +402,7 @@ impl Configuration { client_config.blockchain.max_cache_size = self.args.flag_cache_max_size; } } - client_config.prefer_journal = !self.args.flag_archive; + client_config.prefer_journal = self.args.flag_pruning; client_config.name = self.args.flag_identity.clone(); client_config.queue.max_mem_use = self.args.flag_queue_max_size; let mut service = ClientService::start(client_config, spec, net_settings, &Path::new(&self.path())).unwrap(); diff --git a/rpc/src/v1/helpers/poll_manager.rs b/rpc/src/v1/helpers/poll_manager.rs index 36a6352c2..0297384d1 100644 --- a/rpc/src/v1/helpers/poll_manager.rs +++ b/rpc/src/v1/helpers/poll_manager.rs @@ -84,7 +84,7 @@ impl PollManager where T: Timer { } /// Returns number of block when last poll happend. - pub fn get_poll_info(&mut self, id: &PollId) -> Option<&PollInfo> { + pub fn poll_info(&mut self, id: &PollId) -> Option<&PollInfo> { self.polls.prune(); self.polls.get(id) } @@ -124,21 +124,21 @@ mod tests { *time.borrow_mut() = 10; indexer.update_poll(&0, 21); - assert_eq!(indexer.get_poll_info(&0).unwrap().filter, false); - assert_eq!(indexer.get_poll_info(&0).unwrap().block_number, 21); + assert_eq!(indexer.poll_info(&0).unwrap().filter, false); + assert_eq!(indexer.poll_info(&0).unwrap().block_number, 21); *time.borrow_mut() = 30; indexer.update_poll(&1, 23); - assert_eq!(indexer.get_poll_info(&1).unwrap().filter, true); - assert_eq!(indexer.get_poll_info(&1).unwrap().block_number, 23); + assert_eq!(indexer.poll_info(&1).unwrap().filter, true); + assert_eq!(indexer.poll_info(&1).unwrap().block_number, 23); *time.borrow_mut() = 75; indexer.update_poll(&0, 30); - assert!(indexer.get_poll_info(&0).is_none()); - assert_eq!(indexer.get_poll_info(&1).unwrap().filter, true); - assert_eq!(indexer.get_poll_info(&1).unwrap().block_number, 23); + assert!(indexer.poll_info(&0).is_none()); + assert_eq!(indexer.poll_info(&1).unwrap().filter, true); + assert_eq!(indexer.poll_info(&1).unwrap().block_number, 23); indexer.remove_poll(&1); - assert!(indexer.get_poll_info(&1).is_none()); + assert!(indexer.poll_info(&1).is_none()); } } diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 7113c55b1..a067b48fb 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -17,7 +17,7 @@ //! Eth rpc implementation. use std::collections::HashMap; use std::sync::{Arc, Weak, Mutex, RwLock}; -use ethsync::{EthSync, SyncState}; +use ethsync::{SyncProvider, SyncState}; use jsonrpc_core::*; use util::numbers::*; use util::sha3::*; @@ -25,7 +25,6 @@ use util::rlp::encode; use ethcore::client::*; use ethcore::block::{IsBlock}; use ethcore::views::*; -//#[macro_use] extern crate log; use ethcore::ethereum::Ethash; use ethcore::ethereum::denominations::shannon; use v1::traits::{Eth, EthFilter}; @@ -33,15 +32,15 @@ use v1::types::{Block, BlockTransactions, BlockNumber, Bytes, SyncStatus, SyncIn use v1::helpers::{PollFilter, PollManager}; /// Eth rpc implementation. -pub struct EthClient { - client: Weak, - sync: Weak, +pub struct EthClient where C: BlockChainClient, S: SyncProvider { + client: Weak, + sync: Weak, hashrates: RwLock>, } -impl EthClient { +impl EthClient where C: BlockChainClient, S: SyncProvider { /// Creates new EthClient. - pub fn new(client: &Arc, sync: &Arc) -> Self { + pub fn new(client: &Arc, sync: &Arc) -> Self { EthClient { client: Arc::downgrade(client), sync: Arc::downgrade(sync), @@ -95,7 +94,7 @@ impl EthClient { } } -impl Eth for EthClient { +impl Eth for EthClient where C: BlockChainClient + 'static, S: SyncProvider + 'static { fn protocol_version(&self, params: Params) -> Result { match params { Params::None => to_value(&U256::from(take_weak!(self.sync).status().protocol_version)), @@ -256,14 +255,14 @@ impl Eth for EthClient { } /// Eth filter rpc implementation. -pub struct EthFilterClient { - client: Weak, +pub struct EthFilterClient where C: BlockChainClient { + client: Weak, polls: Mutex>, } -impl EthFilterClient { +impl EthFilterClient where C: BlockChainClient { /// Creates new Eth filter client. - pub fn new(client: &Arc) -> Self { + pub fn new(client: &Arc) -> Self { EthFilterClient { client: Arc::downgrade(client), polls: Mutex::new(PollManager::new()) @@ -271,7 +270,7 @@ impl EthFilterClient { } } -impl EthFilter for EthFilterClient { +impl EthFilter for EthFilterClient where C: BlockChainClient + 'static { fn new_filter(&self, params: Params) -> Result { from_params::<(Filter,)>(params) .and_then(|(filter,)| { @@ -307,7 +306,7 @@ impl EthFilter for EthFilterClient { let client = take_weak!(self.client); from_params::<(Index,)>(params) .and_then(|(index,)| { - let info = self.polls.lock().unwrap().get_poll_info(&index.value()).cloned(); + let info = self.polls.lock().unwrap().poll_info(&index.value()).cloned(); match info { None => Ok(Value::Array(vec![] as Vec)), Some(info) => match info.filter { diff --git a/rpc/src/v1/impls/net.rs b/rpc/src/v1/impls/net.rs index 9e24caad2..5e67bf252 100644 --- a/rpc/src/v1/impls/net.rs +++ b/rpc/src/v1/impls/net.rs @@ -17,24 +17,24 @@ //! Net rpc implementation. use std::sync::{Arc, Weak}; use jsonrpc_core::*; -use ethsync::EthSync; +use ethsync::SyncProvider; use v1::traits::Net; /// Net rpc implementation. -pub struct NetClient { - sync: Weak +pub struct NetClient where S: SyncProvider { + sync: Weak } -impl NetClient { +impl NetClient where S: SyncProvider { /// Creates new NetClient. - pub fn new(sync: &Arc) -> Self { + pub fn new(sync: &Arc) -> Self { NetClient { sync: Arc::downgrade(sync) } } } -impl Net for NetClient { +impl Net for NetClient where S: SyncProvider + 'static { fn version(&self, _: Params) -> Result { Ok(Value::U64(take_weak!(self.sync).status().protocol_version as u64)) } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 391f79873..da3908a1e 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -43,6 +43,7 @@ use io::SyncIo; use transaction_queue::TransactionQueue; use time; use super::SyncConfig; +use ethcore; known_heap_size!(0, PeerInfo, Header, HeaderId); @@ -936,7 +937,7 @@ impl ChainSync { let mut transaction_queue = self.transaction_queue.lock().unwrap(); for i in 0..item_count { let tx: SignedTransaction = try!(r.val_at(i)); - transaction_queue.add(tx, &fetch_latest_nonce); + let _ = transaction_queue.add(tx, &fetch_latest_nonce); } Ok(()) } @@ -1292,7 +1293,7 @@ impl ChainSync { let _sender = tx.sender(); } let mut transaction_queue = self.transaction_queue.lock().unwrap(); - transaction_queue.add_all(txs, |a| chain.nonce(a)); + let _ = transaction_queue.add_all(txs, |a| chain.nonce(a)); }); } @@ -1301,6 +1302,13 @@ impl ChainSync { // TODO [todr] propagate transactions? } + /// Add transaction to the transaction queue + pub fn insert_transaction(&self, transaction: ethcore::transaction::SignedTransaction, fetch_nonce: &T) + where T: Fn(&Address) -> U256 + { + let mut queue = self.transaction_queue.lock().unwrap(); + queue.add(transaction, fetch_nonce); + } } #[cfg(test)] diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 86c2fcbf5..3b79e5614 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -72,6 +72,7 @@ mod chain; mod io; mod range_collection; mod transaction_queue; +pub use transaction_queue::TransactionQueue; #[cfg(test)] mod tests; @@ -93,6 +94,14 @@ impl Default for SyncConfig { } } +/// Current sync status +pub trait SyncProvider: Send + Sync { + /// Get sync status + fn status(&self) -> SyncStatus; + /// Insert transaction in the sync transaction queue + fn insert_transaction(&self, transaction: ethcore::transaction::SignedTransaction); +} + /// Ethereum network protocol handler pub struct EthSync { /// Shared blockchain client. TODO: this should evetually become an IPC endpoint @@ -114,11 +123,6 @@ impl EthSync { sync } - /// Get sync status - pub fn status(&self) -> SyncStatus { - self.sync.read().unwrap().status() - } - /// Stop sync pub fn stop(&mut self, io: &mut NetworkContext) { self.sync.write().unwrap().abort(&mut NetSyncIo::new(io, self.chain.deref())); @@ -130,6 +134,22 @@ impl EthSync { } } +impl SyncProvider for EthSync { + /// Get sync status + fn status(&self) -> SyncStatus { + self.sync.read().unwrap().status() + } + + /// Insert transaction in transaction queue + fn insert_transaction(&self, transaction: ethcore::transaction::SignedTransaction) { + use util::numbers::*; + + let nonce_fn = |a: &Address| self.chain.state().nonce(a) + U256::one(); + let sync = self.sync.write().unwrap(); + sync.insert_transaction(transaction, &nonce_fn); + } +} + impl NetworkProtocolHandler for EthSync { fn initialize(&self, io: &NetworkContext) { io.register_timer(0, 1000).expect("Error registering sync timer"); diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index 3e0d931b5..243939a4c 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -17,6 +17,67 @@ // TODO [todr] - own transactions should have higher priority //! Transaction Queue +//! +//! TransactionQueue keeps track of all transactions seen by the node (received from other peers) and own transactions +//! and orders them by priority. Top priority transactions are those with low nonce height (difference between +//! transaction's nonce and next nonce expected from this sender). If nonces are equal transaction's gas price is used +//! for comparison (higher gas price = higher priority). +//! +//! # Usage Example +//! +//! ```rust +//! extern crate ethcore_util as util; +//! extern crate ethcore; +//! extern crate ethsync; +//! extern crate rustc_serialize; +//! +//! use util::crypto::KeyPair; +//! use util::hash::Address; +//! use util::numbers::{Uint, U256}; +//! use ethsync::TransactionQueue; +//! use ethcore::transaction::*; +//! use rustc_serialize::hex::FromHex; +//! +//! fn main() { +//! let key = KeyPair::create().unwrap(); +//! let t1 = Transaction { action: Action::Create, value: U256::from(100), data: "3331600055".from_hex().unwrap(), +//! gas: U256::from(100_000), gas_price: U256::one(), nonce: U256::from(10) }; +//! let t2 = Transaction { action: Action::Create, value: U256::from(100), data: "3331600055".from_hex().unwrap(), +//! gas: U256::from(100_000), gas_price: U256::one(), nonce: U256::from(11) }; +//! +//! let st1 = t1.sign(&key.secret()); +//! let st2 = t2.sign(&key.secret()); +//! let default_nonce = |_a: &Address| U256::from(10); +//! +//! let mut txq = TransactionQueue::new(); +//! txq.add(st2.clone(), &default_nonce); +//! txq.add(st1.clone(), &default_nonce); +//! +//! // Check status +//! assert_eq!(txq.status().pending, 2); +//! // Check top transactions +//! let top = txq.top_transactions(3); +//! assert_eq!(top.len(), 2); +//! assert_eq!(top[0], st1); +//! assert_eq!(top[1], st2); +//! +//! // And when transaction is removed (but nonce haven't changed) +//! // it will move invalid transactions to future +//! txq.remove(&st1.hash(), &default_nonce); +//! assert_eq!(txq.status().pending, 0); +//! assert_eq!(txq.status().future, 1); +//! assert_eq!(txq.top_transactions(3).len(), 0); +//! } +//! ``` +//! +//! # Maintaing valid state +//! +//! 1. Whenever transaction is imported to queue (to queue) all other transactions from this sender are revalidated in current. It means that they are moved to future and back again (height recalculation & gap filling). +//! 2. Whenever transaction is removed: +//! - When it's removed from `future` - all `future` transactions heights are recalculated and then +//! we check if the transactions should go to `current` (comparing state nonce) +//! - When it's removed from `current` - all transactions from this sender (`current` & `future`) are recalculated. +//! use std::cmp::{Ordering}; use std::collections::{HashMap, BTreeSet}; @@ -24,12 +85,20 @@ use util::numbers::{Uint, U256}; use util::hash::{Address, H256}; use util::table::*; use ethcore::transaction::*; +use ethcore::error::Error; #[derive(Clone, Debug)] +/// Light structure used to identify transaction and it's order struct TransactionOrder { + /// Primary ordering factory. Difference between transaction nonce and expected nonce in state + /// (e.g. Tx(nonce:5), State(nonce:0) -> height: 5) + /// High nonce_height = Low priority (processed later) nonce_height: U256, + /// Gas Price of the transaction. + /// Low gas price = Low priority (processed later) gas_price: U256, + /// Hash to identify associated transaction hash: H256, } @@ -70,7 +139,7 @@ impl Ord for TransactionOrder { let a_gas = self.gas_price; let b_gas = b.gas_price; if a_gas != b_gas { - return a_gas.cmp(&b_gas); + return b_gas.cmp(&a_gas); } // Compare hashes @@ -78,14 +147,16 @@ impl Ord for TransactionOrder { } } +/// Verified transaction (with sender) struct VerifiedTransaction { transaction: SignedTransaction } impl VerifiedTransaction { - fn new(transaction: SignedTransaction) -> Self { - VerifiedTransaction { + fn new(transaction: SignedTransaction) -> Result { + try!(transaction.sender()); + Ok(VerifiedTransaction { transaction: transaction - } + }) } fn hash(&self) -> H256 { @@ -101,6 +172,11 @@ impl VerifiedTransaction { } } +/// Holds transactions accessible by (address, nonce) and by priority +/// +/// TransactionSet keeps number of entries below limit, but it doesn't +/// automatically happen during `insert/remove` operations. +/// You have to call `enforce_limit` to remove lowest priority transactions from set. struct TransactionSet { by_priority: BTreeSet, by_address: Table, @@ -108,11 +184,15 @@ struct TransactionSet { } impl TransactionSet { + /// Inserts `TransactionOrder` to this set fn insert(&mut self, sender: Address, nonce: U256, order: TransactionOrder) -> Option { self.by_priority.insert(order.clone()); self.by_address.insert(sender, nonce, order) } + /// Remove low priority transactions if there is more then specified by given `limit`. + /// + /// It drops transactions from this set but also removes associated `VerifiedTransaction`. fn enforce_limit(&mut self, by_hash: &mut HashMap) { let len = self.by_priority.len(); if len <= self.limit { @@ -134,6 +214,7 @@ impl TransactionSet { } } + /// Drop transaction from this set (remove from `by_priority` and `by_address`) fn drop(&mut self, sender: &Address, nonce: &U256) -> Option { if let Some(tx_order) = self.by_address.remove(sender, nonce) { self.by_priority.remove(&tx_order); @@ -142,12 +223,15 @@ impl TransactionSet { None } + /// Drop all transactions. fn clear(&mut self) { self.by_priority.clear(); self.by_address.clear(); } } +// Will be used when rpc merged +#[allow(dead_code)] #[derive(Debug)] /// Current status of the queue pub struct TransactionQueueStatus { @@ -196,6 +280,8 @@ impl TransactionQueue { } } + // Will be used when rpc merged + #[allow(dead_code)] /// Returns current status for this queue pub fn status(&self) -> TransactionQueueStatus { TransactionQueueStatus { @@ -205,17 +291,19 @@ impl TransactionQueue { } /// Adds all signed transactions to queue to be verified and imported - pub fn add_all(&mut self, txs: Vec, fetch_nonce: T) + pub fn add_all(&mut self, txs: Vec, fetch_nonce: T) -> Result<(), Error> where T: Fn(&Address) -> U256 { for tx in txs.into_iter() { - self.add(tx, &fetch_nonce); + try!(self.add(tx, &fetch_nonce)); } + Ok(()) } /// Add signed transaction to queue to be verified and imported - pub fn add(&mut self, tx: SignedTransaction, fetch_nonce: &T) + pub fn add(&mut self, tx: SignedTransaction, fetch_nonce: &T) -> Result<(), Error> where T: Fn(&Address) -> U256 { - self.import_tx(VerifiedTransaction::new(tx), fetch_nonce); + self.import_tx(try!(VerifiedTransaction::new(tx)), fetch_nonce); + Ok(()) } /// Removes all transactions identified by hashes given in slice @@ -260,6 +348,8 @@ impl TransactionQueue { // We will either move transaction to future or remove it completely // so there will be no transactions from this sender in current self.last_nonces.remove(&sender); + // First update height of transactions in future to avoid collisions + self.update_future(&sender, current_nonce); // This should move all current transactions to future and remove old transactions self.move_all_to_future(&sender, current_nonce); // And now lets check if there is some chain of transactions in future @@ -269,6 +359,7 @@ impl TransactionQueue { } } + /// Update height of all transactions in future transactions set. fn update_future(&mut self, sender: &Address, current_nonce: U256) { // We need to drain all transactions for current sender from future and reinsert them with updated height let all_nonces_from_sender = match self.future.by_address.row(&sender) { @@ -277,10 +368,17 @@ impl TransactionQueue { }; for k in all_nonces_from_sender { let order = self.future.drop(&sender, &k).unwrap(); - self.future.insert(sender.clone(), k, order.update_height(k, current_nonce)); + if k >= current_nonce { + self.future.insert(sender.clone(), k, order.update_height(k, current_nonce)); + } else { + // Remove the transaction completely + self.by_hash.remove(&order.hash); + } } } + /// Drop all transactions from given sender from `current`. + /// Either moves them to `future` or removes them from queue completely. fn move_all_to_future(&mut self, sender: &Address, current_nonce: U256) { let all_nonces_from_sender = match self.current.by_address.row(&sender) { Some(row_map) => row_map.keys().cloned().collect::>(), @@ -299,8 +397,9 @@ impl TransactionQueue { self.future.enforce_limit(&mut self.by_hash); } - - /// Returns top transactions from the queue + // Will be used when mining merged + #[allow(dead_code)] + /// Returns top transactions from the queue ordered by priority. pub fn top_transactions(&self, size: usize) -> Vec { self.current.by_priority .iter() @@ -318,6 +417,8 @@ impl TransactionQueue { self.last_nonces.clear(); } + /// Checks if there are any transactions in `future` that should actually be promoted to `current` + /// (because nonce matches). fn move_matching_future_to_current(&mut self, address: Address, mut current_nonce: U256, first_nonce: U256) { { let by_nonce = self.future.by_address.row_mut(&address); @@ -339,6 +440,14 @@ impl TransactionQueue { self.last_nonces.insert(address, current_nonce - U256::one()); } + /// Adds VerifiedTransaction to this queue. + /// + /// Determines if it should be placed in current or future. When transaction is + /// imported to `current` also checks if there are any `future` transactions that should be promoted because of + /// this. + /// + /// It ignores transactions that has already been imported (same `hash`) and replaces the transaction + /// iff `(address, nonce)` is the same but `gas_price` is higher. fn import_tx(&mut self, tx: VerifiedTransaction, fetch_nonce: &T) where T: Fn(&Address) -> U256 { @@ -377,6 +486,10 @@ impl TransactionQueue { self.current.enforce_limit(&mut self.by_hash); } + /// Replaces transaction in given set (could be `future` or `current`). + /// + /// If there is already transaction with same `(sender, nonce)` it will be replaced iff `gas_price` is higher. + /// One of the transactions is dropped from set and also removed from queue entirely (from `by_hash`). fn replace_transaction(tx: VerifiedTransaction, base_nonce: U256, set: &mut TransactionSet, by_hash: &mut HashMap) { let order = TransactionOrder::for_transaction(&tx, base_nonce); let hash = tx.hash(); @@ -407,13 +520,8 @@ impl TransactionQueue { #[cfg(test)] mod test { extern crate rustc_serialize; - use self::rustc_serialize::hex::FromHex; - use std::ops::Deref; - use std::collections::{HashMap, BTreeSet}; - use util::crypto::KeyPair; - use util::numbers::{U256, Uint}; - use util::hash::{Address}; use util::table::*; + use util::*; use ethcore::transaction::*; use super::*; use super::{TransactionSet, TransactionOrder, VerifiedTransaction}; @@ -457,12 +565,12 @@ mod test { limit: 1 }; let (tx1, tx2) = new_txs(U256::from(1)); - let tx1 = VerifiedTransaction::new(tx1); - let tx2 = VerifiedTransaction::new(tx2); + let tx1 = VerifiedTransaction::new(tx1).unwrap(); + let tx2 = VerifiedTransaction::new(tx2).unwrap(); let mut by_hash = { let mut x = HashMap::new(); - let tx1 = VerifiedTransaction::new(tx1.transaction.clone()); - let tx2 = VerifiedTransaction::new(tx2.transaction.clone()); + let tx1 = VerifiedTransaction::new(tx1.transaction.clone()).unwrap(); + let tx2 = VerifiedTransaction::new(tx2.transaction.clone()).unwrap(); x.insert(tx1.hash(), tx1); x.insert(tx2.hash(), tx2); x @@ -496,13 +604,39 @@ mod test { let tx = new_tx(); // when - txq.add(tx, &default_nonce); + let res = txq.add(tx, &default_nonce); // then + assert!(res.is_ok()); let stats = txq.status(); assert_eq!(stats.pending, 1); } + #[test] + fn should_reject_incorectly_signed_transaction() { + // given + let mut txq = TransactionQueue::new(); + let tx = new_unsigned_tx(U256::from(123)); + let stx = { + let mut s = RlpStream::new_list(9); + s.append(&tx.nonce); + s.append(&tx.gas_price); + s.append(&tx.gas); + s.append_empty_data(); // action=create + s.append(&tx.value); + s.append(&tx.data); + s.append(&0u64); // v + s.append(&U256::zero()); // r + s.append(&U256::zero()); // s + decode(s.as_raw()) + }; + // when + let res = txq.add(stx, &default_nonce); + + // then + assert!(res.is_err()); + } + #[test] fn should_import_txs_from_same_sender() { // given @@ -511,8 +645,8 @@ mod test { let (tx, tx2) = new_txs(U256::from(1)); // when - txq.add(tx.clone(), &default_nonce); - txq.add(tx2.clone(), &default_nonce); + txq.add(tx.clone(), &default_nonce).unwrap(); + txq.add(tx2.clone(), &default_nonce).unwrap(); // then let top = txq.top_transactions(5); @@ -529,8 +663,8 @@ mod test { let (tx, tx2) = new_txs(U256::from(2)); // when - txq.add(tx.clone(), &default_nonce); - txq.add(tx2.clone(), &default_nonce); + txq.add(tx.clone(), &default_nonce).unwrap(); + txq.add(tx2.clone(), &default_nonce).unwrap(); // then let stats = txq.status(); @@ -541,6 +675,28 @@ mod test { assert_eq!(top[0], tx); } + #[test] + fn should_correctly_update_futures_when_removing() { + // given + let prev_nonce = |a: &Address| default_nonce(a) - U256::one(); + let next2_nonce = |a: &Address| default_nonce(a) + U256::from(2); + + let mut txq = TransactionQueue::new(); + + let (tx, tx2) = new_txs(U256::from(1)); + txq.add(tx.clone(), &prev_nonce); + txq.add(tx2.clone(), &prev_nonce); + assert_eq!(txq.status().future, 2); + + // when + txq.remove(&tx.hash(), &next2_nonce); + // should remove both transactions since they are not valid + + // then + assert_eq!(txq.status().pending, 0); + assert_eq!(txq.status().future, 0); + } + #[test] fn should_move_transactions_if_gap_filled() { // given @@ -551,13 +707,13 @@ mod test { let tx1 = new_unsigned_tx(U256::from(124)).sign(&secret); let tx2 = new_unsigned_tx(U256::from(125)).sign(&secret); - txq.add(tx, &default_nonce); + txq.add(tx, &default_nonce).unwrap(); assert_eq!(txq.status().pending, 1); - txq.add(tx2, &default_nonce); + txq.add(tx2, &default_nonce).unwrap(); assert_eq!(txq.status().future, 1); // when - txq.add(tx1, &default_nonce); + txq.add(tx1, &default_nonce).unwrap(); // then let stats = txq.status(); @@ -570,8 +726,8 @@ mod test { // given let mut txq2 = TransactionQueue::new(); let (tx, tx2) = new_txs(U256::from(3)); - txq2.add(tx.clone(), &default_nonce); - txq2.add(tx2.clone(), &default_nonce); + txq2.add(tx.clone(), &default_nonce).unwrap(); + txq2.add(tx2.clone(), &default_nonce).unwrap(); assert_eq!(txq2.status().pending, 1); assert_eq!(txq2.status().future, 1); @@ -592,10 +748,10 @@ mod test { let mut txq = TransactionQueue::new(); let (tx, tx2) = new_txs(U256::from(1)); let tx3 = new_tx(); - txq.add(tx2.clone(), &default_nonce); + txq.add(tx2.clone(), &default_nonce).unwrap(); assert_eq!(txq.status().future, 1); - txq.add(tx3.clone(), &default_nonce); - txq.add(tx.clone(), &default_nonce); + txq.add(tx3.clone(), &default_nonce).unwrap(); + txq.add(tx.clone(), &default_nonce).unwrap(); assert_eq!(txq.status().pending, 3); // when @@ -614,8 +770,8 @@ mod test { let (tx, tx2) = new_txs(U256::one()); // add - txq.add(tx2.clone(), &default_nonce); - txq.add(tx.clone(), &default_nonce); + txq.add(tx2.clone(), &default_nonce).unwrap(); + txq.add(tx.clone(), &default_nonce).unwrap(); let stats = txq.status(); assert_eq!(stats.pending, 2); @@ -632,11 +788,11 @@ mod test { // given let mut txq = TransactionQueue::with_limits(1, 1); let (tx, tx2) = new_txs(U256::one()); - txq.add(tx.clone(), &default_nonce); + txq.add(tx.clone(), &default_nonce).unwrap(); assert_eq!(txq.status().pending, 1); // when - txq.add(tx2.clone(), &default_nonce); + txq.add(tx2.clone(), &default_nonce).unwrap(); // then let t = txq.top_transactions(2); @@ -650,14 +806,14 @@ mod test { let mut txq = TransactionQueue::with_limits(10, 1); let (tx1, tx2) = new_txs(U256::from(4)); let (tx3, tx4) = new_txs(U256::from(4)); - txq.add(tx1.clone(), &default_nonce); - txq.add(tx3.clone(), &default_nonce); + txq.add(tx1.clone(), &default_nonce).unwrap(); + txq.add(tx3.clone(), &default_nonce).unwrap(); assert_eq!(txq.status().pending, 2); // when - txq.add(tx2.clone(), &default_nonce); + txq.add(tx2.clone(), &default_nonce).unwrap(); assert_eq!(txq.status().future, 1); - txq.add(tx4.clone(), &default_nonce); + txq.add(tx4.clone(), &default_nonce).unwrap(); // then assert_eq!(txq.status().future, 1); @@ -671,7 +827,7 @@ mod test { let fetch_last_nonce = |_a: &Address| last_nonce; // when - txq.add(tx, &fetch_last_nonce); + txq.add(tx, &fetch_last_nonce).unwrap(); // then let stats = txq.status(); @@ -685,12 +841,12 @@ mod test { let nonce = |a: &Address| default_nonce(a) + U256::one(); let mut txq = TransactionQueue::new(); let (_tx1, tx2) = new_txs(U256::from(1)); - txq.add(tx2.clone(), &default_nonce); + txq.add(tx2.clone(), &default_nonce).unwrap(); assert_eq!(txq.status().future, 1); assert_eq!(txq.status().pending, 0); // when - txq.add(tx2.clone(), &nonce); + txq.add(tx2.clone(), &nonce).unwrap(); // then let stats = txq.status(); @@ -703,15 +859,15 @@ mod test { // given let mut txq = TransactionQueue::new(); let (tx1, tx2) = new_txs(U256::from(1)); - txq.add(tx1.clone(), &default_nonce); - txq.add(tx2.clone(), &default_nonce); + txq.add(tx1.clone(), &default_nonce).unwrap(); + txq.add(tx2.clone(), &default_nonce).unwrap(); assert_eq!(txq.status().pending, 2); // when txq.remove(&tx1.hash(), &default_nonce); assert_eq!(txq.status().pending, 0); assert_eq!(txq.status().future, 1); - txq.add(tx1.clone(), &default_nonce); + txq.add(tx1.clone(), &default_nonce).unwrap(); // then let stats = txq.status(); @@ -726,10 +882,10 @@ mod test { let mut txq = TransactionQueue::new(); let (tx, tx2) = new_txs(U256::from(1)); let tx3 = new_tx(); - txq.add(tx2.clone(), &default_nonce); + txq.add(tx2.clone(), &default_nonce).unwrap(); assert_eq!(txq.status().future, 1); - txq.add(tx3.clone(), &default_nonce); - txq.add(tx.clone(), &default_nonce); + txq.add(tx3.clone(), &default_nonce).unwrap(); + txq.add(tx.clone(), &default_nonce).unwrap(); assert_eq!(txq.status().pending, 3); // when @@ -754,8 +910,8 @@ mod test { }; // when - txq.add(tx, &default_nonce); - txq.add(tx2, &default_nonce); + txq.add(tx, &default_nonce).unwrap(); + txq.add(tx2, &default_nonce).unwrap(); // then let stats = txq.status(); @@ -782,10 +938,10 @@ mod test { }; // when - txq.add(tx1, &default_nonce); - txq.add(tx2, &default_nonce); + txq.add(tx1, &default_nonce).unwrap(); + txq.add(tx2, &default_nonce).unwrap(); assert_eq!(txq.status().future, 1); - txq.add(tx0, &default_nonce); + txq.add(tx0, &default_nonce).unwrap(); // then let stats = txq.status(); @@ -801,8 +957,8 @@ mod test { let next_nonce = |a: &Address| default_nonce(a) + U256::one(); let mut txq = TransactionQueue::new(); let (tx1, tx2) = new_txs(U256::one()); - txq.add(tx1.clone(), &previous_nonce); - txq.add(tx2, &previous_nonce); + txq.add(tx1.clone(), &previous_nonce).unwrap(); + txq.add(tx2, &previous_nonce).unwrap(); assert_eq!(txq.status().future, 2); // when diff --git a/util/src/io/service.rs b/util/src/io/service.rs index 83fa71b8a..8a34ee80a 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -153,7 +153,7 @@ struct UserTimer { pub struct IoManager where Message: Send + Sync { timers: Arc>>, handlers: Vec>>, - _workers: Vec, + workers: Vec, worker_channel: chase_lev::Worker>, work_ready: Arc, } @@ -180,7 +180,7 @@ impl IoManager where Message: Send + Sync + Clone + 'static { timers: Arc::new(RwLock::new(HashMap::new())), handlers: Vec::new(), worker_channel: worker, - _workers: workers, + workers: workers, work_ready: work_ready, }; try!(event_loop.run(&mut io)); @@ -230,7 +230,10 @@ impl Handler for IoManager where Message: Send + Clone + Sync fn notify(&mut self, event_loop: &mut EventLoop, msg: Self::Message) { match msg { - IoMessage::Shutdown => event_loop.shutdown(), + IoMessage::Shutdown => { + self.workers.clear(); + event_loop.shutdown(); + }, IoMessage::AddHandler { handler } => { let handler_id = { self.handlers.push(handler.clone());