diff --git a/.travis.yml b/.travis.yml index 7213b8f09..0c614ca5d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,11 +14,11 @@ matrix: - rust: nightly include: - rust: stable - env: FEATURES="--features travis-beta" KCOV_FEATURES="" TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity" ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}" + env: FEATURES="--features travis-beta" KCOV_FEATURES="" TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity -p ethminer" ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}" - rust: beta - env: FEATURES="--features travis-beta" KCOV_FEATURES="" TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity" ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}" + env: FEATURES="--features travis-beta" KCOV_FEATURES="" TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity -p ethminer" ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}" - rust: nightly - env: FEATURES="--features travis-nightly" KCOV_FEATURES="" TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity" ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}" + env: FEATURES="--features travis-nightly" KCOV_FEATURES="" TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity -p ethminer" ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}" cache: apt: true directories: @@ -51,6 +51,7 @@ after_success: | ./kcov-master/tmp/usr/local/bin/kcov --exclude-pattern /usr/,/.cargo,/root/.multirust,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests target/kcov target/debug/deps/ethcore-* && ./kcov-master/tmp/usr/local/bin/kcov --exclude-pattern /usr/,/.cargo,/root/.multirust,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests target/kcov target/debug/deps/ethsync-* && ./kcov-master/tmp/usr/local/bin/kcov --exclude-pattern /usr/,/.cargo,/root/.multirust,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests target/kcov target/debug/deps/ethcore_rpc-* && + ./kcov-master/tmp/usr/local/bin/kcov --exclude-pattern /usr/,/.cargo,/root/.multirust,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests target/kcov target/debug/deps/ethminer-* && ./kcov-master/tmp/usr/local/bin/kcov --coveralls-id=${TRAVIS_JOB_ID} --exclude-pattern /usr/,/.cargo,/root/.multirust target/kcov target/debug/parity-* && [ $TRAVIS_BRANCH = master ] && [ $TRAVIS_PULL_REQUEST = false ] && diff --git a/Cargo.lock b/Cargo.lock index 624043e30..8bf57cb6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,6 +11,7 @@ dependencies = [ "ethcore-devtools 0.9.99", "ethcore-rpc 0.9.99", "ethcore-util 0.9.99", + "ethminer 0.9.99", "ethsync 0.9.99", "fdlimit 0.1.0", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -237,6 +238,7 @@ dependencies = [ "ethash 0.9.99", "ethcore 0.9.99", "ethcore-util 0.9.99", + "ethminer 0.9.99", "ethsync 0.9.99", "jsonrpc-core 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-http-server 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -284,6 +286,20 @@ dependencies = [ "vergen 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "ethminer" +version = "0.9.99" +dependencies = [ + "clippy 0.0.50 (registry+https://github.com/rust-lang/crates.io-index)", + "env_logger 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "ethcore 0.9.99", + "ethcore-util 0.9.99", + "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "rayon 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "ethsync" version = "0.9.99" @@ -292,11 +308,10 @@ dependencies = [ "env_logger 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore 0.9.99", "ethcore-util 0.9.99", + "ethminer 0.9.99", "heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", - "rayon 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/Cargo.toml b/Cargo.toml index efe794d5b..351041119 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,18 +19,19 @@ ctrlc = { git = "https://github.com/tomusdrw/rust-ctrlc.git" } fdlimit = { path = "util/fdlimit" } daemonize = "0.2" number_prefix = "0.2" +rpassword = "0.1" clippy = { version = "0.0.50", optional = true } ethcore = { path = "ethcore" } ethcore-util = { path = "util" } ethsync = { path = "sync" } +ethminer = { path = "miner" } ethcore-devtools = { path = "devtools" } ethcore-rpc = { path = "rpc", optional = true } -rpassword = "0.1" [features] default = ["rpc"] rpc = ["ethcore-rpc"] -dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethcore-rpc/dev"] +dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethcore-rpc/dev", "ethminer/dev"] travis-beta = ["ethcore/json-tests"] travis-nightly = ["ethcore/json-tests", "dev"] diff --git a/cov.sh b/cov.sh index a1fa29e46..d60ef223d 100755 --- a/cov.sh +++ b/cov.sh @@ -15,12 +15,23 @@ if ! type kcov > /dev/null; then exit 1 fi -cargo test -p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity --no-run || exit $? +cargo test \ + -p ethash \ + -p ethcore-util \ + -p ethcore \ + -p ethsync \ + -p ethcore-rpc \ + -p parity \ + -p ethminer \ + --no-run || exit $? rm -rf target/coverage mkdir -p target/coverage -kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests --include-pattern src --verify target/coverage target/debug/deps/ethcore-* -kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests --include-pattern src --verify target/coverage target/debug/deps/ethash-* -kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests --include-pattern src --verify target/coverage target/debug/deps/ethcore_util-* -kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests --include-pattern src --verify target/coverage target/debug/deps/ethsync-* -kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests --include-pattern src --verify target/coverage target/debug/deps/ethcore_rpc-* + +EXCLUDE="~/.multirust,rocksdb,secp256k1,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests" +kcov --exclude-pattern $EXCLUDE --include-pattern src --verify target/coverage target/debug/deps/ethcore-* +kcov --exclude-pattern $EXCLUDE --include-pattern src --verify target/coverage target/debug/deps/ethash-* +kcov --exclude-pattern $EXCLUDE --include-pattern src --verify target/coverage target/debug/deps/ethcore_util-* +kcov --exclude-pattern $EXCLUDE --include-pattern src --verify target/coverage target/debug/deps/ethsync-* +kcov --exclude-pattern $EXCLUDE --include-pattern src --verify target/coverage target/debug/deps/ethcore_rpc-* +kcov --exclude-pattern $EXCLUDE --include-pattern src --verify target/coverage target/debug/deps/ethminer-* xdg-open target/coverage/index.html diff --git a/doc.sh b/doc.sh index 2fd5ac20f..a5e5e2e13 100755 --- a/doc.sh +++ b/doc.sh @@ -1,4 +1,11 @@ #!/bin/sh # generate documentation only for partiy and ethcore libraries -cargo doc --no-deps --verbose -p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity +cargo doc --no-deps --verbose \ + -p ethash \ + -p ethcore-util \ + -p ethcore \ + -p ethsync \ + -p ethcore-rpc \ + -p parity \ + -p ethminer diff --git a/ethcore/src/account_db.rs b/ethcore/src/account_db.rs index 026e813f5..f95ec53a1 100644 --- a/ethcore/src/account_db.rs +++ b/ethcore/src/account_db.rs @@ -97,6 +97,9 @@ impl<'db> HashDB for AccountDBMut<'db>{ } fn insert(&mut self, value: &[u8]) -> H256 { + if value == &NULL_RLP { + return SHA3_NULL_RLP.clone(); + } let k = value.sha3(); let ak = combine_key(&self.address, &k); self.db.emplace(ak, value.to_vec()); @@ -104,11 +107,17 @@ impl<'db> HashDB for AccountDBMut<'db>{ } fn emplace(&mut self, key: H256, value: Bytes) { + if key == SHA3_NULL_RLP { + return; + } let key = combine_key(&self.address, &key); self.db.emplace(key, value.to_vec()) } fn kill(&mut self, key: &H256) { + if key == &SHA3_NULL_RLP { + return; + } let key = combine_key(&self.address, key); self.db.kill(&key) } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 174142f7a..d748cc4ee 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -17,7 +17,6 @@ //! Blockchain database client. use std::marker::PhantomData; -use std::sync::atomic::AtomicBool; use util::*; use util::panics::*; use views::BlockView; @@ -31,12 +30,12 @@ use service::{NetSyncMessage, SyncMessage}; use env_info::LastHashes; use verification::*; use block::*; -use transaction::LocalizedTransaction; +use transaction::{LocalizedTransaction, SignedTransaction}; use extras::TransactionAddress; use filter::Filter; use log_entry::LocalizedLogEntry; use block_queue::{BlockQueue, BlockQueueInfo}; -use blockchain::{BlockChain, BlockProvider, TreeRoute}; +use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute}; use client::{BlockId, TransactionId, ClientConfig, BlockChainClient}; pub use blockchain::CacheSize as BlockChainCacheSize; @@ -106,12 +105,6 @@ pub struct Client where V: Verifier { report: RwLock, import_lock: Mutex<()>, panic_handler: Arc, - - // for sealing... - sealing_enabled: AtomicBool, - sealing_block: Mutex>, - author: RwLock
, - extra_data: RwLock, verifier: PhantomData, } @@ -159,10 +152,6 @@ impl Client where V: Verifier { report: RwLock::new(Default::default()), import_lock: Mutex::new(()), panic_handler: panic_handler, - sealing_enabled: AtomicBool::new(false), - sealing_block: Mutex::new(None), - author: RwLock::new(Address::new()), - extra_data: RwLock::new(Vec::new()), verifier: PhantomData, })) } @@ -233,12 +222,39 @@ impl Client where V: Verifier { Ok(closed_block) } + fn calculate_enacted_retracted(&self, import_results: Vec) -> (Vec, Vec) { + fn map_to_vec(map: Vec<(H256, bool)>) -> Vec { + map.into_iter().map(|(k, _v)| k).collect() + } + + // In ImportRoute we get all the blocks that have been enacted and retracted by single insert. + // Because we are doing multiple inserts some of the blocks that were enacted in import `k` + // could be retracted in import `k+1`. This is why to understand if after all inserts + // the block is enacted or retracted we iterate over all routes and at the end final state + // will be in the hashmap + let map = import_results.into_iter().fold(HashMap::new(), |mut map, route| { + for hash in route.enacted { + map.insert(hash, true); + } + for hash in route.retracted { + map.insert(hash, false); + } + map + }); + + // Split to enacted retracted (using hashmap value) + let (enacted, retracted) = map.into_iter().partition(|&(_k, v)| v); + // And convert tuples to keys + (map_to_vec(enacted), map_to_vec(retracted)) + } + /// This is triggered by a message coming from a block queue when the block is ready for insertion pub fn import_verified_blocks(&self, io: &IoChannel) -> usize { let max_blocks_to_import = 128; - let mut good_blocks = Vec::with_capacity(max_blocks_to_import); - let mut bad_blocks = HashSet::new(); + let mut imported_blocks = Vec::with_capacity(max_blocks_to_import); + let mut invalid_blocks = HashSet::new(); + let mut import_results = Vec::with_capacity(max_blocks_to_import); let _import_lock = self.import_lock.lock(); let blocks = self.block_queue.drain(max_blocks_to_import); @@ -248,16 +264,16 @@ impl Client where V: Verifier { for block in blocks { let header = &block.header; - if bad_blocks.contains(&header.parent_hash) { - bad_blocks.insert(header.hash()); + if invalid_blocks.contains(&header.parent_hash) { + invalid_blocks.insert(header.hash()); continue; } let closed_block = self.check_and_close_block(&block); if let Err(_) = closed_block { - bad_blocks.insert(header.hash()); + invalid_blocks.insert(header.hash()); break; } - good_blocks.push(header.hash()); + imported_blocks.push(header.hash()); // Are we committing an era? let ancient = if header.number() >= HISTORY { @@ -276,37 +292,41 @@ impl Client where V: Verifier { // And update the chain after commit to prevent race conditions // (when something is in chain but you are not able to fetch details) - self.chain.insert_block(&block.bytes, receipts); + let route = self.chain.insert_block(&block.bytes, receipts); + import_results.push(route); self.report.write().unwrap().accrue_block(&block); trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); } - let imported = good_blocks.len(); - let bad_blocks = bad_blocks.into_iter().collect::>(); + let imported = imported_blocks.len(); + let invalid_blocks = invalid_blocks.into_iter().collect::>(); { - if !bad_blocks.is_empty() { - self.block_queue.mark_as_bad(&bad_blocks); + if !invalid_blocks.is_empty() { + self.block_queue.mark_as_bad(&invalid_blocks); } - if !good_blocks.is_empty() { - self.block_queue.mark_as_good(&good_blocks); + if !imported_blocks.is_empty() { + self.block_queue.mark_as_good(&imported_blocks); } } { - if !good_blocks.is_empty() && self.block_queue.queue_info().is_empty() { + if !imported_blocks.is_empty() && self.block_queue.queue_info().is_empty() { + let (enacted, retracted) = self.calculate_enacted_retracted(import_results); io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { - good: good_blocks, - bad: bad_blocks, - // TODO [todr] were to take those from? - retracted: vec![], + imported: imported_blocks, + invalid: invalid_blocks, + enacted: enacted, + retracted: retracted, })).unwrap(); } } - if self.chain_info().best_block_hash != original_best && self.sealing_enabled.load(atomic::Ordering::Relaxed) { - self.prepare_sealing(); + { + if self.chain_info().best_block_hash != original_best { + io.send(NetworkIoMessage::User(SyncMessage::NewChainHead)).unwrap(); + } } imported @@ -357,52 +377,59 @@ impl Client where V: Verifier { BlockId::Latest => Some(self.chain.best_block_number()) } } - - /// Get the author that we will seal blocks as. - pub fn author(&self) -> Address { - self.author.read().unwrap().clone() - } - - /// Set the author that we will seal blocks as. - pub fn set_author(&self, author: Address) { - *self.author.write().unwrap() = author; - } - - /// Get the extra_data that we will seal blocks wuth. - pub fn extra_data(&self) -> Bytes { - self.extra_data.read().unwrap().clone() - } - - /// Set the extra_data that we will seal blocks with. - pub fn set_extra_data(&self, extra_data: Bytes) { - *self.extra_data.write().unwrap() = extra_data; - } - - /// New chain head event. Restart mining operation. - pub fn prepare_sealing(&self) { - let h = self.chain.best_block_hash(); - let mut b = OpenBlock::new( - self.engine.deref().deref(), - self.state_db.lock().unwrap().spawn(), - match self.chain.block_header(&h) { Some(ref x) => x, None => {return;} }, - self.build_last_hashes(h.clone()), - self.author(), - self.extra_data() - ); - - self.chain.find_uncle_headers(&h, self.engine.deref().deref().maximum_uncle_age()).unwrap().into_iter().take(self.engine.deref().deref().maximum_uncle_count()).foreach(|h| { b.push_uncle(h).unwrap(); }); - - // TODO: push transactions. - - let b = b.close(); - trace!("Sealing: number={}, hash={}, diff={}", b.hash(), b.block().header().difficulty(), b.block().header().number()); - *self.sealing_block.lock().unwrap() = Some(b); - } } -// TODO: need MinerService MinerIoHandler - impl BlockChainClient for Client where V: Verifier { + + + // TODO [todr] Should be moved to miner crate eventually. + fn try_seal(&self, block: ClosedBlock, seal: Vec) -> Result { + block.try_seal(self.engine.deref().deref(), seal) + } + + // TODO [todr] Should be moved to miner crate eventually. + fn prepare_sealing(&self, author: Address, extra_data: Bytes, transactions: Vec) -> Option { + let engine = self.engine.deref().deref(); + let h = self.chain.best_block_hash(); + + let mut b = OpenBlock::new( + engine, + self.state_db.lock().unwrap().spawn(), + match self.chain.block_header(&h) { Some(ref x) => x, None => {return None} }, + self.build_last_hashes(h.clone()), + author, + extra_data, + ); + + // Add uncles + self.chain + .find_uncle_headers(&h, engine.maximum_uncle_age()) + .unwrap() + .into_iter() + .take(engine.maximum_uncle_count()) + .foreach(|h| { + b.push_uncle(h).unwrap(); + }); + + // Add transactions + let block_number = b.block().header().number(); + for tx in transactions { + let import = b.push_transaction(tx, None); + if let Err(e) = import { + trace!("Error adding transaction to block: number={}. Error: {:?}", block_number, e); + } + } + + // And close + let b = b.close(); + trace!("Sealing: number={}, hash={}, diff={}", + b.block().header().number(), + b.hash(), + b.block().header().difficulty() + ); + Some(b) + } + fn block_header(&self, id: BlockId) -> Option { Self::block_hash(&self.chain, id).and_then(|hash| self.chain.block(&hash).map(|bytes| BlockView::new(&bytes).rlp().at(0).as_raw().to_vec())) } @@ -561,39 +588,6 @@ impl BlockChainClient for Client where V: Verifier { }) .collect() } - - /// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock. - fn sealing_block(&self) -> &Mutex> { - if self.sealing_block.lock().unwrap().is_none() { - self.sealing_enabled.store(true, atomic::Ordering::Relaxed); - // TODO: Above should be on a timer that resets after two blocks have arrived without being asked for. - self.prepare_sealing(); - } - &self.sealing_block - } - - /// Submit `seal` as a valid solution for the header of `pow_hash`. - /// Will check the seal, but not actually insert the block into the chain. - fn submit_seal(&self, pow_hash: H256, seal: Vec) -> Result<(), Error> { - let mut maybe_b = self.sealing_block.lock().unwrap(); - match *maybe_b { - Some(ref b) if b.hash() == pow_hash => {} - _ => { return Err(Error::PowHashInvalid); } - } - - let b = maybe_b.take(); - match b.unwrap().try_seal(self.engine.deref().deref(), seal) { - Err(old) => { - *maybe_b = Some(old); - Err(Error::PowInvalid) - } - Ok(sealed) => { - // TODO: commit DB from `sealed.drain` and make a VerifiedBlock to skip running the transactions twice. - try!(self.import_block(sealed.rlp_bytes())); - Ok(()) - } - } - } } impl MayPanic for Client { diff --git a/ethcore/src/client/mod.rs b/ethcore/src/client/mod.rs index f07a1f7c3..e46d0b570 100644 --- a/ethcore/src/client/mod.rs +++ b/ethcore/src/client/mod.rs @@ -26,18 +26,17 @@ pub use self::config::{ClientConfig, BlockQueueConfig, BlockChainConfig}; pub use self::ids::{BlockId, TransactionId}; pub use self::test_client::{TestBlockChainClient, EachBlockWith}; -use std::sync::Mutex; use util::bytes::Bytes; use util::hash::{Address, H256, H2048}; use util::numbers::U256; use blockchain::TreeRoute; use block_queue::BlockQueueInfo; -use block::ClosedBlock; +use block::{ClosedBlock, SealedBlock}; use header::BlockNumber; -use transaction::LocalizedTransaction; +use transaction::{LocalizedTransaction, SignedTransaction}; use log_entry::LocalizedLogEntry; use filter::Filter; -use error::{ImportResult, Error}; +use error::{ImportResult}; /// Blockchain database client. Owns and manages a blockchain and a block queue. pub trait BlockChainClient : Sync + Send { @@ -109,11 +108,13 @@ pub trait BlockChainClient : Sync + Send { /// Returns logs matching given filter. fn logs(&self, filter: Filter) -> Vec; - /// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock. - fn sealing_block(&self) -> &Mutex>; + // TODO [todr] Should be moved to miner crate eventually. + /// Returns ClosedBlock prepared for sealing. + fn prepare_sealing(&self, author: Address, extra_data: Bytes, transactions: Vec) -> Option; + + // TODO [todr] Should be moved to miner crate eventually. + /// Attempts to seal given block. Returns `SealedBlock` on success and the same block in case of error. + fn try_seal(&self, block: ClosedBlock, seal: Vec) -> Result; - /// Submit `seal` as a valid solution for the header of `pow_hash`. - /// Will check the seal, but not actually insert the block into the chain. - fn submit_seal(&self, pow_hash: H256, seal: Vec) -> Result<(), Error>; } diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index d85540858..140b8d91f 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -17,16 +17,16 @@ //! Test client. use util::*; -use transaction::{Transaction, LocalizedTransaction, Action}; +use transaction::{Transaction, LocalizedTransaction, SignedTransaction, Action}; use blockchain::TreeRoute; use client::{BlockChainClient, BlockChainInfo, BlockStatus, BlockId, TransactionId}; use header::{Header as BlockHeader, BlockNumber}; use filter::Filter; use log_entry::LocalizedLogEntry; use receipt::Receipt; -use error::{ImportResult, Error}; +use error::{ImportResult}; use block_queue::BlockQueueInfo; -use block::ClosedBlock; +use block::{SealedBlock, ClosedBlock}; /// Test client. pub struct TestBlockChainClient { @@ -86,17 +86,17 @@ impl TestBlockChainClient { client } - /// Set code at given address. - pub fn set_code(&mut self, address: Address, code: Bytes) { - self.code.write().unwrap().insert(address, code); - } - - /// Set balance at given address. + /// Set the balance of account `address` to `balance`. pub fn set_balance(&mut self, address: Address, balance: U256) { self.balances.write().unwrap().insert(address, balance); } - /// Set storage at given address and position. + /// Set `code` at `address`. + pub fn set_code(&mut self, address: Address, code: Bytes) { + self.code.write().unwrap().insert(address, code); + } + + /// Set storage `position` to `value` for account `address`. pub fn set_storage(&mut self, address: Address, position: H256, value: H256) { self.storage.write().unwrap().insert((address, position), value); } @@ -215,12 +215,12 @@ impl BlockChainClient for TestBlockChainClient { unimplemented!(); } - fn sealing_block(&self) -> &Mutex> { - unimplemented!(); + fn prepare_sealing(&self, _author: Address, _extra_data: Bytes, _transactions: Vec) -> Option { + unimplemented!() } - fn submit_seal(&self, _pow_hash: H256, _seal: Vec) -> Result<(), Error> { - unimplemented!(); + fn try_seal(&self, _block: ClosedBlock, _seal: Vec) -> Result { + unimplemented!() } fn block_header(&self, id: BlockId) -> Option { diff --git a/ethcore/src/error.rs b/ethcore/src/error.rs index 824d8da90..72127c754 100644 --- a/ethcore/src/error.rs +++ b/ethcore/src/error.rs @@ -63,8 +63,15 @@ pub enum ExecutionError { } #[derive(Debug)] -/// Errors concerning transaction proessing. +/// Errors concerning transaction processing. pub enum TransactionError { + /// Transaction's gas price is below threshold. + InsufficientGasPrice { + /// Minimal expected gas price + minimal: U256, + /// Transaction gas price + got: U256 + }, /// Transaction's gas limit (aka gas) is invalid. InvalidGasLimit(OutOfBounds), } diff --git a/ethcore/src/log_entry.rs b/ethcore/src/log_entry.rs index a75e6fcc1..63d09b4f0 100644 --- a/ethcore/src/log_entry.rs +++ b/ethcore/src/log_entry.rs @@ -111,7 +111,7 @@ mod tests { let bloom = H2048::from_str("00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000").unwrap(); let address = Address::from_str("0f572e5295c57f15886f9b263e2f6d2d6c7b5ec6").unwrap(); let log = LogEntry { - address: address, + address: address, topics: vec![], data: vec![] }; diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 6daf0d7b6..bcfe7724f 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -28,12 +28,16 @@ pub enum SyncMessage { /// New block has been imported into the blockchain NewChainBlocks { /// Hashes of blocks imported to blockchain - good: Vec, - /// Hashes of blocks not imported to blockchain - bad: Vec, + imported: Vec, + /// Hashes of blocks not imported to blockchain (because were invalid) + invalid: Vec, /// Hashes of blocks that were removed from canonical chain retracted: Vec, + /// Hashes of blocks that are now included in cannonical chain + enacted: Vec, }, + /// Best Block Hash in chain has been changed + NewChainHead, /// A block is ready BlockVerified, } diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs index 001d1729b..ed0b02788 100644 --- a/ethcore/src/tests/client.rs +++ b/ethcore/src/tests/client.rs @@ -132,16 +132,9 @@ fn can_mine() { let dummy_blocks = get_good_dummy_block_seq(2); let client_result = get_test_client_with_blocks(vec![dummy_blocks[0].clone()]); let client = client_result.reference(); - let b = client.sealing_block(); - let pow_hash = { - let u = b.lock().unwrap(); - match *u { - Some(ref b) => { - assert_eq!(*b.block().header().parent_hash(), BlockView::new(&dummy_blocks[0]).header_view().sha3()); - b.hash() - } - None => { panic!(); } - } - }; - assert!(client.submit_seal(pow_hash, vec![]).is_ok()); + + let b = client.prepare_sealing(Address::default(), vec![], vec![]).unwrap(); + + assert_eq!(*b.block().header().parent_hash(), BlockView::new(&dummy_blocks[0]).header_view().sha3()); + assert!(client.try_seal(b, vec![]).is_ok()); } diff --git a/hook.sh b/hook.sh index 9780541fe..58bff20ab 100755 --- a/hook.sh +++ b/hook.sh @@ -7,6 +7,6 @@ echo "set -e" >> $FILE echo "cargo build --release --features dev" >> $FILE # Build tests echo "cargo test --no-run --features dev \\" >> $FILE -echo " -p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity" >> $FILE +echo " -p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity -p ethminer" >> $FILE echo "" >> $FILE chmod +x $FILE diff --git a/miner/Cargo.toml b/miner/Cargo.toml new file mode 100644 index 000000000..b450ece73 --- /dev/null +++ b/miner/Cargo.toml @@ -0,0 +1,24 @@ +[package] +description = "Ethminer library" +homepage = "http://ethcore.io" +license = "GPL-3.0" +name = "ethminer" +version = "0.9.99" +authors = ["Ethcore "] +build = "build.rs" + +[build-dependencies] +rustc_version = "0.1" + +[dependencies] +ethcore-util = { path = "../util" } +ethcore = { path = "../ethcore" } +log = "0.3" +env_logger = "0.3" +rustc-serialize = "0.3" +rayon = "0.3.1" +clippy = { version = "0.0.50", optional = true } + +[features] +default = [] +dev = ["clippy"] diff --git a/miner/build.rs b/miner/build.rs new file mode 100644 index 000000000..41b9a1b3e --- /dev/null +++ b/miner/build.rs @@ -0,0 +1,25 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +extern crate rustc_version; + +use rustc_version::{version_meta, Channel}; + +fn main() { + if let Channel::Nightly = version_meta().channel { + println!("cargo:rustc-cfg=nightly"); + } +} diff --git a/miner/src/lib.rs b/miner/src/lib.rs new file mode 100644 index 000000000..a431bd44e --- /dev/null +++ b/miner/src/lib.rs @@ -0,0 +1,111 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +#![warn(missing_docs)] +#![cfg_attr(all(nightly, feature="dev"), feature(plugin))] +#![cfg_attr(all(nightly, feature="dev"), plugin(clippy))] + +//! Miner module +//! Keeps track of transactions and mined block. +//! +//! Usage example: +//! +//! ```rust +//! extern crate ethcore_util as util; +//! extern crate ethcore; +//! extern crate ethminer; +//! use std::ops::Deref; +//! use std::env; +//! use std::sync::Arc; +//! use util::network::{NetworkService, NetworkConfiguration}; +//! use ethcore::client::{Client, ClientConfig, BlockChainClient}; +//! use ethcore::ethereum; +//! use ethminer::{Miner, MinerService}; +//! +//! fn main() { +//! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap(); +//! let dir = env::temp_dir(); +//! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, service.io().channel()).unwrap(); +//! +//! let miner: Miner = Miner::default(); +//! // get status +//! assert_eq!(miner.status().transaction_queue_pending, 0); +//! +//! // Check block for sealing +//! miner.prepare_sealing(client.deref()); +//! assert!(miner.sealing_block(client.deref()).lock().unwrap().is_some()); +//! } +//! ``` + + +#[macro_use] +extern crate log; +#[macro_use] +extern crate ethcore_util as util; +extern crate ethcore; +extern crate env_logger; +extern crate rayon; + +mod miner; +mod transaction_queue; + +pub use transaction_queue::TransactionQueue; +pub use miner::{Miner}; + +use std::sync::Mutex; +use util::{H256, U256, Address, Bytes}; +use ethcore::client::{BlockChainClient}; +use ethcore::block::{ClosedBlock}; +use ethcore::error::{Error}; +use ethcore::transaction::SignedTransaction; + +/// Miner client API +pub trait MinerService : Send + Sync { + + /// Returns miner's status. + fn status(&self) -> MinerStatus; + + /// Imports transactions to transaction queue. + fn import_transactions(&self, transactions: Vec, fetch_nonce: T) -> Result<(), Error> + where T: Fn(&Address) -> U256; + + /// Returns hashes of transactions currently in pending + fn pending_transactions_hashes(&self) -> Vec; + + /// Removes all transactions from the queue and restart mining operation. + fn clear_and_reset(&self, chain: &BlockChainClient); + + /// Called when blocks are imported to chain, updates transactions queue. + fn chain_new_blocks(&self, chain: &BlockChainClient, imported: &[H256], invalid: &[H256], enacted: &[H256], retracted: &[H256]); + + /// New chain head event. Restart mining operation. + fn prepare_sealing(&self, chain: &BlockChainClient); + + /// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock. + fn sealing_block(&self, chain: &BlockChainClient) -> &Mutex>; + + /// Submit `seal` as a valid solution for the header of `pow_hash`. + /// Will check the seal, but not actually insert the block into the chain. + fn submit_seal(&self, chain: &BlockChainClient, pow_hash: H256, seal: Vec) -> Result<(), Error>; +} + +/// Mining status +pub struct MinerStatus { + /// Number of transactions in queue with state `pending` (ready to be included in block) + pub transaction_queue_pending: usize, + /// Number of transactions in queue with state `future` (not yet ready to be included in block) + pub transaction_queue_future: usize, +} diff --git a/miner/src/miner.rs b/miner/src/miner.rs new file mode 100644 index 000000000..ad403150d --- /dev/null +++ b/miner/src/miner.rs @@ -0,0 +1,191 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use rayon::prelude::*; +use std::sync::{Mutex, RwLock, Arc}; +use std::sync::atomic; +use std::sync::atomic::AtomicBool; + +use util::{H256, U256, Address, Bytes}; +use ethcore::views::{BlockView}; +use ethcore::client::{BlockChainClient, BlockId}; +use ethcore::block::{ClosedBlock}; +use ethcore::error::{Error}; +use ethcore::transaction::SignedTransaction; +use super::{MinerService, MinerStatus, TransactionQueue}; + +/// Keeps track of transactions using priority queue and holds currently mined block. +pub struct Miner { + transaction_queue: Mutex, + + // for sealing... + sealing_enabled: AtomicBool, + sealing_block: Mutex>, + author: RwLock
, + extra_data: RwLock, +} + +impl Default for Miner { + fn default() -> Miner { + Miner { + transaction_queue: Mutex::new(TransactionQueue::new()), + sealing_enabled: AtomicBool::new(false), + sealing_block: Mutex::new(None), + author: RwLock::new(Address::default()), + extra_data: RwLock::new(Vec::new()), + } + } +} + +impl Miner { + /// Creates new instance of miner + pub fn new() -> Arc { + Arc::new(Miner::default()) + } + + /// Get the author that we will seal blocks as. + fn author(&self) -> Address { + *self.author.read().unwrap() + } + + /// Get the extra_data that we will seal blocks wuth. + fn extra_data(&self) -> Bytes { + self.extra_data.read().unwrap().clone() + } + + /// Set the author that we will seal blocks as. + pub fn set_author(&self, author: Address) { + *self.author.write().unwrap() = author; + } + + /// Set the extra_data that we will seal blocks with. + pub fn set_extra_data(&self, extra_data: Bytes) { + *self.extra_data.write().unwrap() = extra_data; + } + + /// Set minimal gas price of transaction to be accepted for mining. + pub fn set_minimal_gas_price(&self, min_gas_price: U256) { + self.transaction_queue.lock().unwrap().set_minimal_gas_price(min_gas_price); + } +} + +impl MinerService for Miner { + + fn clear_and_reset(&self, chain: &BlockChainClient) { + self.transaction_queue.lock().unwrap().clear(); + self.prepare_sealing(chain); + } + + fn status(&self) -> MinerStatus { + let status = self.transaction_queue.lock().unwrap().status(); + MinerStatus { + transaction_queue_pending: status.pending, + transaction_queue_future: status.future, + } + } + + fn import_transactions(&self, transactions: Vec, fetch_nonce: T) -> Result<(), Error> + where T: Fn(&Address) -> U256 { + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + transaction_queue.add_all(transactions, fetch_nonce) + } + + fn pending_transactions_hashes(&self) -> Vec { + let transaction_queue = self.transaction_queue.lock().unwrap(); + transaction_queue.pending_hashes() + } + + fn prepare_sealing(&self, chain: &BlockChainClient) { + let no_of_transactions = 128; + let transactions = self.transaction_queue.lock().unwrap().top_transactions(no_of_transactions); + + let b = chain.prepare_sealing( + self.author(), + self.extra_data(), + transactions, + ); + *self.sealing_block.lock().unwrap() = b; + } + + fn sealing_block(&self, chain: &BlockChainClient) -> &Mutex> { + if self.sealing_block.lock().unwrap().is_none() { + self.sealing_enabled.store(true, atomic::Ordering::Relaxed); + // TODO: Above should be on a timer that resets after two blocks have arrived without being asked for. + self.prepare_sealing(chain); + } + &self.sealing_block + } + + fn submit_seal(&self, chain: &BlockChainClient, pow_hash: H256, seal: Vec) -> Result<(), Error> { + let mut maybe_b = self.sealing_block.lock().unwrap(); + match *maybe_b { + Some(ref b) if b.hash() == pow_hash => {} + _ => { return Err(Error::PowHashInvalid); } + } + + let b = maybe_b.take(); + match chain.try_seal(b.unwrap(), seal) { + Err(old) => { + *maybe_b = Some(old); + Err(Error::PowInvalid) + } + Ok(sealed) => { + // TODO: commit DB from `sealed.drain` and make a VerifiedBlock to skip running the transactions twice. + try!(chain.import_block(sealed.rlp_bytes())); + Ok(()) + } + } + } + + fn chain_new_blocks(&self, chain: &BlockChainClient, imported: &[H256], invalid: &[H256], enacted: &[H256], retracted: &[H256]) { + fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec { + let block = chain + .block(BlockId::Hash(*hash)) + // Client should send message after commit to db and inserting to chain. + .expect("Expected in-chain blocks."); + let block = BlockView::new(&block); + block.transactions() + } + + { + let in_chain = vec![imported, enacted, invalid]; + let in_chain = in_chain + .par_iter() + .flat_map(|h| h.par_iter().map(|h| fetch_transactions(chain, h))); + let out_of_chain = retracted + .par_iter() + .map(|h| fetch_transactions(chain, h)); + + in_chain.for_each(|txs| { + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + let hashes = txs.iter().map(|tx| tx.hash()).collect::>(); + transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); + }); + out_of_chain.for_each(|txs| { + // populate sender + for tx in &txs { + let _sender = tx.sender(); + } + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + let _ = transaction_queue.add_all(txs, |a| chain.nonce(a)); + }); + } + + if self.sealing_enabled.load(atomic::Ordering::Relaxed) { + self.prepare_sealing(chain); + } + } +} diff --git a/sync/src/transaction_queue.rs b/miner/src/transaction_queue.rs similarity index 91% rename from sync/src/transaction_queue.rs rename to miner/src/transaction_queue.rs index 0f9ef19c8..880c73750 100644 --- a/sync/src/transaction_queue.rs +++ b/miner/src/transaction_queue.rs @@ -28,13 +28,13 @@ //! ```rust //! extern crate ethcore_util as util; //! extern crate ethcore; -//! extern crate ethsync; +//! extern crate ethminer; //! extern crate rustc_serialize; //! //! use util::crypto::KeyPair; //! use util::hash::Address; //! use util::numbers::{Uint, U256}; -//! use ethsync::TransactionQueue; +//! use ethminer::TransactionQueue; //! use ethcore::transaction::*; //! use rustc_serialize::hex::FromHex; //! @@ -86,7 +86,7 @@ use util::numbers::{Uint, U256}; use util::hash::{Address, H256}; use util::table::*; use ethcore::transaction::*; -use ethcore::error::Error; +use ethcore::error::{Error, TransactionError}; #[derive(Clone, Debug)] @@ -245,6 +245,8 @@ pub struct TransactionQueueStatus { /// TransactionQueue implementation pub struct TransactionQueue { + /// Gas Price threshold for transactions that can be imported to this queue (defaults to 0) + minimal_gas_price: U256, /// Priority queue for transactions that can go to block current: TransactionSet, /// Priority queue for transactions that has been received but are not yet valid to go to block @@ -281,6 +283,7 @@ impl TransactionQueue { }; TransactionQueue { + minimal_gas_price: U256::zero(), current: current, future: future, by_hash: HashMap::new(), @@ -288,6 +291,12 @@ impl TransactionQueue { } } + /// Sets new gas price threshold for incoming transactions. + /// Any transactions already imported to the queue are not affected. + pub fn set_minimal_gas_price(&mut self, min_gas_price: U256) { + self.minimal_gas_price = min_gas_price; + } + // Will be used when rpc merged #[allow(dead_code)] /// Returns current status for this queue @@ -310,6 +319,19 @@ impl TransactionQueue { /// Add signed transaction to queue to be verified and imported pub fn add(&mut self, tx: SignedTransaction, fetch_nonce: &T) -> Result<(), Error> where T: Fn(&Address) -> U256 { + + if tx.gas_price < self.minimal_gas_price { + trace!(target: "sync", + "Dropping transaction below minimal gas price threshold: {:?} (gp: {} < {})", + tx.hash(), tx.gas_price, self.minimal_gas_price + ); + + return Err(Error::Transaction(TransactionError::InsufficientGasPrice{ + minimal: self.minimal_gas_price, + got: tx.gas_price + })); + } + self.import_tx(try!(VerifiedTransaction::new(tx)), fetch_nonce); Ok(()) } @@ -346,7 +368,7 @@ impl TransactionQueue { self.update_future(&sender, current_nonce); // And now lets check if there is some chain of transactions in future // that should be placed in current - self.move_matching_future_to_current(sender.clone(), current_nonce, current_nonce); + self.move_matching_future_to_current(sender, current_nonce, current_nonce); return; } @@ -362,7 +384,7 @@ impl TransactionQueue { self.move_all_to_future(&sender, current_nonce); // And now lets check if there is some chain of transactions in future // that should be placed in current. It should also update last_nonces. - self.move_matching_future_to_current(sender.clone(), current_nonce, current_nonce); + self.move_matching_future_to_current(sender, current_nonce, current_nonce); return; } } @@ -377,7 +399,7 @@ impl TransactionQueue { for k in all_nonces_from_sender { let order = self.future.drop(&sender, &k).unwrap(); if k >= current_nonce { - self.future.insert(sender.clone(), k, order.update_height(k, current_nonce)); + self.future.insert(*sender, k, order.update_height(k, current_nonce)); } else { // Remove the transaction completely self.by_hash.remove(&order.hash); @@ -397,7 +419,7 @@ impl TransactionQueue { // Goes to future or is removed let order = self.current.drop(&sender, &k).unwrap(); if k >= current_nonce { - self.future.insert(sender.clone(), k, order.update_height(k, current_nonce)); + self.future.insert(*sender, k, order.update_height(k, current_nonce)); } else { self.by_hash.remove(&order.hash); } @@ -417,6 +439,14 @@ impl TransactionQueue { .collect() } + /// Returns hashes of all transactions from current, ordered by priority. + pub fn pending_hashes(&self) -> Vec { + self.current.by_priority + .iter() + .map(|t| t.hash) + .collect() + } + /// Removes all elements (in any state) from the queue pub fn clear(&mut self) { self.current.clear(); @@ -438,8 +468,8 @@ impl TransactionQueue { // remove also from priority and hash self.future.by_priority.remove(&order); // Put to current - let order = order.update_height(current_nonce.clone(), first_nonce); - self.current.insert(address.clone(), current_nonce, order); + let order = order.update_height(current_nonce, first_nonce); + self.current.insert(address, current_nonce, order); current_nonce = current_nonce + U256::one(); } } @@ -487,10 +517,10 @@ impl TransactionQueue { } let base_nonce = fetch_nonce(&address); - Self::replace_transaction(tx, base_nonce.clone(), &mut self.current, &mut self.by_hash); - self.last_nonces.insert(address.clone(), nonce); + Self::replace_transaction(tx, base_nonce, &mut self.current, &mut self.by_hash); + self.last_nonces.insert(address, nonce); // But maybe there are some more items waiting in future? - self.move_matching_future_to_current(address.clone(), nonce + U256::one(), base_nonce); + self.move_matching_future_to_current(address, nonce + U256::one(), base_nonce); self.current.enforce_limit(&mut self.by_hash); } @@ -504,7 +534,7 @@ impl TransactionQueue { let address = tx.sender(); let nonce = tx.nonce(); - by_hash.insert(hash.clone(), tx); + by_hash.insert(hash, tx); if let Some(old) = set.insert(address, nonce, order.clone()) { // There was already transaction in queue. Let's check which one should stay let old_fee = old.gas_price; @@ -620,6 +650,22 @@ mod test { assert_eq!(stats.pending, 1); } + #[test] + fn should_not_import_transaction_below_min_gas_price_threshold() { + // given + let mut txq = TransactionQueue::new(); + let tx = new_tx(); + txq.set_minimal_gas_price(tx.gas_price + U256::one()); + + // when + txq.add(tx, &default_nonce).unwrap_err(); + + // then + let stats = txq.status(); + assert_eq!(stats.pending, 0); + assert_eq!(stats.future, 0); + } + #[test] fn should_reject_incorectly_signed_transaction() { // given @@ -663,6 +709,24 @@ mod test { assert_eq!(top.len(), 2); } + #[test] + fn should_return_pending_hashes() { + // given + let mut txq = TransactionQueue::new(); + + let (tx, tx2) = new_txs(U256::from(1)); + + // when + txq.add(tx.clone(), &default_nonce).unwrap(); + txq.add(tx2.clone(), &default_nonce).unwrap(); + + // then + let top = txq.pending_hashes(); + assert_eq!(top[0], tx.hash()); + assert_eq!(top[1], tx2.hash()); + assert_eq!(top.len(), 2); + } + #[test] fn should_put_transaction_to_futures_if_gap_detected() { // given @@ -831,7 +895,7 @@ mod test { fn should_drop_transactions_with_old_nonces() { let mut txq = TransactionQueue::new(); let tx = new_tx(); - let last_nonce = tx.nonce.clone() + U256::one(); + let last_nonce = tx.nonce + U256::one(); let fetch_last_nonce = |_a: &Address| last_nonce; // when diff --git a/parity/main.rs b/parity/main.rs index 2bfa75e8a..f2c661958 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -24,6 +24,7 @@ extern crate rustc_serialize; extern crate ethcore_util as util; extern crate ethcore; extern crate ethsync; +extern crate ethminer; #[macro_use] extern crate log as rlog; extern crate env_logger; @@ -50,6 +51,7 @@ use ethcore::client::*; use ethcore::service::{ClientService, NetSyncMessage}; use ethcore::ethereum; use ethsync::{EthSync, SyncConfig, SyncProvider}; +use ethminer::{Miner, MinerService}; use docopt::Docopt; use daemonize::Daemonize; use number_prefix::{binary_prefix, Standalone, Prefixed}; @@ -112,6 +114,7 @@ API and Console Options: --rpccorsdomain URL Equivalent to --jsonrpc-cors URL (geth-compatible). Sealing/Mining Options: + --gas-price WEI Minimum amount of Wei to be paid for a transaction to be accepted for mining [default: 20000000000]. --author ADDRESS Specify the block author (aka "coinbase") address for sending block rewards from sealed blocks [default: 0037a6b811ffeb6e072da21179d11b1406371c63]. --extra-data STRING Specify a custom extra-data for authored blocks, no more than 32 characters. @@ -135,11 +138,12 @@ Geth-Compatibility Options --maxpeers COUNT Equivalent to --peers COUNT. --nodekey KEY Equivalent to --node-key KEY. --nodiscover Equivalent to --no-discovery. + --gasprice WEI Equivalent to --gas-price WEI. --etherbase ADDRESS Equivalent to --author ADDRESS. --extradata STRING Equivalent to --extra-data STRING. Miscellaneous Options: - -l --logging LOGGING Specify the logging level. + -l --logging LOGGING Specify the logging level. Must conform to the same format as RUST_LOG. -v --version Show information about version. -h --help Show this screen. "#; @@ -172,17 +176,19 @@ struct Args { flag_jsonrpc_port: u16, flag_jsonrpc_cors: String, flag_jsonrpc_apis: String, + flag_author: String, + flag_gas_price: String, + flag_extra_data: Option, flag_logging: Option, flag_version: bool, // geth-compatibility... flag_nodekey: Option, flag_nodiscover: bool, flag_maxpeers: Option, - flag_author: String, - flag_extra_data: Option, flag_datadir: Option, flag_extradata: Option, flag_etherbase: Option, + flag_gasprice: Option, flag_rpc: bool, flag_rpcaddr: Option, flag_rpcport: Option, @@ -219,7 +225,15 @@ fn setup_log(init: &Option) { } #[cfg(feature = "rpc")] -fn setup_rpc_server(client: Arc, sync: Arc, secret_store: Arc, url: &str, cors_domain: &str, apis: Vec<&str>) -> Option> { +fn setup_rpc_server( + client: Arc, + sync: Arc, + secret_store: Arc, + miner: Arc, + url: &str, + cors_domain: &str, + apis: Vec<&str> +) -> Option> { use rpc::v1::*; let server = rpc::RpcServer::new(); @@ -228,8 +242,8 @@ fn setup_rpc_server(client: Arc, sync: Arc, secret_store: Arc server.add_delegate(Web3Client::new().to_delegate()), "net" => server.add_delegate(NetClient::new(&sync).to_delegate()), "eth" => { - server.add_delegate(EthClient::new(&client, &sync, &secret_store).to_delegate()); - server.add_delegate(EthFilterClient::new(&client).to_delegate()); + server.add_delegate(EthClient::new(&client, &sync, &secret_store, &miner).to_delegate()); + server.add_delegate(EthFilterClient::new(&client, &miner).to_delegate()); } "personal" => server.add_delegate(PersonalClient::new(&secret_store).to_delegate()), _ => { @@ -241,7 +255,15 @@ fn setup_rpc_server(client: Arc, sync: Arc, secret_store: Arc, _sync: Arc, _url: &str) -> Option> { +fn setup_rpc_server( + _client: Arc, + _sync: Arc, + _secret_store: Arc, + _miner: Arc, + _url: &str, + _cors_domain: &str, + _apis: Vec<&str> +) -> Option> { None } @@ -276,7 +298,16 @@ impl Configuration { fn author(&self) -> Address { let d = self.args.flag_etherbase.as_ref().unwrap_or(&self.args.flag_author); - Address::from_str(d).unwrap_or_else(|_| die!("{}: Invalid address for --author. Must be 40 hex characters, without the 0x at the beginning.", self.args.flag_author)) + Address::from_str(d).unwrap_or_else(|_| { + die!("{}: Invalid address for --author. Must be 40 hex characters, without the 0x at the beginning.", d) + }) + } + + fn gas_price(&self) -> U256 { + let d = self.args.flag_gasprice.as_ref().unwrap_or(&self.args.flag_gas_price); + U256::from_dec_str(d).unwrap_or_else(|_| { + die!("{}: Invalid gas price given. Must be a decimal unsigned 256-bit number.", d) + }) } fn extra_data(&self) -> Bytes { @@ -299,7 +330,9 @@ impl Configuration { "frontier" | "homestead" | "mainnet" => ethereum::new_frontier(), "morden" | "testnet" => ethereum::new_morden(), "olympic" => ethereum::new_olympic(), - f => Spec::from_json_utf8(contents(f).unwrap_or_else(|_| die!("{}: Couldn't read chain specification file. Sure it exists?", f)).as_ref()), + f => Spec::from_json_utf8(contents(f).unwrap_or_else(|_| { + die!("{}: Couldn't read chain specification file. Sure it exists?", f) + }).as_ref()), } } @@ -314,7 +347,11 @@ impl Configuration { fn init_nodes(&self, spec: &Spec) -> Vec { let mut r = if self.args.flag_no_bootstrap { Vec::new() } else { spec.nodes().clone() }; if let Some(ref x) = self.args.flag_bootnodes { - r.extend(x.split(',').map(|s| Self::normalize_enode(s).unwrap_or_else(|| die!("{}: Invalid node address format given for a boot node.", s)))); + r.extend(x.split(',').map(|s| { + Self::normalize_enode(s).unwrap_or_else(|| { + die!("{}: Invalid node address format given for a boot node.", s) + }) + })); } r } @@ -348,6 +385,38 @@ impl Configuration { ret } + fn client_config(&self) -> ClientConfig { + let mut client_config = ClientConfig::default(); + match self.args.flag_cache { + Some(mb) => { + client_config.blockchain.max_cache_size = mb * 1024 * 1024; + client_config.blockchain.pref_cache_size = client_config.blockchain.max_cache_size / 2; + } + None => { + client_config.blockchain.pref_cache_size = self.args.flag_cache_pref_size; + client_config.blockchain.max_cache_size = self.args.flag_cache_max_size; + } + } + client_config.pruning = match self.args.flag_pruning.as_str() { + "archive" => journaldb::Algorithm::Archive, + "light" => journaldb::Algorithm::EarlyMerge, + "fast" => journaldb::Algorithm::OverlayRecent, + "basic" => journaldb::Algorithm::RefCounted, + _ => { die!("Invalid pruning method given."); } + }; + client_config.name = self.args.flag_identity.clone(); + client_config.queue.max_mem_use = self.args.flag_queue_max_size; + client_config + } + + fn sync_config(&self, spec: &Spec) -> SyncConfig { + let mut sync_config = SyncConfig::default(); + sync_config.network_id = self.args.flag_networkid.as_ref().map_or(spec.network_id(), |id| { + U256::from_str(id).unwrap_or_else(|_| die!("{}: Invalid index given with --networkid", id)) + }); + sync_config + } + fn execute(&self) { if self.args.flag_version { print_version(); @@ -406,42 +475,21 @@ impl Configuration { let spec = self.spec(); let net_settings = self.net_settings(&spec); - let mut sync_config = SyncConfig::default(); - sync_config.network_id = self.args.flag_networkid.as_ref().map_or(spec.network_id(), |id| { - U256::from_str(id).unwrap_or_else(|_| { - die!("{}: Invalid index given with --networkid", id) - }) - }); + let sync_config = self.sync_config(&spec); // Build client - let mut client_config = ClientConfig::default(); - match self.args.flag_cache { - Some(mb) => { - client_config.blockchain.max_cache_size = mb * 1024 * 1024; - client_config.blockchain.pref_cache_size = client_config.blockchain.max_cache_size / 2; - } - None => { - client_config.blockchain.pref_cache_size = self.args.flag_cache_pref_size; - client_config.blockchain.max_cache_size = self.args.flag_cache_max_size; - } - } - client_config.pruning = match self.args.flag_pruning.as_str() { - "" | "archive" => journaldb::Algorithm::Archive, - "pruned" => journaldb::Algorithm::EarlyMerge, - "fast" => journaldb::Algorithm::OverlayRecent, - "slow" => journaldb::Algorithm::RefCounted, - _ => { die!("Invalid pruning method given."); } - }; - client_config.name = self.args.flag_identity.clone(); - client_config.queue.max_mem_use = self.args.flag_queue_max_size; - let mut service = ClientService::start(client_config, spec, net_settings, &Path::new(&self.path())).unwrap(); + let mut service = ClientService::start(self.client_config(), spec, net_settings, &Path::new(&self.path())).unwrap(); panic_handler.forward_from(&service); - let client = service.client().clone(); - client.set_author(self.author()); - client.set_extra_data(self.extra_data()); + let client = service.client(); + + // Miner + let miner = Miner::new(); + miner.set_author(self.author()); + miner.set_extra_data(self.extra_data()); + miner.set_minimal_gas_price(self.gas_price()); // Sync - let sync = EthSync::register(service.network(), sync_config, client); + let sync = EthSync::register(service.network(), sync_config, client.clone(), miner.clone()); // Secret Store let account_service = Arc::new(AccountService::new()); @@ -456,11 +504,18 @@ impl Configuration { let cors = self.args.flag_rpccorsdomain.as_ref().unwrap_or(&self.args.flag_jsonrpc_cors); // TODO: use this as the API list. let apis = self.args.flag_rpcapi.as_ref().unwrap_or(&self.args.flag_jsonrpc_apis); - let server_handler = setup_rpc_server(service.client(), sync.clone(), account_service.clone(), &url, cors, apis.split(',').collect()); + let server_handler = setup_rpc_server( + service.client(), + sync.clone(), + account_service.clone(), + miner.clone(), + &url, + cors, + apis.split(',').collect() + ); if let Some(handler) = server_handler { panic_handler.forward_from(handler.deref()); } - } // Register IO handler @@ -530,7 +585,11 @@ impl Informant { let report = client.report(); let sync_info = sync.status(); - if let (_, _, &Some(ref last_report)) = (self.chain_info.read().unwrap().deref(), self.cache_info.read().unwrap().deref(), self.report.read().unwrap().deref()) { + if let (_, _, &Some(ref last_report)) = ( + self.chain_info.read().unwrap().deref(), + self.cache_info.read().unwrap().deref(), + self.report.read().unwrap().deref() + ) { println!("[ #{} {} ]---[ {} blk/s | {} tx/s | {} gas/s //··· {}/{} peers, #{}, {}+{} queued ···// mem: {} db, {} chain, {} queue, {} sync ]", chain_info.best_block_number, chain_info.best_block_hash, diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 900b10548..fa89041d8 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -18,10 +18,11 @@ ethcore-util = { path = "../util" } ethcore = { path = "../ethcore" } ethash = { path = "../ethash" } ethsync = { path = "../sync" } -clippy = { version = "0.0.50", optional = true } +ethminer = { path = "../miner" } rustc-serialize = "0.3" transient-hashmap = "0.1" serde_macros = { version = "0.7.0", optional = true } +clippy = { version = "0.0.50", optional = true } [build-dependencies] serde_codegen = { version = "0.7.0", optional = true } @@ -30,4 +31,4 @@ syntex = "0.29.0" [features] default = ["serde_codegen"] nightly = ["serde_macros"] -dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev"] +dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethminer/dev"] diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 731ded8c4..3096a45c9 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -19,6 +19,8 @@ #![cfg_attr(feature="nightly", feature(custom_derive, custom_attribute, plugin))] #![cfg_attr(feature="nightly", plugin(serde_macros, clippy))] +#[macro_use] +extern crate log; extern crate rustc_serialize; extern crate serde; extern crate serde_json; @@ -27,6 +29,7 @@ extern crate jsonrpc_http_server; extern crate ethcore_util as util; extern crate ethcore; extern crate ethsync; +extern crate ethminer; extern crate transient_hashmap; use std::sync::Arc; diff --git a/rpc/src/v1/helpers/poll_filter.rs b/rpc/src/v1/helpers/poll_filter.rs index 465290270..f9ed6230c 100644 --- a/rpc/src/v1/helpers/poll_filter.rs +++ b/rpc/src/v1/helpers/poll_filter.rs @@ -1,10 +1,13 @@ //! Helper type with all filter possibilities. +use util::hash::H256; use ethcore::filter::Filter; +pub type BlockNumber = u64; + #[derive(Clone)] pub enum PollFilter { - Block, - PendingTransaction, - Logs(Filter) + Block(BlockNumber), + PendingTransaction(Vec), + Logs(BlockNumber, Filter) } diff --git a/rpc/src/v1/helpers/poll_manager.rs b/rpc/src/v1/helpers/poll_manager.rs index 0297384d1..9735d7d5d 100644 --- a/rpc/src/v1/helpers/poll_manager.rs +++ b/rpc/src/v1/helpers/poll_manager.rs @@ -22,28 +22,13 @@ use transient_hashmap::{TransientHashMap, Timer, StandardTimer}; const POLL_LIFETIME: u64 = 60; pub type PollId = usize; -pub type BlockNumber = u64; - -pub struct PollInfo { - pub filter: F, - pub block_number: BlockNumber -} - -impl Clone for PollInfo where F: Clone { - fn clone(&self) -> Self { - PollInfo { - filter: self.filter.clone(), - block_number: self.block_number.clone() - } - } -} /// Indexes all poll requests. /// /// Lazily garbage collects unused polls info. pub struct PollManager where T: Timer { - polls: TransientHashMap, T>, - next_available_id: PollId + polls: TransientHashMap, + next_available_id: PollId, } impl PollManager { @@ -54,6 +39,7 @@ impl PollManager { } impl PollManager where T: Timer { + pub fn new_with_timer(timer: T) -> Self { PollManager { polls: TransientHashMap::new_with_timer(POLL_LIFETIME, timer), @@ -64,31 +50,30 @@ impl PollManager where T: Timer { /// Returns id which can be used for new poll. /// /// Stores information when last poll happend. - pub fn create_poll(&mut self, filter: F, block: BlockNumber) -> PollId { + pub fn create_poll(&mut self, filter: F) -> PollId { self.polls.prune(); + let id = self.next_available_id; + self.polls.insert(id, filter); + self.next_available_id += 1; - self.polls.insert(id, PollInfo { - filter: filter, - block_number: block, - }); id } - /// Updates information when last poll happend. - pub fn update_poll(&mut self, id: &PollId, block: BlockNumber) { - self.polls.prune(); - if let Some(info) = self.polls.get_mut(id) { - info.block_number = block; - } - } - - /// Returns number of block when last poll happend. - pub fn poll_info(&mut self, id: &PollId) -> Option<&PollInfo> { + // Implementation is always using `poll_mut` + #[cfg(test)] + /// Get a reference to stored poll filter + pub fn poll(&mut self, id: &PollId) -> Option<&F> { self.polls.prune(); self.polls.get(id) } + /// Get a mutable reference to stored poll filter + pub fn poll_mut(&mut self, id: &PollId) -> Option<&mut F> { + self.polls.prune(); + self.polls.get_mut(id) + } + /// Removes poll info. pub fn remove_poll(&mut self, id: &PollId) { self.polls.remove(id); @@ -97,48 +82,46 @@ impl PollManager where T: Timer { #[cfg(test)] mod tests { - use std::cell::RefCell; + use std::cell::Cell; use transient_hashmap::Timer; use v1::helpers::PollManager; struct TestTimer<'a> { - time: &'a RefCell, + time: &'a Cell, } impl<'a> Timer for TestTimer<'a> { fn get_time(&self) -> i64 { - *self.time.borrow() + self.time.get() } } #[test] fn test_poll_indexer() { - let time = RefCell::new(0); + let time = Cell::new(0); let timer = TestTimer { time: &time, }; let mut indexer = PollManager::new_with_timer(timer); - assert_eq!(indexer.create_poll(false, 20), 0); - assert_eq!(indexer.create_poll(true, 20), 1); + assert_eq!(indexer.create_poll(20), 0); + assert_eq!(indexer.create_poll(20), 1); - *time.borrow_mut() = 10; - indexer.update_poll(&0, 21); - assert_eq!(indexer.poll_info(&0).unwrap().filter, false); - assert_eq!(indexer.poll_info(&0).unwrap().block_number, 21); + time.set(10); + *indexer.poll_mut(&0).unwrap() = 21; + assert_eq!(*indexer.poll(&0).unwrap(), 21); + assert_eq!(*indexer.poll(&1).unwrap(), 20); - *time.borrow_mut() = 30; - indexer.update_poll(&1, 23); - assert_eq!(indexer.poll_info(&1).unwrap().filter, true); - assert_eq!(indexer.poll_info(&1).unwrap().block_number, 23); + time.set(30); + *indexer.poll_mut(&1).unwrap() = 23; + assert_eq!(*indexer.poll(&1).unwrap(), 23); - *time.borrow_mut() = 75; - indexer.update_poll(&0, 30); - assert!(indexer.poll_info(&0).is_none()); - assert_eq!(indexer.poll_info(&1).unwrap().filter, true); - assert_eq!(indexer.poll_info(&1).unwrap().block_number, 23); + time.set(75); + assert!(indexer.poll(&0).is_none()); + assert_eq!(*indexer.poll(&1).unwrap(), 23); indexer.remove_poll(&1); - assert!(indexer.poll_info(&1).is_none()); + assert!(indexer.poll(&1).is_none()); } + } diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 8ff1f30d0..0e8b8d863 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -15,9 +15,11 @@ // along with Parity. If not, see . //! Eth rpc implementation. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::{Arc, Weak, Mutex, RwLock}; +use std::ops::Deref; use ethsync::{SyncProvider, SyncState}; +use ethminer::{MinerService}; use jsonrpc_core::*; use util::numbers::*; use util::sha3::*; @@ -34,19 +36,29 @@ use v1::helpers::{PollFilter, PollManager}; use util::keys::store::AccountProvider; /// Eth rpc implementation. -pub struct EthClient where C: BlockChainClient, S: SyncProvider, A: AccountProvider { +pub struct EthClient + where C: BlockChainClient, + S: SyncProvider, + A: AccountProvider, + M: MinerService { client: Weak, sync: Weak, accounts: Weak, + miner: Weak, hashrates: RwLock>, } -impl EthClient where C: BlockChainClient, S: SyncProvider, A: AccountProvider { +impl EthClient + where C: BlockChainClient, + S: SyncProvider, + A: AccountProvider, + M: MinerService { /// Creates new EthClient. - pub fn new(client: &Arc, sync: &Arc, accounts: &Arc) -> Self { + pub fn new(client: &Arc, sync: &Arc, accounts: &Arc, miner: &Arc) -> Self { EthClient { client: Arc::downgrade(client), sync: Arc::downgrade(sync), + miner: Arc::downgrade(miner), accounts: Arc::downgrade(accounts), hashrates: RwLock::new(HashMap::new()), } @@ -98,7 +110,12 @@ impl EthClient where C: BlockChainClient, S: SyncProvider, A: } } -impl Eth for EthClient where C: BlockChainClient + 'static, S: SyncProvider + 'static, A: AccountProvider + 'static { +impl Eth for EthClient + where C: BlockChainClient + 'static, + S: SyncProvider + 'static, + A: AccountProvider + 'static, + M: MinerService + 'static { + fn protocol_version(&self, params: Params) -> Result { match params { Params::None => Ok(Value::String(format!("{}", take_weak!(self.sync).status().protocol_version).to_owned())), @@ -196,7 +213,7 @@ impl Eth for EthClient where C: BlockChainClient + 'static, S: fn block_transaction_count_by_number(&self, params: Params) -> Result { from_params::<(BlockNumber,)>(params) .and_then(|(block_number,)| match block_number { - BlockNumber::Pending => to_value(&U256::from(take_weak!(self.sync).status().transaction_queue_pending)), + BlockNumber::Pending => to_value(&U256::from(take_weak!(self.miner).status().transaction_queue_pending)), _ => to_value(&take_weak!(self.client).block(block_number.into()) .map_or_else(U256::zero, |bytes| U256::from(BlockView::new(&bytes).transactions_count()))) }) @@ -271,8 +288,9 @@ impl Eth for EthClient where C: BlockChainClient + 'static, S: fn work(&self, params: Params) -> Result { match params { Params::None => { - let c = take_weak!(self.client); - let u = c.sealing_block().lock().unwrap(); + let miner = take_weak!(self.miner); + let client = take_weak!(self.client); + let u = miner.sealing_block(client.deref()).lock().unwrap(); match *u { Some(ref b) => { let pow_hash = b.hash(); @@ -290,9 +308,10 @@ impl Eth for EthClient where C: BlockChainClient + 'static, S: fn submit_work(&self, params: Params) -> Result { from_params::<(H64, H256, H256)>(params).and_then(|(nonce, pow_hash, mix_hash)| { // trace!("Decoded: nonce={}, pow_hash={}, mix_hash={}", nonce, pow_hash, mix_hash); - let c = take_weak!(self.client); + let miner = take_weak!(self.miner); + let client = take_weak!(self.client); let seal = vec![encode(&mix_hash).to_vec(), encode(&nonce).to_vec()]; - let r = c.submit_seal(pow_hash, seal); + let r = miner.submit_seal(client.deref(), pow_hash, seal); to_value(&r.is_ok()) }) } @@ -311,12 +330,21 @@ impl Eth for EthClient where C: BlockChainClient + 'static, S: let accounts = take_weak!(self.accounts); match accounts.account_secret(&transaction_request.from) { Ok(secret) => { - let sync = take_weak!(self.sync); + let miner = take_weak!(self.miner); + let client = take_weak!(self.client); + let transaction: EthTransaction = transaction_request.into(); let signed_transaction = transaction.sign(&secret); let hash = signed_transaction.hash(); - sync.insert_transaction(signed_transaction); - to_value(&hash) + + let import = miner.import_transactions(vec![signed_transaction], |a: &Address| client.nonce(a)); + match import { + Ok(_) => to_value(&hash), + Err(e) => { + warn!("Error sending transaction: {:?}", e); + to_value(&U256::zero()) + } + } }, Err(_) => { to_value(&U256::zero()) } } @@ -325,27 +353,39 @@ impl Eth for EthClient where C: BlockChainClient + 'static, S: } /// Eth filter rpc implementation. -pub struct EthFilterClient where C: BlockChainClient { +pub struct EthFilterClient + where C: BlockChainClient, + M: MinerService { + client: Weak, + miner: Weak, polls: Mutex>, } -impl EthFilterClient where C: BlockChainClient { +impl EthFilterClient + where C: BlockChainClient, + M: MinerService { + /// Creates new Eth filter client. - pub fn new(client: &Arc) -> Self { + pub fn new(client: &Arc, miner: &Arc) -> Self { EthFilterClient { client: Arc::downgrade(client), - polls: Mutex::new(PollManager::new()) + miner: Arc::downgrade(miner), + polls: Mutex::new(PollManager::new()), } } } -impl EthFilter for EthFilterClient where C: BlockChainClient + 'static { +impl EthFilter for EthFilterClient + where C: BlockChainClient + 'static, + M: MinerService + 'static { + fn new_filter(&self, params: Params) -> Result { from_params::<(Filter,)>(params) .and_then(|(filter,)| { let mut polls = self.polls.lock().unwrap(); - let id = polls.create_poll(PollFilter::Logs(filter.into()), take_weak!(self.client).chain_info().best_block_number); + let block_number = take_weak!(self.client).chain_info().best_block_number; + let id = polls.create_poll(PollFilter::Logs(block_number, filter.into())); to_value(&U256::from(id)) }) } @@ -354,7 +394,7 @@ impl EthFilter for EthFilterClient where C: BlockChainClient + 'static { match params { Params::None => { let mut polls = self.polls.lock().unwrap(); - let id = polls.create_poll(PollFilter::Block, take_weak!(self.client).chain_info().best_block_number); + let id = polls.create_poll(PollFilter::Block(take_weak!(self.client).chain_info().best_block_number)); to_value(&U256::from(id)) }, _ => Err(Error::invalid_params()) @@ -365,7 +405,9 @@ impl EthFilter for EthFilterClient where C: BlockChainClient + 'static { match params { Params::None => { let mut polls = self.polls.lock().unwrap(); - let id = polls.create_poll(PollFilter::PendingTransaction, take_weak!(self.client).chain_info().best_block_number); + let pending_transactions = take_weak!(self.miner).pending_transactions_hashes(); + let id = polls.create_poll(PollFilter::PendingTransaction(pending_transactions)); + to_value(&U256::from(id)) }, _ => Err(Error::invalid_params()) @@ -376,37 +418,47 @@ impl EthFilter for EthFilterClient where C: BlockChainClient + 'static { let client = take_weak!(self.client); from_params::<(Index,)>(params) .and_then(|(index,)| { - let info = self.polls.lock().unwrap().poll_info(&index.value()).cloned(); - match info { + let mut polls = self.polls.lock().unwrap(); + match polls.poll_mut(&index.value()) { None => Ok(Value::Array(vec![] as Vec)), - Some(info) => match info.filter { - PollFilter::Block => { + Some(filter) => match *filter { + PollFilter::Block(ref mut block_number) => { // + 1, cause we want to return hashes including current block hash. let current_number = client.chain_info().best_block_number + 1; - let hashes = (info.block_number..current_number).into_iter() + let hashes = (*block_number..current_number).into_iter() .map(BlockId::Number) .filter_map(|id| client.block_hash(id)) .collect::>(); - self.polls.lock().unwrap().update_poll(&index.value(), current_number); + *block_number = current_number; to_value(&hashes) }, - PollFilter::PendingTransaction => { - // TODO: fix implementation once TransactionQueue is merged - to_value(&vec![] as &Vec) + PollFilter::PendingTransaction(ref mut previous_hashes) => { + let current_hashes = take_weak!(self.miner).pending_transactions_hashes(); + // calculate diff + let previous_hashes_set = previous_hashes.into_iter().map(|h| h.clone()).collect::>(); + let diff = current_hashes + .iter() + .filter(|hash| previous_hashes_set.contains(&hash)) + .cloned() + .collect::>(); + + *previous_hashes = current_hashes; + + to_value(&diff) }, - PollFilter::Logs(mut filter) => { - filter.from_block = BlockId::Number(info.block_number); + PollFilter::Logs(ref mut block_number, ref mut filter) => { + filter.from_block = BlockId::Number(*block_number); filter.to_block = BlockId::Latest; - let logs = client.logs(filter) + let logs = client.logs(filter.clone()) .into_iter() .map(From::from) .collect::>(); let current_number = client.chain_info().best_block_number; - self.polls.lock().unwrap().update_poll(&index.value(), current_number); + *block_number = current_number; to_value(&logs) } } diff --git a/rpc/src/v1/tests/eth.rs b/rpc/src/v1/tests/eth.rs index 3d4fb0451..35c227e40 100644 --- a/rpc/src/v1/tests/eth.rs +++ b/rpc/src/v1/tests/eth.rs @@ -21,7 +21,7 @@ use util::hash::{Address, H256}; use util::numbers::U256; use ethcore::client::{TestBlockChainClient, EachBlockWith}; use v1::{Eth, EthClient}; -use v1::tests::helpers::{TestAccount, TestAccountProvider, TestSyncProvider, Config}; +use v1::tests::helpers::{TestAccount, TestAccountProvider, TestSyncProvider, Config, TestMinerService}; fn blockchain_client() -> Arc { let mut client = TestBlockChainClient::new(); @@ -46,10 +46,15 @@ fn sync_provider() -> Arc { })) } +fn miner_service() -> Arc { + Arc::new(TestMinerService) +} + struct EthTester { _client: Arc, _sync: Arc, _accounts_provider: Arc, + _miner: Arc, pub io: IoHandler, } @@ -58,13 +63,15 @@ impl Default for EthTester { let client = blockchain_client(); let sync = sync_provider(); let ap = accounts_provider(); - let eth = EthClient::new(&client, &sync, &ap).to_delegate(); + let miner = miner_service(); + let eth = EthClient::new(&client, &sync, &ap, &miner).to_delegate(); let io = IoHandler::new(); io.add_delegate(eth); EthTester { _client: client, _sync: sync, _accounts_provider: ap, + _miner: miner, io: io } } diff --git a/rpc/src/v1/tests/helpers/miner_service.rs b/rpc/src/v1/tests/helpers/miner_service.rs new file mode 100644 index 000000000..0cddf2a1e --- /dev/null +++ b/rpc/src/v1/tests/helpers/miner_service.rs @@ -0,0 +1,53 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use util::{Address, H256, U256, Bytes}; +use util::standard::*; +use ethcore::error::Error; +use ethcore::client::BlockChainClient; +use ethcore::block::ClosedBlock; +use ethcore::transaction::SignedTransaction; +use ethminer::{MinerService, MinerStatus}; + +pub struct TestMinerService; + +impl MinerService for TestMinerService { + + /// Returns miner's status. + fn status(&self) -> MinerStatus { unimplemented!(); } + + /// Imports transactions to transaction queue. + fn import_transactions(&self, _transactions: Vec, _fetch_nonce: T) -> Result<(), Error> where T: Fn(&Address) -> U256 { unimplemented!(); } + + /// Returns hashes of transactions currently in pending + fn pending_transactions_hashes(&self) -> Vec { unimplemented!(); } + + /// Removes all transactions from the queue and restart mining operation. + fn clear_and_reset(&self, _chain: &BlockChainClient) { unimplemented!(); } + + /// Called when blocks are imported to chain, updates transactions queue. + fn chain_new_blocks(&self, _chain: &BlockChainClient, _imported: &[H256], _invalid: &[H256], _enacted: &[H256], _retracted: &[H256]) { unimplemented!(); } + + /// New chain head event. Restart mining operation. + fn prepare_sealing(&self, _chain: &BlockChainClient) { unimplemented!(); } + + /// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock. + fn sealing_block(&self, _chain: &BlockChainClient) -> &Mutex> { unimplemented!(); } + + /// Submit `seal` as a valid solution for the header of `pow_hash`. + /// Will check the seal, but not actually insert the block into the chain. + fn submit_seal(&self, _chain: &BlockChainClient, _pow_hash: H256, _seal: Vec) -> Result<(), Error> { unimplemented!(); } +} \ No newline at end of file diff --git a/rpc/src/v1/tests/helpers/mod.rs b/rpc/src/v1/tests/helpers/mod.rs index 3bd74bab7..fc429982e 100644 --- a/rpc/src/v1/tests/helpers/mod.rs +++ b/rpc/src/v1/tests/helpers/mod.rs @@ -16,6 +16,8 @@ mod account_provider; mod sync_provider; +mod miner_service; pub use self::account_provider::{TestAccount, TestAccountProvider}; pub use self::sync_provider::{Config, TestSyncProvider}; +pub use self::miner_service::{TestMinerService}; diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index a3711d949..631752dfc 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -14,7 +14,6 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use ethcore::transaction::SignedTransaction; use ethsync::{SyncProvider, SyncStatus, SyncState}; pub struct Config { @@ -40,7 +39,6 @@ impl TestSyncProvider { num_peers: config.num_peers, num_active_peers: 0, mem_used: 0, - transaction_queue_pending: 0, }, } } @@ -50,9 +48,5 @@ impl SyncProvider for TestSyncProvider { fn status(&self) -> SyncStatus { self.status.clone() } - - fn insert_transaction(&self, _transaction: SignedTransaction) { - unimplemented!() - } } diff --git a/sync/Cargo.toml b/sync/Cargo.toml index 6022beb9c..8cd59333d 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -11,14 +11,13 @@ authors = ["Ethcore , + /// Miner + miner: Arc, } type RlpResponseResult = Result, PacketDecodeError>; impl ChainSync { /// Create a new instance of syncing strategy. - pub fn new(config: SyncConfig) -> ChainSync { + pub fn new(config: SyncConfig, miner: Arc) -> ChainSync { ChainSync { state: SyncState::NotSynced, starting_block: 0, @@ -243,7 +239,7 @@ impl ChainSync { last_sent_block_number: 0, max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), network_id: config.network_id, - transaction_queue: Mutex::new(TransactionQueue::new()), + miner: miner, } } @@ -259,7 +255,6 @@ impl ChainSync { blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 }, num_peers: self.peers.len(), num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(), - transaction_queue_pending: self.transaction_queue.lock().unwrap().status().pending, mem_used: // TODO: https://github.com/servo/heapsize/pull/50 // self.downloading_hashes.heap_size_of_children() @@ -301,7 +296,6 @@ impl ChainSync { self.starting_block = 0; self.highest_block = None; self.have_common_block = false; - self.transaction_queue.lock().unwrap().clear(); self.starting_block = io.chain().chain_info().best_block_number; self.state = SyncState::NotSynced; } @@ -931,16 +925,17 @@ impl ChainSync { } /// Called when peer sends us new transactions fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - let chain = io.chain(); let item_count = r.item_count(); trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count); - let fetch_latest_nonce = |a : &Address| chain.nonce(a); - let mut transaction_queue = self.transaction_queue.lock().unwrap(); + let mut transactions = Vec::with_capacity(item_count); for i in 0..item_count { let tx: SignedTransaction = try!(r.val_at(i)); - let _ = transaction_queue.add(tx, &fetch_latest_nonce); + transactions.push(tx); } + let chain = io.chain(); + let fetch_nonce = |a: &Address| chain.nonce(a); + let _ = self.miner.import_transactions(transactions, fetch_nonce); Ok(()) } @@ -1268,48 +1263,16 @@ impl ChainSync { } /// called when block is imported to chain, updates transactions queue and propagates the blocks - pub fn chain_new_blocks(&mut self, io: &mut SyncIo, good: &[H256], bad: &[H256], _retracted: &[H256]) { - fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec { - let block = chain - .block(BlockId::Hash(hash.clone())) - // Client should send message after commit to db and inserting to chain. - .expect("Expected in-chain blocks."); - let block = BlockView::new(&block); - block.transactions() - } - - - { - let chain = io.chain(); - let good = good.par_iter().map(|h| fetch_transactions(chain, h)); - let bad = bad.par_iter().map(|h| fetch_transactions(chain, h)); - - good.for_each(|txs| { - let mut transaction_queue = self.transaction_queue.lock().unwrap(); - let hashes = txs.iter().map(|tx| tx.hash()).collect::>(); - transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); - }); - bad.for_each(|txs| { - // populate sender - for tx in &txs { - let _sender = tx.sender(); - } - let mut transaction_queue = self.transaction_queue.lock().unwrap(); - let _ = transaction_queue.add_all(txs, |a| chain.nonce(a)); - }); - } - + pub fn chain_new_blocks(&mut self, io: &mut SyncIo, imported: &[H256], invalid: &[H256], enacted: &[H256], retracted: &[H256]) { + // Notify miner + self.miner.chain_new_blocks(io.chain(), imported, invalid, enacted, retracted); // Propagate latests blocks self.propagate_latest_blocks(io); // TODO [todr] propagate transactions? } - /// Add transaction to the transaction queue - pub fn insert_transaction(&self, transaction: ethcore::transaction::SignedTransaction, fetch_nonce: &T) -> Result<(), Error> - where T: Fn(&Address) -> U256 - { - let mut queue = self.transaction_queue.lock().unwrap(); - queue.add(transaction, fetch_nonce) + pub fn chain_new_head(&mut self, io: &mut SyncIo) { + self.miner.prepare_sealing(io.chain()); } } @@ -1322,6 +1285,7 @@ mod tests { use super::{PeerInfo, PeerAsking}; use ethcore::header::*; use ethcore::client::*; + use ethminer::{Miner, MinerService}; fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes { let mut header = Header::new(); @@ -1431,7 +1395,7 @@ mod tests { } fn dummy_sync_with_peer(peer_latest_hash: H256) -> ChainSync { - let mut sync = ChainSync::new(SyncConfig::default()); + let mut sync = ChainSync::new(SyncConfig::default(), Miner::new()); sync.peers.insert(0, PeerInfo { protocol_version: 0, @@ -1652,15 +1616,15 @@ mod tests { let mut io = TestIo::new(&mut client, &mut queue, None); // when - sync.chain_new_blocks(&mut io, &[], &good_blocks, &[]); - assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0); - assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1); - sync.chain_new_blocks(&mut io, &good_blocks, &retracted_blocks, &[]); + sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks); + assert_eq!(sync.miner.status().transaction_queue_future, 0); + assert_eq!(sync.miner.status().transaction_queue_pending, 1); + sync.chain_new_blocks(&mut io, &good_blocks, &[], &[], &retracted_blocks); // then - let status = sync.transaction_queue.lock().unwrap().status(); - assert_eq!(status.pending, 1); - assert_eq!(status.future, 0); + let status = sync.miner.status(); + assert_eq!(status.transaction_queue_pending, 1); + assert_eq!(status.transaction_queue_future, 0); } #[test] diff --git a/sync/src/lib.rs b/sync/src/lib.rs index ef0df0dcf..1c87da2de 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -32,18 +32,21 @@ //! extern crate ethcore_util as util; //! extern crate ethcore; //! extern crate ethsync; +//! extern crate ethminer; //! use std::env; //! use std::sync::Arc; //! use util::network::{NetworkService, NetworkConfiguration}; //! use ethcore::client::{Client, ClientConfig}; //! use ethsync::{EthSync, SyncConfig}; +//! use ethminer::Miner; //! use ethcore::ethereum; //! //! fn main() { //! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap(); //! let dir = env::temp_dir(); //! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, service.io().channel()).unwrap(); -//! EthSync::register(&mut service, SyncConfig::default(), client); +//! let miner = Miner::new(); +//! EthSync::register(&mut service, SyncConfig::default(), client, miner); //! } //! ``` @@ -52,28 +55,27 @@ extern crate log; #[macro_use] extern crate ethcore_util as util; extern crate ethcore; +extern crate ethminer; extern crate env_logger; extern crate time; extern crate rand; -extern crate rayon; #[macro_use] extern crate heapsize; use std::ops::*; use std::sync::*; -use ethcore::client::Client; use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; use util::TimerToken; use util::{U256, ONE_U256}; -use chain::ChainSync; +use ethcore::client::Client; use ethcore::service::SyncMessage; +use ethminer::Miner; use io::NetSyncIo; +use chain::ChainSync; mod chain; mod io; mod range_collection; -mod transaction_queue; -pub use transaction_queue::TransactionQueue; #[cfg(test)] mod tests; @@ -99,8 +101,6 @@ impl Default for SyncConfig { pub trait SyncProvider: Send + Sync { /// Get sync status fn status(&self) -> SyncStatus; - /// Insert transaction in the sync transaction queue - fn insert_transaction(&self, transaction: ethcore::transaction::SignedTransaction); } /// Ethereum network protocol handler @@ -115,10 +115,10 @@ pub use self::chain::{SyncStatus, SyncState}; impl EthSync { /// Creates and register protocol with the network service - pub fn register(service: &mut NetworkService, config: SyncConfig, chain: Arc) -> Arc { + pub fn register(service: &mut NetworkService, config: SyncConfig, chain: Arc, miner: Arc) -> Arc { let sync = Arc::new(EthSync { chain: chain, - sync: RwLock::new(ChainSync::new(config)), + sync: RwLock::new(ChainSync::new(config, miner)), }); service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler"); sync @@ -140,16 +140,6 @@ impl SyncProvider for EthSync { fn status(&self) -> SyncStatus { self.sync.read().unwrap().status() } - - /// Insert transaction in transaction queue - fn insert_transaction(&self, transaction: ethcore::transaction::SignedTransaction) { - use util::numbers::*; - - let nonce_fn = |a: &Address| self.chain.state().nonce(a) + U256::one(); - let sync = self.sync.write().unwrap(); - sync.insert_transaction(transaction, &nonce_fn).unwrap_or_else( - |e| warn!(target: "sync", "Error inserting transaction to queue: {:?}", e)); - } } impl NetworkProtocolHandler for EthSync { @@ -174,13 +164,16 @@ impl NetworkProtocolHandler for EthSync { self.sync.write().unwrap().maintain_sync(&mut NetSyncIo::new(io, self.chain.deref())); } - #[allow(single_match)] fn message(&self, io: &NetworkContext, message: &SyncMessage) { match *message { - SyncMessage::NewChainBlocks { ref good, ref bad, ref retracted } => { + SyncMessage::NewChainBlocks { ref imported, ref invalid, ref enacted, ref retracted } => { let mut sync_io = NetSyncIo::new(io, self.chain.deref()); - self.sync.write().unwrap().chain_new_blocks(&mut sync_io, good, bad, retracted); + self.sync.write().unwrap().chain_new_blocks(&mut sync_io, imported, invalid, enacted, retracted); }, + SyncMessage::NewChainHead => { + let mut sync_io = NetSyncIo::new(io, self.chain.deref()); + self.sync.write().unwrap().chain_new_head(&mut sync_io); + } _ => {/* Ignore other messages */}, } } diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index ca4ae5158..b3e62ccc6 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -18,6 +18,7 @@ use util::*; use ethcore::client::{TestBlockChainClient, BlockChainClient}; use io::SyncIo; use chain::ChainSync; +use ethminer::Miner; use ::SyncConfig; pub struct TestIo<'p> { @@ -92,7 +93,7 @@ impl TestNet { for _ in 0..n { net.peers.push(TestPeer { chain: TestBlockChainClient::new(), - sync: ChainSync::new(SyncConfig::default()), + sync: ChainSync::new(SyncConfig::default(), Miner::new()), queue: VecDeque::new(), }); } @@ -167,6 +168,6 @@ impl TestNet { pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) { let mut peer = self.peer_mut(peer_id); - peer.sync.chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[]); + peer.sync.chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[]); } } diff --git a/test.sh b/test.sh index 0f5edb0d1..e1881a8ad 100755 --- a/test.sh +++ b/test.sh @@ -1,4 +1,5 @@ #!/bin/sh # Running Parity Full Test Sute -cargo test --features ethcore/json-tests $1 -p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity +cargo test --features ethcore/json-tests $1 -p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity -p +ethminer diff --git a/util/bigint/src/uint.rs b/util/bigint/src/uint.rs index c18ed839c..ea617d570 100644 --- a/util/bigint/src/uint.rs +++ b/util/bigint/src/uint.rs @@ -36,7 +36,6 @@ //! The functions here are designed to be fast. //! - #[cfg(all(asm_available, target_arch="x86_64"))] use std::mem; use std::fmt; diff --git a/util/src/error.rs b/util/src/error.rs index 68aa3e648..409cc0e5d 100644 --- a/util/src/error.rs +++ b/util/src/error.rs @@ -21,12 +21,13 @@ use network::NetworkError; use rlp::DecoderError; use io; use std::fmt; +use hash::H256; #[derive(Debug)] /// Error in database subsystem. pub enum BaseDataError { /// An entry was removed more times than inserted. - NegativelyReferencedHash, + NegativelyReferencedHash(H256), } #[derive(Debug)] diff --git a/util/src/journaldb/archivedb.rs b/util/src/journaldb/archivedb.rs index 19570b281..83a80b7c2 100644 --- a/util/src/journaldb/archivedb.rs +++ b/util/src/journaldb/archivedb.rs @@ -132,7 +132,7 @@ impl JournalDB for ArchiveDB { Box::new(ArchiveDB { overlay: MemoryDB::new(), backing: self.backing.clone(), - latest_era: None, + latest_era: self.latest_era, }) } @@ -144,7 +144,7 @@ impl JournalDB for ArchiveDB { self.latest_era.is_none() } - fn commit(&mut self, _: u64, _: &H256, _: Option<(u64, H256)>) -> Result { + fn commit(&mut self, now: u64, _: &H256, _: Option<(u64, H256)>) -> Result { let batch = DBTransaction::new(); let mut inserts = 0usize; let mut deletes = 0usize; @@ -160,6 +160,10 @@ impl JournalDB for ArchiveDB { deletes += 1; } } + if self.latest_era.map_or(true, |e| now > e) { + try!(batch.put(&LATEST_ERA_KEY, &encode(&now))); + self.latest_era = Some(now); + } try!(self.backing.write(batch)); Ok((inserts + deletes) as u32) } diff --git a/util/src/journaldb/refcounteddb.rs b/util/src/journaldb/refcounteddb.rs index 09362676b..590964247 100644 --- a/util/src/journaldb/refcounteddb.rs +++ b/util/src/journaldb/refcounteddb.rs @@ -43,6 +43,7 @@ pub struct RefCountedDB { const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ]; const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ]; const DB_VERSION : u32 = 512; +const PADDING : [u8; 10] = [ 0u8; 10 ]; impl RefCountedDB { /// Create a new instance given a `backing` database. @@ -131,9 +132,10 @@ impl JournalDB for RefCountedDB { let mut last; while try!(self.backing.get({ - let mut r = RlpStream::new_list(2); + let mut r = RlpStream::new_list(3); r.append(&now); r.append(&index); + r.append(&&PADDING[..]); last = r.drain(); &last })).is_some() { @@ -144,7 +146,10 @@ impl JournalDB for RefCountedDB { r.append(id); r.append(&self.inserts); r.append(&self.removes); - try!(self.backing.put(&last, r.as_raw())); + try!(batch.put(&last, r.as_raw())); + + trace!(target: "rcdb", "new journal for time #{}.{} => {}: inserts={:?}, removes={:?}", now, index, id, self.inserts, self.removes); + self.inserts.clear(); self.removes.clear(); @@ -158,20 +163,25 @@ impl JournalDB for RefCountedDB { if let Some((end_era, canon_id)) = end { let mut index = 0usize; let mut last; - while let Some(rlp_data) = try!(self.backing.get({ - let mut r = RlpStream::new_list(2); - r.append(&end_era); - r.append(&index); - last = r.drain(); - &last - })) { + while let Some(rlp_data) = { +// trace!(target: "rcdb", "checking for journal #{}.{}", end_era, index); + try!(self.backing.get({ + let mut r = RlpStream::new_list(3); + r.append(&end_era); + r.append(&index); + r.append(&&PADDING[..]); + last = r.drain(); + &last + })) + } { let rlp = Rlp::new(&rlp_data); - let to_remove: Vec = rlp.val_at(if canon_id == rlp.val_at(0) {2} else {1}); + let our_id: H256 = rlp.val_at(0); + let to_remove: Vec = rlp.val_at(if canon_id == our_id {2} else {1}); + trace!(target: "rcdb", "delete journal for time #{}.{}=>{}, (canon was {}): deleting {:?}", end_era, index, our_id, canon_id, to_remove); for i in &to_remove { self.forward.remove(i); } - try!(self.backing.delete(&last)); - trace!("RefCountedDB: delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, to_remove.len()); + try!(batch.delete(&last)); index += 1; } } diff --git a/util/src/overlaydb.rs b/util/src/overlaydb.rs index 8166dd318..b5dec75e2 100644 --- a/util/src/overlaydb.rs +++ b/util/src/overlaydb.rs @@ -70,15 +70,15 @@ impl OverlayDB { let (back_value, back_rc) = x; let total_rc: i32 = back_rc as i32 + rc; if total_rc < 0 { - return Err(From::from(BaseDataError::NegativelyReferencedHash)); + return Err(From::from(BaseDataError::NegativelyReferencedHash(key))); } - deletes += if self.put_payload(batch, &key, (back_value, total_rc as u32)) {1} else {0}; + deletes += if self.put_payload_in_batch(batch, &key, (back_value, total_rc as u32)) {1} else {0}; } None => { if rc < 0 { - return Err(From::from(BaseDataError::NegativelyReferencedHash)); + return Err(From::from(BaseDataError::NegativelyReferencedHash(key))); } - self.put_payload(batch, &key, (value, rc as u32)); + self.put_payload_in_batch(batch, &key, (value, rc as u32)); } }; ret += 1; @@ -116,10 +116,32 @@ impl OverlayDB { /// } /// ``` pub fn commit(&mut self) -> Result { - let batch = DBTransaction::new(); - let r = try!(self.commit_to_batch(&batch)); - try!(self.backing.write(batch)); - Ok(r) + let mut ret = 0u32; + let mut deletes = 0usize; + for i in self.overlay.drain().into_iter() { + let (key, (value, rc)) = i; + if rc != 0 { + match self.payload(&key) { + Some(x) => { + let (back_value, back_rc) = x; + let total_rc: i32 = back_rc as i32 + rc; + if total_rc < 0 { + return Err(From::from(BaseDataError::NegativelyReferencedHash(key))); + } + deletes += if self.put_payload(&key, (back_value, total_rc as u32)) {1} else {0}; + } + None => { + if rc < 0 { + return Err(From::from(BaseDataError::NegativelyReferencedHash(key))); + } + self.put_payload(&key, (value, rc as u32)); + } + }; + ret += 1; + } + } + trace!("OverlayDB::commit() deleted {} nodes", deletes); + Ok(ret) } /// Revert all operations on this object (i.e. `insert()`s and `kill()`s) since the @@ -145,6 +167,9 @@ impl OverlayDB { /// ``` pub fn revert(&mut self) { self.overlay.clear(); } + /// Get the number of references that would be committed. + pub fn commit_refs(&self, key: &H256) -> i32 { self.overlay.raw(&key).map_or(0, |&(_, refs)| refs) } + /// Get the refs and value of the given key. fn payload(&self, key: &H256) -> Option<(Bytes, u32)> { self.backing.get(&key.bytes()) @@ -156,7 +181,7 @@ impl OverlayDB { } /// Put the refs and value of the given key, possibly deleting it from the db. - fn put_payload(&self, batch: &DBTransaction, key: &H256, payload: (Bytes, u32)) -> bool { + fn put_payload_in_batch(&self, batch: &DBTransaction, key: &H256, payload: (Bytes, u32)) -> bool { if payload.1 > 0 { let mut s = RlpStream::new_list(2); s.append(&payload.1); @@ -168,6 +193,20 @@ impl OverlayDB { true } } + + /// Put the refs and value of the given key, possibly deleting it from the db. + fn put_payload(&self, key: &H256, payload: (Bytes, u32)) -> bool { + if payload.1 > 0 { + let mut s = RlpStream::new_list(2); + s.append(&payload.1); + s.append(&payload.0); + self.backing.put(&key.bytes(), s.as_raw()).expect("Low-level database error. Some issue with your hard disk?"); + false + } else { + self.backing.delete(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?"); + true + } + } } impl HashDB for OverlayDB { diff --git a/util/src/rlp/rlpin.rs b/util/src/rlp/rlpin.rs index d58fa95e8..9d3fcb2fa 100644 --- a/util/src/rlp/rlpin.rs +++ b/util/src/rlp/rlpin.rs @@ -24,7 +24,7 @@ impl<'a> From> for Rlp<'a> { } /// Data-oriented view onto trusted rlp-slice. -/// +/// /// Unlikely to `UntrustedRlp` doesn't bother you with error /// handling. It assumes that you know what you are doing. #[derive(Debug)] @@ -44,7 +44,7 @@ impl<'a, 'view> View<'a, 'view> for Rlp<'a> where 'a: 'view { type Data = &'a [u8]; type Item = Rlp<'a>; type Iter = RlpIterator<'a, 'view>; - + /// Create a new instance of `Rlp` fn new(bytes: &'a [u8]) -> Rlp<'a> { Rlp { @@ -116,7 +116,7 @@ impl<'a, 'view> View<'a, 'view> for Rlp<'a> where 'a: 'view { impl <'a, 'view> Rlp<'a> where 'a: 'view { fn view_as_val(r: &R) -> T where R: View<'a, 'view>, T: RlpDecodable { let res: Result = r.as_val(); - res.unwrap_or_else(|_| panic!()) + res.unwrap_or_else(|e| panic!("DecodeError: {}", e)) } /// Decode into an object