Merge branch 'master' into fixing_warnings

This commit is contained in:
Gav Wood 2016-03-14 00:57:49 +01:00
commit fe722419e7
40 changed files with 1066 additions and 440 deletions

View File

@ -14,11 +14,11 @@ matrix:
- rust: nightly - rust: nightly
include: include:
- rust: stable - rust: stable
env: FEATURES="--features travis-beta" KCOV_FEATURES="" TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity" ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}" env: FEATURES="--features travis-beta" KCOV_FEATURES="" TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity -p ethminer" ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}"
- rust: beta - rust: beta
env: FEATURES="--features travis-beta" KCOV_FEATURES="" TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity" ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}" env: FEATURES="--features travis-beta" KCOV_FEATURES="" TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity -p ethminer" ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}"
- rust: nightly - rust: nightly
env: FEATURES="--features travis-nightly" KCOV_FEATURES="" TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity" ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}" env: FEATURES="--features travis-nightly" KCOV_FEATURES="" TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity -p ethminer" ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}"
cache: cache:
apt: true apt: true
directories: directories:
@ -51,6 +51,7 @@ after_success: |
./kcov-master/tmp/usr/local/bin/kcov --exclude-pattern /usr/,/.cargo,/root/.multirust,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests target/kcov target/debug/deps/ethcore-* && ./kcov-master/tmp/usr/local/bin/kcov --exclude-pattern /usr/,/.cargo,/root/.multirust,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests target/kcov target/debug/deps/ethcore-* &&
./kcov-master/tmp/usr/local/bin/kcov --exclude-pattern /usr/,/.cargo,/root/.multirust,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests target/kcov target/debug/deps/ethsync-* && ./kcov-master/tmp/usr/local/bin/kcov --exclude-pattern /usr/,/.cargo,/root/.multirust,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests target/kcov target/debug/deps/ethsync-* &&
./kcov-master/tmp/usr/local/bin/kcov --exclude-pattern /usr/,/.cargo,/root/.multirust,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests target/kcov target/debug/deps/ethcore_rpc-* && ./kcov-master/tmp/usr/local/bin/kcov --exclude-pattern /usr/,/.cargo,/root/.multirust,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests target/kcov target/debug/deps/ethcore_rpc-* &&
./kcov-master/tmp/usr/local/bin/kcov --exclude-pattern /usr/,/.cargo,/root/.multirust,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests target/kcov target/debug/deps/ethminer-* &&
./kcov-master/tmp/usr/local/bin/kcov --coveralls-id=${TRAVIS_JOB_ID} --exclude-pattern /usr/,/.cargo,/root/.multirust target/kcov target/debug/parity-* && ./kcov-master/tmp/usr/local/bin/kcov --coveralls-id=${TRAVIS_JOB_ID} --exclude-pattern /usr/,/.cargo,/root/.multirust target/kcov target/debug/parity-* &&
[ $TRAVIS_BRANCH = master ] && [ $TRAVIS_BRANCH = master ] &&
[ $TRAVIS_PULL_REQUEST = false ] && [ $TRAVIS_PULL_REQUEST = false ] &&

19
Cargo.lock generated
View File

@ -11,6 +11,7 @@ dependencies = [
"ethcore-devtools 0.9.99", "ethcore-devtools 0.9.99",
"ethcore-rpc 0.9.99", "ethcore-rpc 0.9.99",
"ethcore-util 0.9.99", "ethcore-util 0.9.99",
"ethminer 0.9.99",
"ethsync 0.9.99", "ethsync 0.9.99",
"fdlimit 0.1.0", "fdlimit 0.1.0",
"log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
@ -237,6 +238,7 @@ dependencies = [
"ethash 0.9.99", "ethash 0.9.99",
"ethcore 0.9.99", "ethcore 0.9.99",
"ethcore-util 0.9.99", "ethcore-util 0.9.99",
"ethminer 0.9.99",
"ethsync 0.9.99", "ethsync 0.9.99",
"jsonrpc-core 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-http-server 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-http-server 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -284,6 +286,20 @@ dependencies = [
"vergen 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "vergen 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "ethminer"
version = "0.9.99"
dependencies = [
"clippy 0.0.50 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore 0.9.99",
"ethcore-util 0.9.99",
"log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "ethsync" name = "ethsync"
version = "0.9.99" version = "0.9.99"
@ -292,11 +308,10 @@ dependencies = [
"env_logger 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore 0.9.99", "ethcore 0.9.99",
"ethcore-util 0.9.99", "ethcore-util 0.9.99",
"ethminer 0.9.99",
"heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)",
] ]

View File

@ -19,18 +19,19 @@ ctrlc = { git = "https://github.com/tomusdrw/rust-ctrlc.git" }
fdlimit = { path = "util/fdlimit" } fdlimit = { path = "util/fdlimit" }
daemonize = "0.2" daemonize = "0.2"
number_prefix = "0.2" number_prefix = "0.2"
rpassword = "0.1"
clippy = { version = "0.0.50", optional = true } clippy = { version = "0.0.50", optional = true }
ethcore = { path = "ethcore" } ethcore = { path = "ethcore" }
ethcore-util = { path = "util" } ethcore-util = { path = "util" }
ethsync = { path = "sync" } ethsync = { path = "sync" }
ethminer = { path = "miner" }
ethcore-devtools = { path = "devtools" } ethcore-devtools = { path = "devtools" }
ethcore-rpc = { path = "rpc", optional = true } ethcore-rpc = { path = "rpc", optional = true }
rpassword = "0.1"
[features] [features]
default = ["rpc"] default = ["rpc"]
rpc = ["ethcore-rpc"] rpc = ["ethcore-rpc"]
dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethcore-rpc/dev"] dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethcore-rpc/dev", "ethminer/dev"]
travis-beta = ["ethcore/json-tests"] travis-beta = ["ethcore/json-tests"]
travis-nightly = ["ethcore/json-tests", "dev"] travis-nightly = ["ethcore/json-tests", "dev"]

23
cov.sh
View File

@ -15,12 +15,23 @@ if ! type kcov > /dev/null; then
exit 1 exit 1
fi fi
cargo test -p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity --no-run || exit $? cargo test \
-p ethash \
-p ethcore-util \
-p ethcore \
-p ethsync \
-p ethcore-rpc \
-p parity \
-p ethminer \
--no-run || exit $?
rm -rf target/coverage rm -rf target/coverage
mkdir -p target/coverage mkdir -p target/coverage
kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests --include-pattern src --verify target/coverage target/debug/deps/ethcore-*
kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests --include-pattern src --verify target/coverage target/debug/deps/ethash-* EXCLUDE="~/.multirust,rocksdb,secp256k1,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests"
kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests --include-pattern src --verify target/coverage target/debug/deps/ethcore_util-* kcov --exclude-pattern $EXCLUDE --include-pattern src --verify target/coverage target/debug/deps/ethcore-*
kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests --include-pattern src --verify target/coverage target/debug/deps/ethsync-* kcov --exclude-pattern $EXCLUDE --include-pattern src --verify target/coverage target/debug/deps/ethash-*
kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests --include-pattern src --verify target/coverage target/debug/deps/ethcore_rpc-* kcov --exclude-pattern $EXCLUDE --include-pattern src --verify target/coverage target/debug/deps/ethcore_util-*
kcov --exclude-pattern $EXCLUDE --include-pattern src --verify target/coverage target/debug/deps/ethsync-*
kcov --exclude-pattern $EXCLUDE --include-pattern src --verify target/coverage target/debug/deps/ethcore_rpc-*
kcov --exclude-pattern $EXCLUDE --include-pattern src --verify target/coverage target/debug/deps/ethminer-*
xdg-open target/coverage/index.html xdg-open target/coverage/index.html

9
doc.sh
View File

@ -1,4 +1,11 @@
#!/bin/sh #!/bin/sh
# generate documentation only for partiy and ethcore libraries # generate documentation only for partiy and ethcore libraries
cargo doc --no-deps --verbose -p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity cargo doc --no-deps --verbose \
-p ethash \
-p ethcore-util \
-p ethcore \
-p ethsync \
-p ethcore-rpc \
-p parity \
-p ethminer

View File

@ -97,6 +97,9 @@ impl<'db> HashDB for AccountDBMut<'db>{
} }
fn insert(&mut self, value: &[u8]) -> H256 { fn insert(&mut self, value: &[u8]) -> H256 {
if value == &NULL_RLP {
return SHA3_NULL_RLP.clone();
}
let k = value.sha3(); let k = value.sha3();
let ak = combine_key(&self.address, &k); let ak = combine_key(&self.address, &k);
self.db.emplace(ak, value.to_vec()); self.db.emplace(ak, value.to_vec());
@ -104,11 +107,17 @@ impl<'db> HashDB for AccountDBMut<'db>{
} }
fn emplace(&mut self, key: H256, value: Bytes) { fn emplace(&mut self, key: H256, value: Bytes) {
if key == SHA3_NULL_RLP {
return;
}
let key = combine_key(&self.address, &key); let key = combine_key(&self.address, &key);
self.db.emplace(key, value.to_vec()) self.db.emplace(key, value.to_vec())
} }
fn kill(&mut self, key: &H256) { fn kill(&mut self, key: &H256) {
if key == &SHA3_NULL_RLP {
return;
}
let key = combine_key(&self.address, key); let key = combine_key(&self.address, key);
self.db.kill(&key) self.db.kill(&key)
} }

View File

@ -17,7 +17,6 @@
//! Blockchain database client. //! Blockchain database client.
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::atomic::AtomicBool;
use util::*; use util::*;
use util::panics::*; use util::panics::*;
use views::BlockView; use views::BlockView;
@ -31,12 +30,12 @@ use service::{NetSyncMessage, SyncMessage};
use env_info::LastHashes; use env_info::LastHashes;
use verification::*; use verification::*;
use block::*; use block::*;
use transaction::LocalizedTransaction; use transaction::{LocalizedTransaction, SignedTransaction};
use extras::TransactionAddress; use extras::TransactionAddress;
use filter::Filter; use filter::Filter;
use log_entry::LocalizedLogEntry; use log_entry::LocalizedLogEntry;
use block_queue::{BlockQueue, BlockQueueInfo}; use block_queue::{BlockQueue, BlockQueueInfo};
use blockchain::{BlockChain, BlockProvider, TreeRoute}; use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute};
use client::{BlockId, TransactionId, ClientConfig, BlockChainClient}; use client::{BlockId, TransactionId, ClientConfig, BlockChainClient};
pub use blockchain::CacheSize as BlockChainCacheSize; pub use blockchain::CacheSize as BlockChainCacheSize;
@ -106,12 +105,6 @@ pub struct Client<V = CanonVerifier> where V: Verifier {
report: RwLock<ClientReport>, report: RwLock<ClientReport>,
import_lock: Mutex<()>, import_lock: Mutex<()>,
panic_handler: Arc<PanicHandler>, panic_handler: Arc<PanicHandler>,
// for sealing...
sealing_enabled: AtomicBool,
sealing_block: Mutex<Option<ClosedBlock>>,
author: RwLock<Address>,
extra_data: RwLock<Bytes>,
verifier: PhantomData<V>, verifier: PhantomData<V>,
} }
@ -159,10 +152,6 @@ impl<V> Client<V> where V: Verifier {
report: RwLock::new(Default::default()), report: RwLock::new(Default::default()),
import_lock: Mutex::new(()), import_lock: Mutex::new(()),
panic_handler: panic_handler, panic_handler: panic_handler,
sealing_enabled: AtomicBool::new(false),
sealing_block: Mutex::new(None),
author: RwLock::new(Address::new()),
extra_data: RwLock::new(Vec::new()),
verifier: PhantomData, verifier: PhantomData,
})) }))
} }
@ -233,12 +222,39 @@ impl<V> Client<V> where V: Verifier {
Ok(closed_block) Ok(closed_block)
} }
fn calculate_enacted_retracted(&self, import_results: Vec<ImportRoute>) -> (Vec<H256>, Vec<H256>) {
fn map_to_vec(map: Vec<(H256, bool)>) -> Vec<H256> {
map.into_iter().map(|(k, _v)| k).collect()
}
// In ImportRoute we get all the blocks that have been enacted and retracted by single insert.
// Because we are doing multiple inserts some of the blocks that were enacted in import `k`
// could be retracted in import `k+1`. This is why to understand if after all inserts
// the block is enacted or retracted we iterate over all routes and at the end final state
// will be in the hashmap
let map = import_results.into_iter().fold(HashMap::new(), |mut map, route| {
for hash in route.enacted {
map.insert(hash, true);
}
for hash in route.retracted {
map.insert(hash, false);
}
map
});
// Split to enacted retracted (using hashmap value)
let (enacted, retracted) = map.into_iter().partition(|&(_k, v)| v);
// And convert tuples to keys
(map_to_vec(enacted), map_to_vec(retracted))
}
/// This is triggered by a message coming from a block queue when the block is ready for insertion /// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self, io: &IoChannel<NetSyncMessage>) -> usize { pub fn import_verified_blocks(&self, io: &IoChannel<NetSyncMessage>) -> usize {
let max_blocks_to_import = 128; let max_blocks_to_import = 128;
let mut good_blocks = Vec::with_capacity(max_blocks_to_import); let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
let mut bad_blocks = HashSet::new(); let mut invalid_blocks = HashSet::new();
let mut import_results = Vec::with_capacity(max_blocks_to_import);
let _import_lock = self.import_lock.lock(); let _import_lock = self.import_lock.lock();
let blocks = self.block_queue.drain(max_blocks_to_import); let blocks = self.block_queue.drain(max_blocks_to_import);
@ -248,16 +264,16 @@ impl<V> Client<V> where V: Verifier {
for block in blocks { for block in blocks {
let header = &block.header; let header = &block.header;
if bad_blocks.contains(&header.parent_hash) { if invalid_blocks.contains(&header.parent_hash) {
bad_blocks.insert(header.hash()); invalid_blocks.insert(header.hash());
continue; continue;
} }
let closed_block = self.check_and_close_block(&block); let closed_block = self.check_and_close_block(&block);
if let Err(_) = closed_block { if let Err(_) = closed_block {
bad_blocks.insert(header.hash()); invalid_blocks.insert(header.hash());
break; break;
} }
good_blocks.push(header.hash()); imported_blocks.push(header.hash());
// Are we committing an era? // Are we committing an era?
let ancient = if header.number() >= HISTORY { let ancient = if header.number() >= HISTORY {
@ -276,37 +292,41 @@ impl<V> Client<V> where V: Verifier {
// And update the chain after commit to prevent race conditions // And update the chain after commit to prevent race conditions
// (when something is in chain but you are not able to fetch details) // (when something is in chain but you are not able to fetch details)
self.chain.insert_block(&block.bytes, receipts); let route = self.chain.insert_block(&block.bytes, receipts);
import_results.push(route);
self.report.write().unwrap().accrue_block(&block); self.report.write().unwrap().accrue_block(&block);
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
} }
let imported = good_blocks.len(); let imported = imported_blocks.len();
let bad_blocks = bad_blocks.into_iter().collect::<Vec<H256>>(); let invalid_blocks = invalid_blocks.into_iter().collect::<Vec<H256>>();
{ {
if !bad_blocks.is_empty() { if !invalid_blocks.is_empty() {
self.block_queue.mark_as_bad(&bad_blocks); self.block_queue.mark_as_bad(&invalid_blocks);
} }
if !good_blocks.is_empty() { if !imported_blocks.is_empty() {
self.block_queue.mark_as_good(&good_blocks); self.block_queue.mark_as_good(&imported_blocks);
} }
} }
{ {
if !good_blocks.is_empty() && self.block_queue.queue_info().is_empty() { if !imported_blocks.is_empty() && self.block_queue.queue_info().is_empty() {
let (enacted, retracted) = self.calculate_enacted_retracted(import_results);
io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
good: good_blocks, imported: imported_blocks,
bad: bad_blocks, invalid: invalid_blocks,
// TODO [todr] were to take those from? enacted: enacted,
retracted: vec![], retracted: retracted,
})).unwrap(); })).unwrap();
} }
} }
if self.chain_info().best_block_hash != original_best && self.sealing_enabled.load(atomic::Ordering::Relaxed) { {
self.prepare_sealing(); if self.chain_info().best_block_hash != original_best {
io.send(NetworkIoMessage::User(SyncMessage::NewChainHead)).unwrap();
}
} }
imported imported
@ -357,52 +377,59 @@ impl<V> Client<V> where V: Verifier {
BlockId::Latest => Some(self.chain.best_block_number()) BlockId::Latest => Some(self.chain.best_block_number())
} }
} }
/// Get the author that we will seal blocks as.
pub fn author(&self) -> Address {
self.author.read().unwrap().clone()
}
/// Set the author that we will seal blocks as.
pub fn set_author(&self, author: Address) {
*self.author.write().unwrap() = author;
}
/// Get the extra_data that we will seal blocks wuth.
pub fn extra_data(&self) -> Bytes {
self.extra_data.read().unwrap().clone()
}
/// Set the extra_data that we will seal blocks with.
pub fn set_extra_data(&self, extra_data: Bytes) {
*self.extra_data.write().unwrap() = extra_data;
}
/// New chain head event. Restart mining operation.
pub fn prepare_sealing(&self) {
let h = self.chain.best_block_hash();
let mut b = OpenBlock::new(
self.engine.deref().deref(),
self.state_db.lock().unwrap().spawn(),
match self.chain.block_header(&h) { Some(ref x) => x, None => {return;} },
self.build_last_hashes(h.clone()),
self.author(),
self.extra_data()
);
self.chain.find_uncle_headers(&h, self.engine.deref().deref().maximum_uncle_age()).unwrap().into_iter().take(self.engine.deref().deref().maximum_uncle_count()).foreach(|h| { b.push_uncle(h).unwrap(); });
// TODO: push transactions.
let b = b.close();
trace!("Sealing: number={}, hash={}, diff={}", b.hash(), b.block().header().difficulty(), b.block().header().number());
*self.sealing_block.lock().unwrap() = Some(b);
}
} }
// TODO: need MinerService MinerIoHandler
impl<V> BlockChainClient for Client<V> where V: Verifier { impl<V> BlockChainClient for Client<V> where V: Verifier {
// TODO [todr] Should be moved to miner crate eventually.
fn try_seal(&self, block: ClosedBlock, seal: Vec<Bytes>) -> Result<SealedBlock, ClosedBlock> {
block.try_seal(self.engine.deref().deref(), seal)
}
// TODO [todr] Should be moved to miner crate eventually.
fn prepare_sealing(&self, author: Address, extra_data: Bytes, transactions: Vec<SignedTransaction>) -> Option<ClosedBlock> {
let engine = self.engine.deref().deref();
let h = self.chain.best_block_hash();
let mut b = OpenBlock::new(
engine,
self.state_db.lock().unwrap().spawn(),
match self.chain.block_header(&h) { Some(ref x) => x, None => {return None} },
self.build_last_hashes(h.clone()),
author,
extra_data,
);
// Add uncles
self.chain
.find_uncle_headers(&h, engine.maximum_uncle_age())
.unwrap()
.into_iter()
.take(engine.maximum_uncle_count())
.foreach(|h| {
b.push_uncle(h).unwrap();
});
// Add transactions
let block_number = b.block().header().number();
for tx in transactions {
let import = b.push_transaction(tx, None);
if let Err(e) = import {
trace!("Error adding transaction to block: number={}. Error: {:?}", block_number, e);
}
}
// And close
let b = b.close();
trace!("Sealing: number={}, hash={}, diff={}",
b.block().header().number(),
b.hash(),
b.block().header().difficulty()
);
Some(b)
}
fn block_header(&self, id: BlockId) -> Option<Bytes> { fn block_header(&self, id: BlockId) -> Option<Bytes> {
Self::block_hash(&self.chain, id).and_then(|hash| self.chain.block(&hash).map(|bytes| BlockView::new(&bytes).rlp().at(0).as_raw().to_vec())) Self::block_hash(&self.chain, id).and_then(|hash| self.chain.block(&hash).map(|bytes| BlockView::new(&bytes).rlp().at(0).as_raw().to_vec()))
} }
@ -561,39 +588,6 @@ impl<V> BlockChainClient for Client<V> where V: Verifier {
}) })
.collect() .collect()
} }
/// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock.
fn sealing_block(&self) -> &Mutex<Option<ClosedBlock>> {
if self.sealing_block.lock().unwrap().is_none() {
self.sealing_enabled.store(true, atomic::Ordering::Relaxed);
// TODO: Above should be on a timer that resets after two blocks have arrived without being asked for.
self.prepare_sealing();
}
&self.sealing_block
}
/// Submit `seal` as a valid solution for the header of `pow_hash`.
/// Will check the seal, but not actually insert the block into the chain.
fn submit_seal(&self, pow_hash: H256, seal: Vec<Bytes>) -> Result<(), Error> {
let mut maybe_b = self.sealing_block.lock().unwrap();
match *maybe_b {
Some(ref b) if b.hash() == pow_hash => {}
_ => { return Err(Error::PowHashInvalid); }
}
let b = maybe_b.take();
match b.unwrap().try_seal(self.engine.deref().deref(), seal) {
Err(old) => {
*maybe_b = Some(old);
Err(Error::PowInvalid)
}
Ok(sealed) => {
// TODO: commit DB from `sealed.drain` and make a VerifiedBlock to skip running the transactions twice.
try!(self.import_block(sealed.rlp_bytes()));
Ok(())
}
}
}
} }
impl MayPanic for Client { impl MayPanic for Client {

View File

@ -26,18 +26,17 @@ pub use self::config::{ClientConfig, BlockQueueConfig, BlockChainConfig};
pub use self::ids::{BlockId, TransactionId}; pub use self::ids::{BlockId, TransactionId};
pub use self::test_client::{TestBlockChainClient, EachBlockWith}; pub use self::test_client::{TestBlockChainClient, EachBlockWith};
use std::sync::Mutex;
use util::bytes::Bytes; use util::bytes::Bytes;
use util::hash::{Address, H256, H2048}; use util::hash::{Address, H256, H2048};
use util::numbers::U256; use util::numbers::U256;
use blockchain::TreeRoute; use blockchain::TreeRoute;
use block_queue::BlockQueueInfo; use block_queue::BlockQueueInfo;
use block::ClosedBlock; use block::{ClosedBlock, SealedBlock};
use header::BlockNumber; use header::BlockNumber;
use transaction::LocalizedTransaction; use transaction::{LocalizedTransaction, SignedTransaction};
use log_entry::LocalizedLogEntry; use log_entry::LocalizedLogEntry;
use filter::Filter; use filter::Filter;
use error::{ImportResult, Error}; use error::{ImportResult};
/// Blockchain database client. Owns and manages a blockchain and a block queue. /// Blockchain database client. Owns and manages a blockchain and a block queue.
pub trait BlockChainClient : Sync + Send { pub trait BlockChainClient : Sync + Send {
@ -109,11 +108,13 @@ pub trait BlockChainClient : Sync + Send {
/// Returns logs matching given filter. /// Returns logs matching given filter.
fn logs(&self, filter: Filter) -> Vec<LocalizedLogEntry>; fn logs(&self, filter: Filter) -> Vec<LocalizedLogEntry>;
/// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock. // TODO [todr] Should be moved to miner crate eventually.
fn sealing_block(&self) -> &Mutex<Option<ClosedBlock>>; /// Returns ClosedBlock prepared for sealing.
fn prepare_sealing(&self, author: Address, extra_data: Bytes, transactions: Vec<SignedTransaction>) -> Option<ClosedBlock>;
// TODO [todr] Should be moved to miner crate eventually.
/// Attempts to seal given block. Returns `SealedBlock` on success and the same block in case of error.
fn try_seal(&self, block: ClosedBlock, seal: Vec<Bytes>) -> Result<SealedBlock, ClosedBlock>;
/// Submit `seal` as a valid solution for the header of `pow_hash`.
/// Will check the seal, but not actually insert the block into the chain.
fn submit_seal(&self, pow_hash: H256, seal: Vec<Bytes>) -> Result<(), Error>;
} }

View File

@ -17,16 +17,16 @@
//! Test client. //! Test client.
use util::*; use util::*;
use transaction::{Transaction, LocalizedTransaction, Action}; use transaction::{Transaction, LocalizedTransaction, SignedTransaction, Action};
use blockchain::TreeRoute; use blockchain::TreeRoute;
use client::{BlockChainClient, BlockChainInfo, BlockStatus, BlockId, TransactionId}; use client::{BlockChainClient, BlockChainInfo, BlockStatus, BlockId, TransactionId};
use header::{Header as BlockHeader, BlockNumber}; use header::{Header as BlockHeader, BlockNumber};
use filter::Filter; use filter::Filter;
use log_entry::LocalizedLogEntry; use log_entry::LocalizedLogEntry;
use receipt::Receipt; use receipt::Receipt;
use error::{ImportResult, Error}; use error::{ImportResult};
use block_queue::BlockQueueInfo; use block_queue::BlockQueueInfo;
use block::ClosedBlock; use block::{SealedBlock, ClosedBlock};
/// Test client. /// Test client.
pub struct TestBlockChainClient { pub struct TestBlockChainClient {
@ -86,17 +86,17 @@ impl TestBlockChainClient {
client client
} }
/// Set code at given address. /// Set the balance of account `address` to `balance`.
pub fn set_code(&mut self, address: Address, code: Bytes) {
self.code.write().unwrap().insert(address, code);
}
/// Set balance at given address.
pub fn set_balance(&mut self, address: Address, balance: U256) { pub fn set_balance(&mut self, address: Address, balance: U256) {
self.balances.write().unwrap().insert(address, balance); self.balances.write().unwrap().insert(address, balance);
} }
/// Set storage at given address and position. /// Set `code` at `address`.
pub fn set_code(&mut self, address: Address, code: Bytes) {
self.code.write().unwrap().insert(address, code);
}
/// Set storage `position` to `value` for account `address`.
pub fn set_storage(&mut self, address: Address, position: H256, value: H256) { pub fn set_storage(&mut self, address: Address, position: H256, value: H256) {
self.storage.write().unwrap().insert((address, position), value); self.storage.write().unwrap().insert((address, position), value);
} }
@ -215,12 +215,12 @@ impl BlockChainClient for TestBlockChainClient {
unimplemented!(); unimplemented!();
} }
fn sealing_block(&self) -> &Mutex<Option<ClosedBlock>> { fn prepare_sealing(&self, _author: Address, _extra_data: Bytes, _transactions: Vec<SignedTransaction>) -> Option<ClosedBlock> {
unimplemented!(); unimplemented!()
} }
fn submit_seal(&self, _pow_hash: H256, _seal: Vec<Bytes>) -> Result<(), Error> { fn try_seal(&self, _block: ClosedBlock, _seal: Vec<Bytes>) -> Result<SealedBlock, ClosedBlock> {
unimplemented!(); unimplemented!()
} }
fn block_header(&self, id: BlockId) -> Option<Bytes> { fn block_header(&self, id: BlockId) -> Option<Bytes> {

View File

@ -63,8 +63,15 @@ pub enum ExecutionError {
} }
#[derive(Debug)] #[derive(Debug)]
/// Errors concerning transaction proessing. /// Errors concerning transaction processing.
pub enum TransactionError { pub enum TransactionError {
/// Transaction's gas price is below threshold.
InsufficientGasPrice {
/// Minimal expected gas price
minimal: U256,
/// Transaction gas price
got: U256
},
/// Transaction's gas limit (aka gas) is invalid. /// Transaction's gas limit (aka gas) is invalid.
InvalidGasLimit(OutOfBounds<U256>), InvalidGasLimit(OutOfBounds<U256>),
} }

View File

@ -111,7 +111,7 @@ mod tests {
let bloom = H2048::from_str("00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000").unwrap(); let bloom = H2048::from_str("00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000").unwrap();
let address = Address::from_str("0f572e5295c57f15886f9b263e2f6d2d6c7b5ec6").unwrap(); let address = Address::from_str("0f572e5295c57f15886f9b263e2f6d2d6c7b5ec6").unwrap();
let log = LogEntry { let log = LogEntry {
address: address, address: address,
topics: vec![], topics: vec![],
data: vec![] data: vec![]
}; };

View File

@ -28,12 +28,16 @@ pub enum SyncMessage {
/// New block has been imported into the blockchain /// New block has been imported into the blockchain
NewChainBlocks { NewChainBlocks {
/// Hashes of blocks imported to blockchain /// Hashes of blocks imported to blockchain
good: Vec<H256>, imported: Vec<H256>,
/// Hashes of blocks not imported to blockchain /// Hashes of blocks not imported to blockchain (because were invalid)
bad: Vec<H256>, invalid: Vec<H256>,
/// Hashes of blocks that were removed from canonical chain /// Hashes of blocks that were removed from canonical chain
retracted: Vec<H256>, retracted: Vec<H256>,
/// Hashes of blocks that are now included in cannonical chain
enacted: Vec<H256>,
}, },
/// Best Block Hash in chain has been changed
NewChainHead,
/// A block is ready /// A block is ready
BlockVerified, BlockVerified,
} }

View File

@ -132,16 +132,9 @@ fn can_mine() {
let dummy_blocks = get_good_dummy_block_seq(2); let dummy_blocks = get_good_dummy_block_seq(2);
let client_result = get_test_client_with_blocks(vec![dummy_blocks[0].clone()]); let client_result = get_test_client_with_blocks(vec![dummy_blocks[0].clone()]);
let client = client_result.reference(); let client = client_result.reference();
let b = client.sealing_block();
let pow_hash = { let b = client.prepare_sealing(Address::default(), vec![], vec![]).unwrap();
let u = b.lock().unwrap();
match *u { assert_eq!(*b.block().header().parent_hash(), BlockView::new(&dummy_blocks[0]).header_view().sha3());
Some(ref b) => { assert!(client.try_seal(b, vec![]).is_ok());
assert_eq!(*b.block().header().parent_hash(), BlockView::new(&dummy_blocks[0]).header_view().sha3());
b.hash()
}
None => { panic!(); }
}
};
assert!(client.submit_seal(pow_hash, vec![]).is_ok());
} }

View File

@ -7,6 +7,6 @@ echo "set -e" >> $FILE
echo "cargo build --release --features dev" >> $FILE echo "cargo build --release --features dev" >> $FILE
# Build tests # Build tests
echo "cargo test --no-run --features dev \\" >> $FILE echo "cargo test --no-run --features dev \\" >> $FILE
echo " -p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity" >> $FILE echo " -p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity -p ethminer" >> $FILE
echo "" >> $FILE echo "" >> $FILE
chmod +x $FILE chmod +x $FILE

24
miner/Cargo.toml Normal file
View File

@ -0,0 +1,24 @@
[package]
description = "Ethminer library"
homepage = "http://ethcore.io"
license = "GPL-3.0"
name = "ethminer"
version = "0.9.99"
authors = ["Ethcore <admin@ethcore.io>"]
build = "build.rs"
[build-dependencies]
rustc_version = "0.1"
[dependencies]
ethcore-util = { path = "../util" }
ethcore = { path = "../ethcore" }
log = "0.3"
env_logger = "0.3"
rustc-serialize = "0.3"
rayon = "0.3.1"
clippy = { version = "0.0.50", optional = true }
[features]
default = []
dev = ["clippy"]

25
miner/build.rs Normal file
View File

@ -0,0 +1,25 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
extern crate rustc_version;
use rustc_version::{version_meta, Channel};
fn main() {
if let Channel::Nightly = version_meta().channel {
println!("cargo:rustc-cfg=nightly");
}
}

111
miner/src/lib.rs Normal file
View File

@ -0,0 +1,111 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
#![warn(missing_docs)]
#![cfg_attr(all(nightly, feature="dev"), feature(plugin))]
#![cfg_attr(all(nightly, feature="dev"), plugin(clippy))]
//! Miner module
//! Keeps track of transactions and mined block.
//!
//! Usage example:
//!
//! ```rust
//! extern crate ethcore_util as util;
//! extern crate ethcore;
//! extern crate ethminer;
//! use std::ops::Deref;
//! use std::env;
//! use std::sync::Arc;
//! use util::network::{NetworkService, NetworkConfiguration};
//! use ethcore::client::{Client, ClientConfig, BlockChainClient};
//! use ethcore::ethereum;
//! use ethminer::{Miner, MinerService};
//!
//! fn main() {
//! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap();
//! let dir = env::temp_dir();
//! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, service.io().channel()).unwrap();
//!
//! let miner: Miner = Miner::default();
//! // get status
//! assert_eq!(miner.status().transaction_queue_pending, 0);
//!
//! // Check block for sealing
//! miner.prepare_sealing(client.deref());
//! assert!(miner.sealing_block(client.deref()).lock().unwrap().is_some());
//! }
//! ```
#[macro_use]
extern crate log;
#[macro_use]
extern crate ethcore_util as util;
extern crate ethcore;
extern crate env_logger;
extern crate rayon;
mod miner;
mod transaction_queue;
pub use transaction_queue::TransactionQueue;
pub use miner::{Miner};
use std::sync::Mutex;
use util::{H256, U256, Address, Bytes};
use ethcore::client::{BlockChainClient};
use ethcore::block::{ClosedBlock};
use ethcore::error::{Error};
use ethcore::transaction::SignedTransaction;
/// Miner client API
pub trait MinerService : Send + Sync {
/// Returns miner's status.
fn status(&self) -> MinerStatus;
/// Imports transactions to transaction queue.
fn import_transactions<T>(&self, transactions: Vec<SignedTransaction>, fetch_nonce: T) -> Result<(), Error>
where T: Fn(&Address) -> U256;
/// Returns hashes of transactions currently in pending
fn pending_transactions_hashes(&self) -> Vec<H256>;
/// Removes all transactions from the queue and restart mining operation.
fn clear_and_reset(&self, chain: &BlockChainClient);
/// Called when blocks are imported to chain, updates transactions queue.
fn chain_new_blocks(&self, chain: &BlockChainClient, imported: &[H256], invalid: &[H256], enacted: &[H256], retracted: &[H256]);
/// New chain head event. Restart mining operation.
fn prepare_sealing(&self, chain: &BlockChainClient);
/// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock.
fn sealing_block(&self, chain: &BlockChainClient) -> &Mutex<Option<ClosedBlock>>;
/// Submit `seal` as a valid solution for the header of `pow_hash`.
/// Will check the seal, but not actually insert the block into the chain.
fn submit_seal(&self, chain: &BlockChainClient, pow_hash: H256, seal: Vec<Bytes>) -> Result<(), Error>;
}
/// Mining status
pub struct MinerStatus {
/// Number of transactions in queue with state `pending` (ready to be included in block)
pub transaction_queue_pending: usize,
/// Number of transactions in queue with state `future` (not yet ready to be included in block)
pub transaction_queue_future: usize,
}

191
miner/src/miner.rs Normal file
View File

@ -0,0 +1,191 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use rayon::prelude::*;
use std::sync::{Mutex, RwLock, Arc};
use std::sync::atomic;
use std::sync::atomic::AtomicBool;
use util::{H256, U256, Address, Bytes};
use ethcore::views::{BlockView};
use ethcore::client::{BlockChainClient, BlockId};
use ethcore::block::{ClosedBlock};
use ethcore::error::{Error};
use ethcore::transaction::SignedTransaction;
use super::{MinerService, MinerStatus, TransactionQueue};
/// Keeps track of transactions using priority queue and holds currently mined block.
pub struct Miner {
transaction_queue: Mutex<TransactionQueue>,
// for sealing...
sealing_enabled: AtomicBool,
sealing_block: Mutex<Option<ClosedBlock>>,
author: RwLock<Address>,
extra_data: RwLock<Bytes>,
}
impl Default for Miner {
fn default() -> Miner {
Miner {
transaction_queue: Mutex::new(TransactionQueue::new()),
sealing_enabled: AtomicBool::new(false),
sealing_block: Mutex::new(None),
author: RwLock::new(Address::default()),
extra_data: RwLock::new(Vec::new()),
}
}
}
impl Miner {
/// Creates new instance of miner
pub fn new() -> Arc<Miner> {
Arc::new(Miner::default())
}
/// Get the author that we will seal blocks as.
fn author(&self) -> Address {
*self.author.read().unwrap()
}
/// Get the extra_data that we will seal blocks wuth.
fn extra_data(&self) -> Bytes {
self.extra_data.read().unwrap().clone()
}
/// Set the author that we will seal blocks as.
pub fn set_author(&self, author: Address) {
*self.author.write().unwrap() = author;
}
/// Set the extra_data that we will seal blocks with.
pub fn set_extra_data(&self, extra_data: Bytes) {
*self.extra_data.write().unwrap() = extra_data;
}
/// Set minimal gas price of transaction to be accepted for mining.
pub fn set_minimal_gas_price(&self, min_gas_price: U256) {
self.transaction_queue.lock().unwrap().set_minimal_gas_price(min_gas_price);
}
}
impl MinerService for Miner {
fn clear_and_reset(&self, chain: &BlockChainClient) {
self.transaction_queue.lock().unwrap().clear();
self.prepare_sealing(chain);
}
fn status(&self) -> MinerStatus {
let status = self.transaction_queue.lock().unwrap().status();
MinerStatus {
transaction_queue_pending: status.pending,
transaction_queue_future: status.future,
}
}
fn import_transactions<T>(&self, transactions: Vec<SignedTransaction>, fetch_nonce: T) -> Result<(), Error>
where T: Fn(&Address) -> U256 {
let mut transaction_queue = self.transaction_queue.lock().unwrap();
transaction_queue.add_all(transactions, fetch_nonce)
}
fn pending_transactions_hashes(&self) -> Vec<H256> {
let transaction_queue = self.transaction_queue.lock().unwrap();
transaction_queue.pending_hashes()
}
fn prepare_sealing(&self, chain: &BlockChainClient) {
let no_of_transactions = 128;
let transactions = self.transaction_queue.lock().unwrap().top_transactions(no_of_transactions);
let b = chain.prepare_sealing(
self.author(),
self.extra_data(),
transactions,
);
*self.sealing_block.lock().unwrap() = b;
}
fn sealing_block(&self, chain: &BlockChainClient) -> &Mutex<Option<ClosedBlock>> {
if self.sealing_block.lock().unwrap().is_none() {
self.sealing_enabled.store(true, atomic::Ordering::Relaxed);
// TODO: Above should be on a timer that resets after two blocks have arrived without being asked for.
self.prepare_sealing(chain);
}
&self.sealing_block
}
fn submit_seal(&self, chain: &BlockChainClient, pow_hash: H256, seal: Vec<Bytes>) -> Result<(), Error> {
let mut maybe_b = self.sealing_block.lock().unwrap();
match *maybe_b {
Some(ref b) if b.hash() == pow_hash => {}
_ => { return Err(Error::PowHashInvalid); }
}
let b = maybe_b.take();
match chain.try_seal(b.unwrap(), seal) {
Err(old) => {
*maybe_b = Some(old);
Err(Error::PowInvalid)
}
Ok(sealed) => {
// TODO: commit DB from `sealed.drain` and make a VerifiedBlock to skip running the transactions twice.
try!(chain.import_block(sealed.rlp_bytes()));
Ok(())
}
}
}
fn chain_new_blocks(&self, chain: &BlockChainClient, imported: &[H256], invalid: &[H256], enacted: &[H256], retracted: &[H256]) {
fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec<SignedTransaction> {
let block = chain
.block(BlockId::Hash(*hash))
// Client should send message after commit to db and inserting to chain.
.expect("Expected in-chain blocks.");
let block = BlockView::new(&block);
block.transactions()
}
{
let in_chain = vec![imported, enacted, invalid];
let in_chain = in_chain
.par_iter()
.flat_map(|h| h.par_iter().map(|h| fetch_transactions(chain, h)));
let out_of_chain = retracted
.par_iter()
.map(|h| fetch_transactions(chain, h));
in_chain.for_each(|txs| {
let mut transaction_queue = self.transaction_queue.lock().unwrap();
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
transaction_queue.remove_all(&hashes, |a| chain.nonce(a));
});
out_of_chain.for_each(|txs| {
// populate sender
for tx in &txs {
let _sender = tx.sender();
}
let mut transaction_queue = self.transaction_queue.lock().unwrap();
let _ = transaction_queue.add_all(txs, |a| chain.nonce(a));
});
}
if self.sealing_enabled.load(atomic::Ordering::Relaxed) {
self.prepare_sealing(chain);
}
}
}

View File

@ -28,13 +28,13 @@
//! ```rust //! ```rust
//! extern crate ethcore_util as util; //! extern crate ethcore_util as util;
//! extern crate ethcore; //! extern crate ethcore;
//! extern crate ethsync; //! extern crate ethminer;
//! extern crate rustc_serialize; //! extern crate rustc_serialize;
//! //!
//! use util::crypto::KeyPair; //! use util::crypto::KeyPair;
//! use util::hash::Address; //! use util::hash::Address;
//! use util::numbers::{Uint, U256}; //! use util::numbers::{Uint, U256};
//! use ethsync::TransactionQueue; //! use ethminer::TransactionQueue;
//! use ethcore::transaction::*; //! use ethcore::transaction::*;
//! use rustc_serialize::hex::FromHex; //! use rustc_serialize::hex::FromHex;
//! //!
@ -86,7 +86,7 @@ use util::numbers::{Uint, U256};
use util::hash::{Address, H256}; use util::hash::{Address, H256};
use util::table::*; use util::table::*;
use ethcore::transaction::*; use ethcore::transaction::*;
use ethcore::error::Error; use ethcore::error::{Error, TransactionError};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -245,6 +245,8 @@ pub struct TransactionQueueStatus {
/// TransactionQueue implementation /// TransactionQueue implementation
pub struct TransactionQueue { pub struct TransactionQueue {
/// Gas Price threshold for transactions that can be imported to this queue (defaults to 0)
minimal_gas_price: U256,
/// Priority queue for transactions that can go to block /// Priority queue for transactions that can go to block
current: TransactionSet, current: TransactionSet,
/// Priority queue for transactions that has been received but are not yet valid to go to block /// Priority queue for transactions that has been received but are not yet valid to go to block
@ -281,6 +283,7 @@ impl TransactionQueue {
}; };
TransactionQueue { TransactionQueue {
minimal_gas_price: U256::zero(),
current: current, current: current,
future: future, future: future,
by_hash: HashMap::new(), by_hash: HashMap::new(),
@ -288,6 +291,12 @@ impl TransactionQueue {
} }
} }
/// Sets new gas price threshold for incoming transactions.
/// Any transactions already imported to the queue are not affected.
pub fn set_minimal_gas_price(&mut self, min_gas_price: U256) {
self.minimal_gas_price = min_gas_price;
}
// Will be used when rpc merged // Will be used when rpc merged
#[allow(dead_code)] #[allow(dead_code)]
/// Returns current status for this queue /// Returns current status for this queue
@ -310,6 +319,19 @@ impl TransactionQueue {
/// Add signed transaction to queue to be verified and imported /// Add signed transaction to queue to be verified and imported
pub fn add<T>(&mut self, tx: SignedTransaction, fetch_nonce: &T) -> Result<(), Error> pub fn add<T>(&mut self, tx: SignedTransaction, fetch_nonce: &T) -> Result<(), Error>
where T: Fn(&Address) -> U256 { where T: Fn(&Address) -> U256 {
if tx.gas_price < self.minimal_gas_price {
trace!(target: "sync",
"Dropping transaction below minimal gas price threshold: {:?} (gp: {} < {})",
tx.hash(), tx.gas_price, self.minimal_gas_price
);
return Err(Error::Transaction(TransactionError::InsufficientGasPrice{
minimal: self.minimal_gas_price,
got: tx.gas_price
}));
}
self.import_tx(try!(VerifiedTransaction::new(tx)), fetch_nonce); self.import_tx(try!(VerifiedTransaction::new(tx)), fetch_nonce);
Ok(()) Ok(())
} }
@ -346,7 +368,7 @@ impl TransactionQueue {
self.update_future(&sender, current_nonce); self.update_future(&sender, current_nonce);
// And now lets check if there is some chain of transactions in future // And now lets check if there is some chain of transactions in future
// that should be placed in current // that should be placed in current
self.move_matching_future_to_current(sender.clone(), current_nonce, current_nonce); self.move_matching_future_to_current(sender, current_nonce, current_nonce);
return; return;
} }
@ -362,7 +384,7 @@ impl TransactionQueue {
self.move_all_to_future(&sender, current_nonce); self.move_all_to_future(&sender, current_nonce);
// And now lets check if there is some chain of transactions in future // And now lets check if there is some chain of transactions in future
// that should be placed in current. It should also update last_nonces. // that should be placed in current. It should also update last_nonces.
self.move_matching_future_to_current(sender.clone(), current_nonce, current_nonce); self.move_matching_future_to_current(sender, current_nonce, current_nonce);
return; return;
} }
} }
@ -377,7 +399,7 @@ impl TransactionQueue {
for k in all_nonces_from_sender { for k in all_nonces_from_sender {
let order = self.future.drop(&sender, &k).unwrap(); let order = self.future.drop(&sender, &k).unwrap();
if k >= current_nonce { if k >= current_nonce {
self.future.insert(sender.clone(), k, order.update_height(k, current_nonce)); self.future.insert(*sender, k, order.update_height(k, current_nonce));
} else { } else {
// Remove the transaction completely // Remove the transaction completely
self.by_hash.remove(&order.hash); self.by_hash.remove(&order.hash);
@ -397,7 +419,7 @@ impl TransactionQueue {
// Goes to future or is removed // Goes to future or is removed
let order = self.current.drop(&sender, &k).unwrap(); let order = self.current.drop(&sender, &k).unwrap();
if k >= current_nonce { if k >= current_nonce {
self.future.insert(sender.clone(), k, order.update_height(k, current_nonce)); self.future.insert(*sender, k, order.update_height(k, current_nonce));
} else { } else {
self.by_hash.remove(&order.hash); self.by_hash.remove(&order.hash);
} }
@ -417,6 +439,14 @@ impl TransactionQueue {
.collect() .collect()
} }
/// Returns hashes of all transactions from current, ordered by priority.
pub fn pending_hashes(&self) -> Vec<H256> {
self.current.by_priority
.iter()
.map(|t| t.hash)
.collect()
}
/// Removes all elements (in any state) from the queue /// Removes all elements (in any state) from the queue
pub fn clear(&mut self) { pub fn clear(&mut self) {
self.current.clear(); self.current.clear();
@ -438,8 +468,8 @@ impl TransactionQueue {
// remove also from priority and hash // remove also from priority and hash
self.future.by_priority.remove(&order); self.future.by_priority.remove(&order);
// Put to current // Put to current
let order = order.update_height(current_nonce.clone(), first_nonce); let order = order.update_height(current_nonce, first_nonce);
self.current.insert(address.clone(), current_nonce, order); self.current.insert(address, current_nonce, order);
current_nonce = current_nonce + U256::one(); current_nonce = current_nonce + U256::one();
} }
} }
@ -487,10 +517,10 @@ impl TransactionQueue {
} }
let base_nonce = fetch_nonce(&address); let base_nonce = fetch_nonce(&address);
Self::replace_transaction(tx, base_nonce.clone(), &mut self.current, &mut self.by_hash); Self::replace_transaction(tx, base_nonce, &mut self.current, &mut self.by_hash);
self.last_nonces.insert(address.clone(), nonce); self.last_nonces.insert(address, nonce);
// But maybe there are some more items waiting in future? // But maybe there are some more items waiting in future?
self.move_matching_future_to_current(address.clone(), nonce + U256::one(), base_nonce); self.move_matching_future_to_current(address, nonce + U256::one(), base_nonce);
self.current.enforce_limit(&mut self.by_hash); self.current.enforce_limit(&mut self.by_hash);
} }
@ -504,7 +534,7 @@ impl TransactionQueue {
let address = tx.sender(); let address = tx.sender();
let nonce = tx.nonce(); let nonce = tx.nonce();
by_hash.insert(hash.clone(), tx); by_hash.insert(hash, tx);
if let Some(old) = set.insert(address, nonce, order.clone()) { if let Some(old) = set.insert(address, nonce, order.clone()) {
// There was already transaction in queue. Let's check which one should stay // There was already transaction in queue. Let's check which one should stay
let old_fee = old.gas_price; let old_fee = old.gas_price;
@ -620,6 +650,22 @@ mod test {
assert_eq!(stats.pending, 1); assert_eq!(stats.pending, 1);
} }
#[test]
fn should_not_import_transaction_below_min_gas_price_threshold() {
// given
let mut txq = TransactionQueue::new();
let tx = new_tx();
txq.set_minimal_gas_price(tx.gas_price + U256::one());
// when
txq.add(tx, &default_nonce).unwrap_err();
// then
let stats = txq.status();
assert_eq!(stats.pending, 0);
assert_eq!(stats.future, 0);
}
#[test] #[test]
fn should_reject_incorectly_signed_transaction() { fn should_reject_incorectly_signed_transaction() {
// given // given
@ -663,6 +709,24 @@ mod test {
assert_eq!(top.len(), 2); assert_eq!(top.len(), 2);
} }
#[test]
fn should_return_pending_hashes() {
// given
let mut txq = TransactionQueue::new();
let (tx, tx2) = new_txs(U256::from(1));
// when
txq.add(tx.clone(), &default_nonce).unwrap();
txq.add(tx2.clone(), &default_nonce).unwrap();
// then
let top = txq.pending_hashes();
assert_eq!(top[0], tx.hash());
assert_eq!(top[1], tx2.hash());
assert_eq!(top.len(), 2);
}
#[test] #[test]
fn should_put_transaction_to_futures_if_gap_detected() { fn should_put_transaction_to_futures_if_gap_detected() {
// given // given
@ -831,7 +895,7 @@ mod test {
fn should_drop_transactions_with_old_nonces() { fn should_drop_transactions_with_old_nonces() {
let mut txq = TransactionQueue::new(); let mut txq = TransactionQueue::new();
let tx = new_tx(); let tx = new_tx();
let last_nonce = tx.nonce.clone() + U256::one(); let last_nonce = tx.nonce + U256::one();
let fetch_last_nonce = |_a: &Address| last_nonce; let fetch_last_nonce = |_a: &Address| last_nonce;
// when // when

View File

@ -24,6 +24,7 @@ extern crate rustc_serialize;
extern crate ethcore_util as util; extern crate ethcore_util as util;
extern crate ethcore; extern crate ethcore;
extern crate ethsync; extern crate ethsync;
extern crate ethminer;
#[macro_use] #[macro_use]
extern crate log as rlog; extern crate log as rlog;
extern crate env_logger; extern crate env_logger;
@ -50,6 +51,7 @@ use ethcore::client::*;
use ethcore::service::{ClientService, NetSyncMessage}; use ethcore::service::{ClientService, NetSyncMessage};
use ethcore::ethereum; use ethcore::ethereum;
use ethsync::{EthSync, SyncConfig, SyncProvider}; use ethsync::{EthSync, SyncConfig, SyncProvider};
use ethminer::{Miner, MinerService};
use docopt::Docopt; use docopt::Docopt;
use daemonize::Daemonize; use daemonize::Daemonize;
use number_prefix::{binary_prefix, Standalone, Prefixed}; use number_prefix::{binary_prefix, Standalone, Prefixed};
@ -112,6 +114,7 @@ API and Console Options:
--rpccorsdomain URL Equivalent to --jsonrpc-cors URL (geth-compatible). --rpccorsdomain URL Equivalent to --jsonrpc-cors URL (geth-compatible).
Sealing/Mining Options: Sealing/Mining Options:
--gas-price WEI Minimum amount of Wei to be paid for a transaction to be accepted for mining [default: 20000000000].
--author ADDRESS Specify the block author (aka "coinbase") address for sending block rewards --author ADDRESS Specify the block author (aka "coinbase") address for sending block rewards
from sealed blocks [default: 0037a6b811ffeb6e072da21179d11b1406371c63]. from sealed blocks [default: 0037a6b811ffeb6e072da21179d11b1406371c63].
--extra-data STRING Specify a custom extra-data for authored blocks, no more than 32 characters. --extra-data STRING Specify a custom extra-data for authored blocks, no more than 32 characters.
@ -135,11 +138,12 @@ Geth-Compatibility Options
--maxpeers COUNT Equivalent to --peers COUNT. --maxpeers COUNT Equivalent to --peers COUNT.
--nodekey KEY Equivalent to --node-key KEY. --nodekey KEY Equivalent to --node-key KEY.
--nodiscover Equivalent to --no-discovery. --nodiscover Equivalent to --no-discovery.
--gasprice WEI Equivalent to --gas-price WEI.
--etherbase ADDRESS Equivalent to --author ADDRESS. --etherbase ADDRESS Equivalent to --author ADDRESS.
--extradata STRING Equivalent to --extra-data STRING. --extradata STRING Equivalent to --extra-data STRING.
Miscellaneous Options: Miscellaneous Options:
-l --logging LOGGING Specify the logging level. -l --logging LOGGING Specify the logging level. Must conform to the same format as RUST_LOG.
-v --version Show information about version. -v --version Show information about version.
-h --help Show this screen. -h --help Show this screen.
"#; "#;
@ -172,17 +176,19 @@ struct Args {
flag_jsonrpc_port: u16, flag_jsonrpc_port: u16,
flag_jsonrpc_cors: String, flag_jsonrpc_cors: String,
flag_jsonrpc_apis: String, flag_jsonrpc_apis: String,
flag_author: String,
flag_gas_price: String,
flag_extra_data: Option<String>,
flag_logging: Option<String>, flag_logging: Option<String>,
flag_version: bool, flag_version: bool,
// geth-compatibility... // geth-compatibility...
flag_nodekey: Option<String>, flag_nodekey: Option<String>,
flag_nodiscover: bool, flag_nodiscover: bool,
flag_maxpeers: Option<usize>, flag_maxpeers: Option<usize>,
flag_author: String,
flag_extra_data: Option<String>,
flag_datadir: Option<String>, flag_datadir: Option<String>,
flag_extradata: Option<String>, flag_extradata: Option<String>,
flag_etherbase: Option<String>, flag_etherbase: Option<String>,
flag_gasprice: Option<String>,
flag_rpc: bool, flag_rpc: bool,
flag_rpcaddr: Option<String>, flag_rpcaddr: Option<String>,
flag_rpcport: Option<u16>, flag_rpcport: Option<u16>,
@ -219,7 +225,15 @@ fn setup_log(init: &Option<String>) {
} }
#[cfg(feature = "rpc")] #[cfg(feature = "rpc")]
fn setup_rpc_server(client: Arc<Client>, sync: Arc<EthSync>, secret_store: Arc<AccountService>, url: &str, cors_domain: &str, apis: Vec<&str>) -> Option<Arc<PanicHandler>> { fn setup_rpc_server(
client: Arc<Client>,
sync: Arc<EthSync>,
secret_store: Arc<AccountService>,
miner: Arc<Miner>,
url: &str,
cors_domain: &str,
apis: Vec<&str>
) -> Option<Arc<PanicHandler>> {
use rpc::v1::*; use rpc::v1::*;
let server = rpc::RpcServer::new(); let server = rpc::RpcServer::new();
@ -228,8 +242,8 @@ fn setup_rpc_server(client: Arc<Client>, sync: Arc<EthSync>, secret_store: Arc<A
"web3" => server.add_delegate(Web3Client::new().to_delegate()), "web3" => server.add_delegate(Web3Client::new().to_delegate()),
"net" => server.add_delegate(NetClient::new(&sync).to_delegate()), "net" => server.add_delegate(NetClient::new(&sync).to_delegate()),
"eth" => { "eth" => {
server.add_delegate(EthClient::new(&client, &sync, &secret_store).to_delegate()); server.add_delegate(EthClient::new(&client, &sync, &secret_store, &miner).to_delegate());
server.add_delegate(EthFilterClient::new(&client).to_delegate()); server.add_delegate(EthFilterClient::new(&client, &miner).to_delegate());
} }
"personal" => server.add_delegate(PersonalClient::new(&secret_store).to_delegate()), "personal" => server.add_delegate(PersonalClient::new(&secret_store).to_delegate()),
_ => { _ => {
@ -241,7 +255,15 @@ fn setup_rpc_server(client: Arc<Client>, sync: Arc<EthSync>, secret_store: Arc<A
} }
#[cfg(not(feature = "rpc"))] #[cfg(not(feature = "rpc"))]
fn setup_rpc_server(_client: Arc<Client>, _sync: Arc<EthSync>, _url: &str) -> Option<Arc<PanicHandler>> { fn setup_rpc_server(
_client: Arc<Client>,
_sync: Arc<EthSync>,
_secret_store: Arc<AccountService>,
_miner: Arc<Miner>,
_url: &str,
_cors_domain: &str,
_apis: Vec<&str>
) -> Option<Arc<PanicHandler>> {
None None
} }
@ -276,7 +298,16 @@ impl Configuration {
fn author(&self) -> Address { fn author(&self) -> Address {
let d = self.args.flag_etherbase.as_ref().unwrap_or(&self.args.flag_author); let d = self.args.flag_etherbase.as_ref().unwrap_or(&self.args.flag_author);
Address::from_str(d).unwrap_or_else(|_| die!("{}: Invalid address for --author. Must be 40 hex characters, without the 0x at the beginning.", self.args.flag_author)) Address::from_str(d).unwrap_or_else(|_| {
die!("{}: Invalid address for --author. Must be 40 hex characters, without the 0x at the beginning.", d)
})
}
fn gas_price(&self) -> U256 {
let d = self.args.flag_gasprice.as_ref().unwrap_or(&self.args.flag_gas_price);
U256::from_dec_str(d).unwrap_or_else(|_| {
die!("{}: Invalid gas price given. Must be a decimal unsigned 256-bit number.", d)
})
} }
fn extra_data(&self) -> Bytes { fn extra_data(&self) -> Bytes {
@ -299,7 +330,9 @@ impl Configuration {
"frontier" | "homestead" | "mainnet" => ethereum::new_frontier(), "frontier" | "homestead" | "mainnet" => ethereum::new_frontier(),
"morden" | "testnet" => ethereum::new_morden(), "morden" | "testnet" => ethereum::new_morden(),
"olympic" => ethereum::new_olympic(), "olympic" => ethereum::new_olympic(),
f => Spec::from_json_utf8(contents(f).unwrap_or_else(|_| die!("{}: Couldn't read chain specification file. Sure it exists?", f)).as_ref()), f => Spec::from_json_utf8(contents(f).unwrap_or_else(|_| {
die!("{}: Couldn't read chain specification file. Sure it exists?", f)
}).as_ref()),
} }
} }
@ -314,7 +347,11 @@ impl Configuration {
fn init_nodes(&self, spec: &Spec) -> Vec<String> { fn init_nodes(&self, spec: &Spec) -> Vec<String> {
let mut r = if self.args.flag_no_bootstrap { Vec::new() } else { spec.nodes().clone() }; let mut r = if self.args.flag_no_bootstrap { Vec::new() } else { spec.nodes().clone() };
if let Some(ref x) = self.args.flag_bootnodes { if let Some(ref x) = self.args.flag_bootnodes {
r.extend(x.split(',').map(|s| Self::normalize_enode(s).unwrap_or_else(|| die!("{}: Invalid node address format given for a boot node.", s)))); r.extend(x.split(',').map(|s| {
Self::normalize_enode(s).unwrap_or_else(|| {
die!("{}: Invalid node address format given for a boot node.", s)
})
}));
} }
r r
} }
@ -348,6 +385,38 @@ impl Configuration {
ret ret
} }
fn client_config(&self) -> ClientConfig {
let mut client_config = ClientConfig::default();
match self.args.flag_cache {
Some(mb) => {
client_config.blockchain.max_cache_size = mb * 1024 * 1024;
client_config.blockchain.pref_cache_size = client_config.blockchain.max_cache_size / 2;
}
None => {
client_config.blockchain.pref_cache_size = self.args.flag_cache_pref_size;
client_config.blockchain.max_cache_size = self.args.flag_cache_max_size;
}
}
client_config.pruning = match self.args.flag_pruning.as_str() {
"archive" => journaldb::Algorithm::Archive,
"light" => journaldb::Algorithm::EarlyMerge,
"fast" => journaldb::Algorithm::OverlayRecent,
"basic" => journaldb::Algorithm::RefCounted,
_ => { die!("Invalid pruning method given."); }
};
client_config.name = self.args.flag_identity.clone();
client_config.queue.max_mem_use = self.args.flag_queue_max_size;
client_config
}
fn sync_config(&self, spec: &Spec) -> SyncConfig {
let mut sync_config = SyncConfig::default();
sync_config.network_id = self.args.flag_networkid.as_ref().map_or(spec.network_id(), |id| {
U256::from_str(id).unwrap_or_else(|_| die!("{}: Invalid index given with --networkid", id))
});
sync_config
}
fn execute(&self) { fn execute(&self) {
if self.args.flag_version { if self.args.flag_version {
print_version(); print_version();
@ -406,42 +475,21 @@ impl Configuration {
let spec = self.spec(); let spec = self.spec();
let net_settings = self.net_settings(&spec); let net_settings = self.net_settings(&spec);
let mut sync_config = SyncConfig::default(); let sync_config = self.sync_config(&spec);
sync_config.network_id = self.args.flag_networkid.as_ref().map_or(spec.network_id(), |id| {
U256::from_str(id).unwrap_or_else(|_| {
die!("{}: Invalid index given with --networkid", id)
})
});
// Build client // Build client
let mut client_config = ClientConfig::default(); let mut service = ClientService::start(self.client_config(), spec, net_settings, &Path::new(&self.path())).unwrap();
match self.args.flag_cache {
Some(mb) => {
client_config.blockchain.max_cache_size = mb * 1024 * 1024;
client_config.blockchain.pref_cache_size = client_config.blockchain.max_cache_size / 2;
}
None => {
client_config.blockchain.pref_cache_size = self.args.flag_cache_pref_size;
client_config.blockchain.max_cache_size = self.args.flag_cache_max_size;
}
}
client_config.pruning = match self.args.flag_pruning.as_str() {
"" | "archive" => journaldb::Algorithm::Archive,
"pruned" => journaldb::Algorithm::EarlyMerge,
"fast" => journaldb::Algorithm::OverlayRecent,
"slow" => journaldb::Algorithm::RefCounted,
_ => { die!("Invalid pruning method given."); }
};
client_config.name = self.args.flag_identity.clone();
client_config.queue.max_mem_use = self.args.flag_queue_max_size;
let mut service = ClientService::start(client_config, spec, net_settings, &Path::new(&self.path())).unwrap();
panic_handler.forward_from(&service); panic_handler.forward_from(&service);
let client = service.client().clone(); let client = service.client();
client.set_author(self.author());
client.set_extra_data(self.extra_data()); // Miner
let miner = Miner::new();
miner.set_author(self.author());
miner.set_extra_data(self.extra_data());
miner.set_minimal_gas_price(self.gas_price());
// Sync // Sync
let sync = EthSync::register(service.network(), sync_config, client); let sync = EthSync::register(service.network(), sync_config, client.clone(), miner.clone());
// Secret Store // Secret Store
let account_service = Arc::new(AccountService::new()); let account_service = Arc::new(AccountService::new());
@ -456,11 +504,18 @@ impl Configuration {
let cors = self.args.flag_rpccorsdomain.as_ref().unwrap_or(&self.args.flag_jsonrpc_cors); let cors = self.args.flag_rpccorsdomain.as_ref().unwrap_or(&self.args.flag_jsonrpc_cors);
// TODO: use this as the API list. // TODO: use this as the API list.
let apis = self.args.flag_rpcapi.as_ref().unwrap_or(&self.args.flag_jsonrpc_apis); let apis = self.args.flag_rpcapi.as_ref().unwrap_or(&self.args.flag_jsonrpc_apis);
let server_handler = setup_rpc_server(service.client(), sync.clone(), account_service.clone(), &url, cors, apis.split(',').collect()); let server_handler = setup_rpc_server(
service.client(),
sync.clone(),
account_service.clone(),
miner.clone(),
&url,
cors,
apis.split(',').collect()
);
if let Some(handler) = server_handler { if let Some(handler) = server_handler {
panic_handler.forward_from(handler.deref()); panic_handler.forward_from(handler.deref());
} }
} }
// Register IO handler // Register IO handler
@ -530,7 +585,11 @@ impl Informant {
let report = client.report(); let report = client.report();
let sync_info = sync.status(); let sync_info = sync.status();
if let (_, _, &Some(ref last_report)) = (self.chain_info.read().unwrap().deref(), self.cache_info.read().unwrap().deref(), self.report.read().unwrap().deref()) { if let (_, _, &Some(ref last_report)) = (
self.chain_info.read().unwrap().deref(),
self.cache_info.read().unwrap().deref(),
self.report.read().unwrap().deref()
) {
println!("[ #{} {} ]---[ {} blk/s | {} tx/s | {} gas/s //··· {}/{} peers, #{}, {}+{} queued ···// mem: {} db, {} chain, {} queue, {} sync ]", println!("[ #{} {} ]---[ {} blk/s | {} tx/s | {} gas/s //··· {}/{} peers, #{}, {}+{} queued ···// mem: {} db, {} chain, {} queue, {} sync ]",
chain_info.best_block_number, chain_info.best_block_number,
chain_info.best_block_hash, chain_info.best_block_hash,

View File

@ -18,10 +18,11 @@ ethcore-util = { path = "../util" }
ethcore = { path = "../ethcore" } ethcore = { path = "../ethcore" }
ethash = { path = "../ethash" } ethash = { path = "../ethash" }
ethsync = { path = "../sync" } ethsync = { path = "../sync" }
clippy = { version = "0.0.50", optional = true } ethminer = { path = "../miner" }
rustc-serialize = "0.3" rustc-serialize = "0.3"
transient-hashmap = "0.1" transient-hashmap = "0.1"
serde_macros = { version = "0.7.0", optional = true } serde_macros = { version = "0.7.0", optional = true }
clippy = { version = "0.0.50", optional = true }
[build-dependencies] [build-dependencies]
serde_codegen = { version = "0.7.0", optional = true } serde_codegen = { version = "0.7.0", optional = true }
@ -30,4 +31,4 @@ syntex = "0.29.0"
[features] [features]
default = ["serde_codegen"] default = ["serde_codegen"]
nightly = ["serde_macros"] nightly = ["serde_macros"]
dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev"] dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethminer/dev"]

View File

@ -19,6 +19,8 @@
#![cfg_attr(feature="nightly", feature(custom_derive, custom_attribute, plugin))] #![cfg_attr(feature="nightly", feature(custom_derive, custom_attribute, plugin))]
#![cfg_attr(feature="nightly", plugin(serde_macros, clippy))] #![cfg_attr(feature="nightly", plugin(serde_macros, clippy))]
#[macro_use]
extern crate log;
extern crate rustc_serialize; extern crate rustc_serialize;
extern crate serde; extern crate serde;
extern crate serde_json; extern crate serde_json;
@ -27,6 +29,7 @@ extern crate jsonrpc_http_server;
extern crate ethcore_util as util; extern crate ethcore_util as util;
extern crate ethcore; extern crate ethcore;
extern crate ethsync; extern crate ethsync;
extern crate ethminer;
extern crate transient_hashmap; extern crate transient_hashmap;
use std::sync::Arc; use std::sync::Arc;

View File

@ -1,10 +1,13 @@
//! Helper type with all filter possibilities. //! Helper type with all filter possibilities.
use util::hash::H256;
use ethcore::filter::Filter; use ethcore::filter::Filter;
pub type BlockNumber = u64;
#[derive(Clone)] #[derive(Clone)]
pub enum PollFilter { pub enum PollFilter {
Block, Block(BlockNumber),
PendingTransaction, PendingTransaction(Vec<H256>),
Logs(Filter) Logs(BlockNumber, Filter)
} }

View File

@ -22,28 +22,13 @@ use transient_hashmap::{TransientHashMap, Timer, StandardTimer};
const POLL_LIFETIME: u64 = 60; const POLL_LIFETIME: u64 = 60;
pub type PollId = usize; pub type PollId = usize;
pub type BlockNumber = u64;
pub struct PollInfo<F> {
pub filter: F,
pub block_number: BlockNumber
}
impl<F> Clone for PollInfo<F> where F: Clone {
fn clone(&self) -> Self {
PollInfo {
filter: self.filter.clone(),
block_number: self.block_number.clone()
}
}
}
/// Indexes all poll requests. /// Indexes all poll requests.
/// ///
/// Lazily garbage collects unused polls info. /// Lazily garbage collects unused polls info.
pub struct PollManager<F, T = StandardTimer> where T: Timer { pub struct PollManager<F, T = StandardTimer> where T: Timer {
polls: TransientHashMap<PollId, PollInfo<F>, T>, polls: TransientHashMap<PollId, F, T>,
next_available_id: PollId next_available_id: PollId,
} }
impl<F> PollManager<F, StandardTimer> { impl<F> PollManager<F, StandardTimer> {
@ -54,6 +39,7 @@ impl<F> PollManager<F, StandardTimer> {
} }
impl<F, T> PollManager<F, T> where T: Timer { impl<F, T> PollManager<F, T> where T: Timer {
pub fn new_with_timer(timer: T) -> Self { pub fn new_with_timer(timer: T) -> Self {
PollManager { PollManager {
polls: TransientHashMap::new_with_timer(POLL_LIFETIME, timer), polls: TransientHashMap::new_with_timer(POLL_LIFETIME, timer),
@ -64,31 +50,30 @@ impl<F, T> PollManager<F, T> where T: Timer {
/// Returns id which can be used for new poll. /// Returns id which can be used for new poll.
/// ///
/// Stores information when last poll happend. /// Stores information when last poll happend.
pub fn create_poll(&mut self, filter: F, block: BlockNumber) -> PollId { pub fn create_poll(&mut self, filter: F) -> PollId {
self.polls.prune(); self.polls.prune();
let id = self.next_available_id; let id = self.next_available_id;
self.polls.insert(id, filter);
self.next_available_id += 1; self.next_available_id += 1;
self.polls.insert(id, PollInfo {
filter: filter,
block_number: block,
});
id id
} }
/// Updates information when last poll happend. // Implementation is always using `poll_mut`
pub fn update_poll(&mut self, id: &PollId, block: BlockNumber) { #[cfg(test)]
self.polls.prune(); /// Get a reference to stored poll filter
if let Some(info) = self.polls.get_mut(id) { pub fn poll(&mut self, id: &PollId) -> Option<&F> {
info.block_number = block;
}
}
/// Returns number of block when last poll happend.
pub fn poll_info(&mut self, id: &PollId) -> Option<&PollInfo<F>> {
self.polls.prune(); self.polls.prune();
self.polls.get(id) self.polls.get(id)
} }
/// Get a mutable reference to stored poll filter
pub fn poll_mut(&mut self, id: &PollId) -> Option<&mut F> {
self.polls.prune();
self.polls.get_mut(id)
}
/// Removes poll info. /// Removes poll info.
pub fn remove_poll(&mut self, id: &PollId) { pub fn remove_poll(&mut self, id: &PollId) {
self.polls.remove(id); self.polls.remove(id);
@ -97,48 +82,46 @@ impl<F, T> PollManager<F, T> where T: Timer {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::cell::RefCell; use std::cell::Cell;
use transient_hashmap::Timer; use transient_hashmap::Timer;
use v1::helpers::PollManager; use v1::helpers::PollManager;
struct TestTimer<'a> { struct TestTimer<'a> {
time: &'a RefCell<i64>, time: &'a Cell<i64>,
} }
impl<'a> Timer for TestTimer<'a> { impl<'a> Timer for TestTimer<'a> {
fn get_time(&self) -> i64 { fn get_time(&self) -> i64 {
*self.time.borrow() self.time.get()
} }
} }
#[test] #[test]
fn test_poll_indexer() { fn test_poll_indexer() {
let time = RefCell::new(0); let time = Cell::new(0);
let timer = TestTimer { let timer = TestTimer {
time: &time, time: &time,
}; };
let mut indexer = PollManager::new_with_timer(timer); let mut indexer = PollManager::new_with_timer(timer);
assert_eq!(indexer.create_poll(false, 20), 0); assert_eq!(indexer.create_poll(20), 0);
assert_eq!(indexer.create_poll(true, 20), 1); assert_eq!(indexer.create_poll(20), 1);
*time.borrow_mut() = 10; time.set(10);
indexer.update_poll(&0, 21); *indexer.poll_mut(&0).unwrap() = 21;
assert_eq!(indexer.poll_info(&0).unwrap().filter, false); assert_eq!(*indexer.poll(&0).unwrap(), 21);
assert_eq!(indexer.poll_info(&0).unwrap().block_number, 21); assert_eq!(*indexer.poll(&1).unwrap(), 20);
*time.borrow_mut() = 30; time.set(30);
indexer.update_poll(&1, 23); *indexer.poll_mut(&1).unwrap() = 23;
assert_eq!(indexer.poll_info(&1).unwrap().filter, true); assert_eq!(*indexer.poll(&1).unwrap(), 23);
assert_eq!(indexer.poll_info(&1).unwrap().block_number, 23);
*time.borrow_mut() = 75; time.set(75);
indexer.update_poll(&0, 30); assert!(indexer.poll(&0).is_none());
assert!(indexer.poll_info(&0).is_none()); assert_eq!(*indexer.poll(&1).unwrap(), 23);
assert_eq!(indexer.poll_info(&1).unwrap().filter, true);
assert_eq!(indexer.poll_info(&1).unwrap().block_number, 23);
indexer.remove_poll(&1); indexer.remove_poll(&1);
assert!(indexer.poll_info(&1).is_none()); assert!(indexer.poll(&1).is_none());
} }
} }

View File

@ -15,9 +15,11 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Eth rpc implementation. //! Eth rpc implementation.
use std::collections::HashMap; use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Weak, Mutex, RwLock}; use std::sync::{Arc, Weak, Mutex, RwLock};
use std::ops::Deref;
use ethsync::{SyncProvider, SyncState}; use ethsync::{SyncProvider, SyncState};
use ethminer::{MinerService};
use jsonrpc_core::*; use jsonrpc_core::*;
use util::numbers::*; use util::numbers::*;
use util::sha3::*; use util::sha3::*;
@ -34,19 +36,29 @@ use v1::helpers::{PollFilter, PollManager};
use util::keys::store::AccountProvider; use util::keys::store::AccountProvider;
/// Eth rpc implementation. /// Eth rpc implementation.
pub struct EthClient<C, S, A> where C: BlockChainClient, S: SyncProvider, A: AccountProvider { pub struct EthClient<C, S, A, M>
where C: BlockChainClient,
S: SyncProvider,
A: AccountProvider,
M: MinerService {
client: Weak<C>, client: Weak<C>,
sync: Weak<S>, sync: Weak<S>,
accounts: Weak<A>, accounts: Weak<A>,
miner: Weak<M>,
hashrates: RwLock<HashMap<H256, u64>>, hashrates: RwLock<HashMap<H256, u64>>,
} }
impl<C, S, A> EthClient<C, S, A> where C: BlockChainClient, S: SyncProvider, A: AccountProvider { impl<C, S, A, M> EthClient<C, S, A, M>
where C: BlockChainClient,
S: SyncProvider,
A: AccountProvider,
M: MinerService {
/// Creates new EthClient. /// Creates new EthClient.
pub fn new(client: &Arc<C>, sync: &Arc<S>, accounts: &Arc<A>) -> Self { pub fn new(client: &Arc<C>, sync: &Arc<S>, accounts: &Arc<A>, miner: &Arc<M>) -> Self {
EthClient { EthClient {
client: Arc::downgrade(client), client: Arc::downgrade(client),
sync: Arc::downgrade(sync), sync: Arc::downgrade(sync),
miner: Arc::downgrade(miner),
accounts: Arc::downgrade(accounts), accounts: Arc::downgrade(accounts),
hashrates: RwLock::new(HashMap::new()), hashrates: RwLock::new(HashMap::new()),
} }
@ -98,7 +110,12 @@ impl<C, S, A> EthClient<C, S, A> where C: BlockChainClient, S: SyncProvider, A:
} }
} }
impl<C, S, A> Eth for EthClient<C, S, A> where C: BlockChainClient + 'static, S: SyncProvider + 'static, A: AccountProvider + 'static { impl<C, S, A, M> Eth for EthClient<C, S, A, M>
where C: BlockChainClient + 'static,
S: SyncProvider + 'static,
A: AccountProvider + 'static,
M: MinerService + 'static {
fn protocol_version(&self, params: Params) -> Result<Value, Error> { fn protocol_version(&self, params: Params) -> Result<Value, Error> {
match params { match params {
Params::None => Ok(Value::String(format!("{}", take_weak!(self.sync).status().protocol_version).to_owned())), Params::None => Ok(Value::String(format!("{}", take_weak!(self.sync).status().protocol_version).to_owned())),
@ -196,7 +213,7 @@ impl<C, S, A> Eth for EthClient<C, S, A> where C: BlockChainClient + 'static, S:
fn block_transaction_count_by_number(&self, params: Params) -> Result<Value, Error> { fn block_transaction_count_by_number(&self, params: Params) -> Result<Value, Error> {
from_params::<(BlockNumber,)>(params) from_params::<(BlockNumber,)>(params)
.and_then(|(block_number,)| match block_number { .and_then(|(block_number,)| match block_number {
BlockNumber::Pending => to_value(&U256::from(take_weak!(self.sync).status().transaction_queue_pending)), BlockNumber::Pending => to_value(&U256::from(take_weak!(self.miner).status().transaction_queue_pending)),
_ => to_value(&take_weak!(self.client).block(block_number.into()) _ => to_value(&take_weak!(self.client).block(block_number.into())
.map_or_else(U256::zero, |bytes| U256::from(BlockView::new(&bytes).transactions_count()))) .map_or_else(U256::zero, |bytes| U256::from(BlockView::new(&bytes).transactions_count())))
}) })
@ -271,8 +288,9 @@ impl<C, S, A> Eth for EthClient<C, S, A> where C: BlockChainClient + 'static, S:
fn work(&self, params: Params) -> Result<Value, Error> { fn work(&self, params: Params) -> Result<Value, Error> {
match params { match params {
Params::None => { Params::None => {
let c = take_weak!(self.client); let miner = take_weak!(self.miner);
let u = c.sealing_block().lock().unwrap(); let client = take_weak!(self.client);
let u = miner.sealing_block(client.deref()).lock().unwrap();
match *u { match *u {
Some(ref b) => { Some(ref b) => {
let pow_hash = b.hash(); let pow_hash = b.hash();
@ -290,9 +308,10 @@ impl<C, S, A> Eth for EthClient<C, S, A> where C: BlockChainClient + 'static, S:
fn submit_work(&self, params: Params) -> Result<Value, Error> { fn submit_work(&self, params: Params) -> Result<Value, Error> {
from_params::<(H64, H256, H256)>(params).and_then(|(nonce, pow_hash, mix_hash)| { from_params::<(H64, H256, H256)>(params).and_then(|(nonce, pow_hash, mix_hash)| {
// trace!("Decoded: nonce={}, pow_hash={}, mix_hash={}", nonce, pow_hash, mix_hash); // trace!("Decoded: nonce={}, pow_hash={}, mix_hash={}", nonce, pow_hash, mix_hash);
let c = take_weak!(self.client); let miner = take_weak!(self.miner);
let client = take_weak!(self.client);
let seal = vec![encode(&mix_hash).to_vec(), encode(&nonce).to_vec()]; let seal = vec![encode(&mix_hash).to_vec(), encode(&nonce).to_vec()];
let r = c.submit_seal(pow_hash, seal); let r = miner.submit_seal(client.deref(), pow_hash, seal);
to_value(&r.is_ok()) to_value(&r.is_ok())
}) })
} }
@ -311,12 +330,21 @@ impl<C, S, A> Eth for EthClient<C, S, A> where C: BlockChainClient + 'static, S:
let accounts = take_weak!(self.accounts); let accounts = take_weak!(self.accounts);
match accounts.account_secret(&transaction_request.from) { match accounts.account_secret(&transaction_request.from) {
Ok(secret) => { Ok(secret) => {
let sync = take_weak!(self.sync); let miner = take_weak!(self.miner);
let client = take_weak!(self.client);
let transaction: EthTransaction = transaction_request.into(); let transaction: EthTransaction = transaction_request.into();
let signed_transaction = transaction.sign(&secret); let signed_transaction = transaction.sign(&secret);
let hash = signed_transaction.hash(); let hash = signed_transaction.hash();
sync.insert_transaction(signed_transaction);
to_value(&hash) let import = miner.import_transactions(vec![signed_transaction], |a: &Address| client.nonce(a));
match import {
Ok(_) => to_value(&hash),
Err(e) => {
warn!("Error sending transaction: {:?}", e);
to_value(&U256::zero())
}
}
}, },
Err(_) => { to_value(&U256::zero()) } Err(_) => { to_value(&U256::zero()) }
} }
@ -325,27 +353,39 @@ impl<C, S, A> Eth for EthClient<C, S, A> where C: BlockChainClient + 'static, S:
} }
/// Eth filter rpc implementation. /// Eth filter rpc implementation.
pub struct EthFilterClient<C> where C: BlockChainClient { pub struct EthFilterClient<C, M>
where C: BlockChainClient,
M: MinerService {
client: Weak<C>, client: Weak<C>,
miner: Weak<M>,
polls: Mutex<PollManager<PollFilter>>, polls: Mutex<PollManager<PollFilter>>,
} }
impl<C> EthFilterClient<C> where C: BlockChainClient { impl<C, M> EthFilterClient<C, M>
where C: BlockChainClient,
M: MinerService {
/// Creates new Eth filter client. /// Creates new Eth filter client.
pub fn new(client: &Arc<C>) -> Self { pub fn new(client: &Arc<C>, miner: &Arc<M>) -> Self {
EthFilterClient { EthFilterClient {
client: Arc::downgrade(client), client: Arc::downgrade(client),
polls: Mutex::new(PollManager::new()) miner: Arc::downgrade(miner),
polls: Mutex::new(PollManager::new()),
} }
} }
} }
impl<C> EthFilter for EthFilterClient<C> where C: BlockChainClient + 'static { impl<C, M> EthFilter for EthFilterClient<C, M>
where C: BlockChainClient + 'static,
M: MinerService + 'static {
fn new_filter(&self, params: Params) -> Result<Value, Error> { fn new_filter(&self, params: Params) -> Result<Value, Error> {
from_params::<(Filter,)>(params) from_params::<(Filter,)>(params)
.and_then(|(filter,)| { .and_then(|(filter,)| {
let mut polls = self.polls.lock().unwrap(); let mut polls = self.polls.lock().unwrap();
let id = polls.create_poll(PollFilter::Logs(filter.into()), take_weak!(self.client).chain_info().best_block_number); let block_number = take_weak!(self.client).chain_info().best_block_number;
let id = polls.create_poll(PollFilter::Logs(block_number, filter.into()));
to_value(&U256::from(id)) to_value(&U256::from(id))
}) })
} }
@ -354,7 +394,7 @@ impl<C> EthFilter for EthFilterClient<C> where C: BlockChainClient + 'static {
match params { match params {
Params::None => { Params::None => {
let mut polls = self.polls.lock().unwrap(); let mut polls = self.polls.lock().unwrap();
let id = polls.create_poll(PollFilter::Block, take_weak!(self.client).chain_info().best_block_number); let id = polls.create_poll(PollFilter::Block(take_weak!(self.client).chain_info().best_block_number));
to_value(&U256::from(id)) to_value(&U256::from(id))
}, },
_ => Err(Error::invalid_params()) _ => Err(Error::invalid_params())
@ -365,7 +405,9 @@ impl<C> EthFilter for EthFilterClient<C> where C: BlockChainClient + 'static {
match params { match params {
Params::None => { Params::None => {
let mut polls = self.polls.lock().unwrap(); let mut polls = self.polls.lock().unwrap();
let id = polls.create_poll(PollFilter::PendingTransaction, take_weak!(self.client).chain_info().best_block_number); let pending_transactions = take_weak!(self.miner).pending_transactions_hashes();
let id = polls.create_poll(PollFilter::PendingTransaction(pending_transactions));
to_value(&U256::from(id)) to_value(&U256::from(id))
}, },
_ => Err(Error::invalid_params()) _ => Err(Error::invalid_params())
@ -376,37 +418,47 @@ impl<C> EthFilter for EthFilterClient<C> where C: BlockChainClient + 'static {
let client = take_weak!(self.client); let client = take_weak!(self.client);
from_params::<(Index,)>(params) from_params::<(Index,)>(params)
.and_then(|(index,)| { .and_then(|(index,)| {
let info = self.polls.lock().unwrap().poll_info(&index.value()).cloned(); let mut polls = self.polls.lock().unwrap();
match info { match polls.poll_mut(&index.value()) {
None => Ok(Value::Array(vec![] as Vec<Value>)), None => Ok(Value::Array(vec![] as Vec<Value>)),
Some(info) => match info.filter { Some(filter) => match *filter {
PollFilter::Block => { PollFilter::Block(ref mut block_number) => {
// + 1, cause we want to return hashes including current block hash. // + 1, cause we want to return hashes including current block hash.
let current_number = client.chain_info().best_block_number + 1; let current_number = client.chain_info().best_block_number + 1;
let hashes = (info.block_number..current_number).into_iter() let hashes = (*block_number..current_number).into_iter()
.map(BlockId::Number) .map(BlockId::Number)
.filter_map(|id| client.block_hash(id)) .filter_map(|id| client.block_hash(id))
.collect::<Vec<H256>>(); .collect::<Vec<H256>>();
self.polls.lock().unwrap().update_poll(&index.value(), current_number); *block_number = current_number;
to_value(&hashes) to_value(&hashes)
}, },
PollFilter::PendingTransaction => { PollFilter::PendingTransaction(ref mut previous_hashes) => {
// TODO: fix implementation once TransactionQueue is merged let current_hashes = take_weak!(self.miner).pending_transactions_hashes();
to_value(&vec![] as &Vec<H256>) // calculate diff
let previous_hashes_set = previous_hashes.into_iter().map(|h| h.clone()).collect::<HashSet<H256>>();
let diff = current_hashes
.iter()
.filter(|hash| previous_hashes_set.contains(&hash))
.cloned()
.collect::<Vec<H256>>();
*previous_hashes = current_hashes;
to_value(&diff)
}, },
PollFilter::Logs(mut filter) => { PollFilter::Logs(ref mut block_number, ref mut filter) => {
filter.from_block = BlockId::Number(info.block_number); filter.from_block = BlockId::Number(*block_number);
filter.to_block = BlockId::Latest; filter.to_block = BlockId::Latest;
let logs = client.logs(filter) let logs = client.logs(filter.clone())
.into_iter() .into_iter()
.map(From::from) .map(From::from)
.collect::<Vec<Log>>(); .collect::<Vec<Log>>();
let current_number = client.chain_info().best_block_number; let current_number = client.chain_info().best_block_number;
self.polls.lock().unwrap().update_poll(&index.value(), current_number);
*block_number = current_number;
to_value(&logs) to_value(&logs)
} }
} }

View File

@ -21,7 +21,7 @@ use util::hash::{Address, H256};
use util::numbers::U256; use util::numbers::U256;
use ethcore::client::{TestBlockChainClient, EachBlockWith}; use ethcore::client::{TestBlockChainClient, EachBlockWith};
use v1::{Eth, EthClient}; use v1::{Eth, EthClient};
use v1::tests::helpers::{TestAccount, TestAccountProvider, TestSyncProvider, Config}; use v1::tests::helpers::{TestAccount, TestAccountProvider, TestSyncProvider, Config, TestMinerService};
fn blockchain_client() -> Arc<TestBlockChainClient> { fn blockchain_client() -> Arc<TestBlockChainClient> {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
@ -46,10 +46,15 @@ fn sync_provider() -> Arc<TestSyncProvider> {
})) }))
} }
fn miner_service() -> Arc<TestMinerService> {
Arc::new(TestMinerService)
}
struct EthTester { struct EthTester {
_client: Arc<TestBlockChainClient>, _client: Arc<TestBlockChainClient>,
_sync: Arc<TestSyncProvider>, _sync: Arc<TestSyncProvider>,
_accounts_provider: Arc<TestAccountProvider>, _accounts_provider: Arc<TestAccountProvider>,
_miner: Arc<TestMinerService>,
pub io: IoHandler, pub io: IoHandler,
} }
@ -58,13 +63,15 @@ impl Default for EthTester {
let client = blockchain_client(); let client = blockchain_client();
let sync = sync_provider(); let sync = sync_provider();
let ap = accounts_provider(); let ap = accounts_provider();
let eth = EthClient::new(&client, &sync, &ap).to_delegate(); let miner = miner_service();
let eth = EthClient::new(&client, &sync, &ap, &miner).to_delegate();
let io = IoHandler::new(); let io = IoHandler::new();
io.add_delegate(eth); io.add_delegate(eth);
EthTester { EthTester {
_client: client, _client: client,
_sync: sync, _sync: sync,
_accounts_provider: ap, _accounts_provider: ap,
_miner: miner,
io: io io: io
} }
} }

View File

@ -0,0 +1,53 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use util::{Address, H256, U256, Bytes};
use util::standard::*;
use ethcore::error::Error;
use ethcore::client::BlockChainClient;
use ethcore::block::ClosedBlock;
use ethcore::transaction::SignedTransaction;
use ethminer::{MinerService, MinerStatus};
pub struct TestMinerService;
impl MinerService for TestMinerService {
/// Returns miner's status.
fn status(&self) -> MinerStatus { unimplemented!(); }
/// Imports transactions to transaction queue.
fn import_transactions<T>(&self, _transactions: Vec<SignedTransaction>, _fetch_nonce: T) -> Result<(), Error> where T: Fn(&Address) -> U256 { unimplemented!(); }
/// Returns hashes of transactions currently in pending
fn pending_transactions_hashes(&self) -> Vec<H256> { unimplemented!(); }
/// Removes all transactions from the queue and restart mining operation.
fn clear_and_reset(&self, _chain: &BlockChainClient) { unimplemented!(); }
/// Called when blocks are imported to chain, updates transactions queue.
fn chain_new_blocks(&self, _chain: &BlockChainClient, _imported: &[H256], _invalid: &[H256], _enacted: &[H256], _retracted: &[H256]) { unimplemented!(); }
/// New chain head event. Restart mining operation.
fn prepare_sealing(&self, _chain: &BlockChainClient) { unimplemented!(); }
/// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock.
fn sealing_block(&self, _chain: &BlockChainClient) -> &Mutex<Option<ClosedBlock>> { unimplemented!(); }
/// Submit `seal` as a valid solution for the header of `pow_hash`.
/// Will check the seal, but not actually insert the block into the chain.
fn submit_seal(&self, _chain: &BlockChainClient, _pow_hash: H256, _seal: Vec<Bytes>) -> Result<(), Error> { unimplemented!(); }
}

View File

@ -16,6 +16,8 @@
mod account_provider; mod account_provider;
mod sync_provider; mod sync_provider;
mod miner_service;
pub use self::account_provider::{TestAccount, TestAccountProvider}; pub use self::account_provider::{TestAccount, TestAccountProvider};
pub use self::sync_provider::{Config, TestSyncProvider}; pub use self::sync_provider::{Config, TestSyncProvider};
pub use self::miner_service::{TestMinerService};

View File

@ -14,7 +14,6 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use ethcore::transaction::SignedTransaction;
use ethsync::{SyncProvider, SyncStatus, SyncState}; use ethsync::{SyncProvider, SyncStatus, SyncState};
pub struct Config { pub struct Config {
@ -40,7 +39,6 @@ impl TestSyncProvider {
num_peers: config.num_peers, num_peers: config.num_peers,
num_active_peers: 0, num_active_peers: 0,
mem_used: 0, mem_used: 0,
transaction_queue_pending: 0,
}, },
} }
} }
@ -50,9 +48,5 @@ impl SyncProvider for TestSyncProvider {
fn status(&self) -> SyncStatus { fn status(&self) -> SyncStatus {
self.status.clone() self.status.clone()
} }
fn insert_transaction(&self, _transaction: SignedTransaction) {
unimplemented!()
}
} }

View File

@ -11,14 +11,13 @@ authors = ["Ethcore <admin@ethcore.io"]
ethcore-util = { path = "../util" } ethcore-util = { path = "../util" }
ethcore = { path = "../ethcore" } ethcore = { path = "../ethcore" }
clippy = { version = "0.0.50", optional = true } clippy = { version = "0.0.50", optional = true }
ethminer = { path = "../miner" }
log = "0.3" log = "0.3"
env_logger = "0.3" env_logger = "0.3"
time = "0.1.34" time = "0.1.34"
rand = "0.3.13" rand = "0.3.13"
heapsize = "0.3" heapsize = "0.3"
rustc-serialize = "0.3"
rayon = "0.3.1"
[features] [features]
default = [] default = []
dev = ["clippy", "ethcore/dev", "ethcore-util/dev"] dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethminer/dev"]

View File

@ -30,20 +30,18 @@
/// ///
use util::*; use util::*;
use rayon::prelude::*;
use std::mem::{replace}; use std::mem::{replace};
use ethcore::views::{HeaderView, BlockView}; use ethcore::views::{HeaderView};
use ethcore::header::{BlockNumber, Header as BlockHeader}; use ethcore::header::{BlockNumber, Header as BlockHeader};
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo}; use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo};
use range_collection::{RangeCollection, ToUsize, FromUsize}; use range_collection::{RangeCollection, ToUsize, FromUsize};
use ethcore::error::*; use ethcore::error::*;
use ethcore::block::Block;
use ethcore::transaction::SignedTransaction; use ethcore::transaction::SignedTransaction;
use ethcore::block::Block;
use ethminer::{Miner, MinerService};
use io::SyncIo; use io::SyncIo;
use transaction_queue::TransactionQueue;
use time; use time;
use super::SyncConfig; use super::SyncConfig;
use ethcore;
known_heap_size!(0, PeerInfo, Header, HeaderId); known_heap_size!(0, PeerInfo, Header, HeaderId);
@ -142,8 +140,6 @@ pub struct SyncStatus {
pub num_active_peers: usize, pub num_active_peers: usize,
/// Heap memory used in bytes /// Heap memory used in bytes
pub mem_used: usize, pub mem_used: usize,
/// Number of pending transactions in queue
pub transaction_queue_pending: usize,
} }
#[derive(PartialEq, Eq, Debug, Clone)] #[derive(PartialEq, Eq, Debug, Clone)]
@ -216,15 +212,15 @@ pub struct ChainSync {
max_download_ahead_blocks: usize, max_download_ahead_blocks: usize,
/// Network ID /// Network ID
network_id: U256, network_id: U256,
/// Transactions Queue /// Miner
transaction_queue: Mutex<TransactionQueue>, miner: Arc<Miner>,
} }
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>; type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
impl ChainSync { impl ChainSync {
/// Create a new instance of syncing strategy. /// Create a new instance of syncing strategy.
pub fn new(config: SyncConfig) -> ChainSync { pub fn new(config: SyncConfig, miner: Arc<Miner>) -> ChainSync {
ChainSync { ChainSync {
state: SyncState::NotSynced, state: SyncState::NotSynced,
starting_block: 0, starting_block: 0,
@ -243,7 +239,7 @@ impl ChainSync {
last_sent_block_number: 0, last_sent_block_number: 0,
max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id, network_id: config.network_id,
transaction_queue: Mutex::new(TransactionQueue::new()), miner: miner,
} }
} }
@ -259,7 +255,6 @@ impl ChainSync {
blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 }, blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 },
num_peers: self.peers.len(), num_peers: self.peers.len(),
num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(), num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(),
transaction_queue_pending: self.transaction_queue.lock().unwrap().status().pending,
mem_used: mem_used:
// TODO: https://github.com/servo/heapsize/pull/50 // TODO: https://github.com/servo/heapsize/pull/50
// self.downloading_hashes.heap_size_of_children() // self.downloading_hashes.heap_size_of_children()
@ -301,7 +296,6 @@ impl ChainSync {
self.starting_block = 0; self.starting_block = 0;
self.highest_block = None; self.highest_block = None;
self.have_common_block = false; self.have_common_block = false;
self.transaction_queue.lock().unwrap().clear();
self.starting_block = io.chain().chain_info().best_block_number; self.starting_block = io.chain().chain_info().best_block_number;
self.state = SyncState::NotSynced; self.state = SyncState::NotSynced;
} }
@ -931,16 +925,17 @@ impl ChainSync {
} }
/// Called when peer sends us new transactions /// Called when peer sends us new transactions
fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
let chain = io.chain();
let item_count = r.item_count(); let item_count = r.item_count();
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count); trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
let fetch_latest_nonce = |a : &Address| chain.nonce(a);
let mut transaction_queue = self.transaction_queue.lock().unwrap(); let mut transactions = Vec::with_capacity(item_count);
for i in 0..item_count { for i in 0..item_count {
let tx: SignedTransaction = try!(r.val_at(i)); let tx: SignedTransaction = try!(r.val_at(i));
let _ = transaction_queue.add(tx, &fetch_latest_nonce); transactions.push(tx);
} }
let chain = io.chain();
let fetch_nonce = |a: &Address| chain.nonce(a);
let _ = self.miner.import_transactions(transactions, fetch_nonce);
Ok(()) Ok(())
} }
@ -1268,48 +1263,16 @@ impl ChainSync {
} }
/// called when block is imported to chain, updates transactions queue and propagates the blocks /// called when block is imported to chain, updates transactions queue and propagates the blocks
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, good: &[H256], bad: &[H256], _retracted: &[H256]) { pub fn chain_new_blocks(&mut self, io: &mut SyncIo, imported: &[H256], invalid: &[H256], enacted: &[H256], retracted: &[H256]) {
fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec<SignedTransaction> { // Notify miner
let block = chain self.miner.chain_new_blocks(io.chain(), imported, invalid, enacted, retracted);
.block(BlockId::Hash(hash.clone()))
// Client should send message after commit to db and inserting to chain.
.expect("Expected in-chain blocks.");
let block = BlockView::new(&block);
block.transactions()
}
{
let chain = io.chain();
let good = good.par_iter().map(|h| fetch_transactions(chain, h));
let bad = bad.par_iter().map(|h| fetch_transactions(chain, h));
good.for_each(|txs| {
let mut transaction_queue = self.transaction_queue.lock().unwrap();
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
transaction_queue.remove_all(&hashes, |a| chain.nonce(a));
});
bad.for_each(|txs| {
// populate sender
for tx in &txs {
let _sender = tx.sender();
}
let mut transaction_queue = self.transaction_queue.lock().unwrap();
let _ = transaction_queue.add_all(txs, |a| chain.nonce(a));
});
}
// Propagate latests blocks // Propagate latests blocks
self.propagate_latest_blocks(io); self.propagate_latest_blocks(io);
// TODO [todr] propagate transactions? // TODO [todr] propagate transactions?
} }
/// Add transaction to the transaction queue pub fn chain_new_head(&mut self, io: &mut SyncIo) {
pub fn insert_transaction<T>(&self, transaction: ethcore::transaction::SignedTransaction, fetch_nonce: &T) -> Result<(), Error> self.miner.prepare_sealing(io.chain());
where T: Fn(&Address) -> U256
{
let mut queue = self.transaction_queue.lock().unwrap();
queue.add(transaction, fetch_nonce)
} }
} }
@ -1322,6 +1285,7 @@ mod tests {
use super::{PeerInfo, PeerAsking}; use super::{PeerInfo, PeerAsking};
use ethcore::header::*; use ethcore::header::*;
use ethcore::client::*; use ethcore::client::*;
use ethminer::{Miner, MinerService};
fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes { fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes {
let mut header = Header::new(); let mut header = Header::new();
@ -1431,7 +1395,7 @@ mod tests {
} }
fn dummy_sync_with_peer(peer_latest_hash: H256) -> ChainSync { fn dummy_sync_with_peer(peer_latest_hash: H256) -> ChainSync {
let mut sync = ChainSync::new(SyncConfig::default()); let mut sync = ChainSync::new(SyncConfig::default(), Miner::new());
sync.peers.insert(0, sync.peers.insert(0,
PeerInfo { PeerInfo {
protocol_version: 0, protocol_version: 0,
@ -1652,15 +1616,15 @@ mod tests {
let mut io = TestIo::new(&mut client, &mut queue, None); let mut io = TestIo::new(&mut client, &mut queue, None);
// when // when
sync.chain_new_blocks(&mut io, &[], &good_blocks, &[]); sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks);
assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0); assert_eq!(sync.miner.status().transaction_queue_future, 0);
assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1); assert_eq!(sync.miner.status().transaction_queue_pending, 1);
sync.chain_new_blocks(&mut io, &good_blocks, &retracted_blocks, &[]); sync.chain_new_blocks(&mut io, &good_blocks, &[], &[], &retracted_blocks);
// then // then
let status = sync.transaction_queue.lock().unwrap().status(); let status = sync.miner.status();
assert_eq!(status.pending, 1); assert_eq!(status.transaction_queue_pending, 1);
assert_eq!(status.future, 0); assert_eq!(status.transaction_queue_future, 0);
} }
#[test] #[test]

View File

@ -32,18 +32,21 @@
//! extern crate ethcore_util as util; //! extern crate ethcore_util as util;
//! extern crate ethcore; //! extern crate ethcore;
//! extern crate ethsync; //! extern crate ethsync;
//! extern crate ethminer;
//! use std::env; //! use std::env;
//! use std::sync::Arc; //! use std::sync::Arc;
//! use util::network::{NetworkService, NetworkConfiguration}; //! use util::network::{NetworkService, NetworkConfiguration};
//! use ethcore::client::{Client, ClientConfig}; //! use ethcore::client::{Client, ClientConfig};
//! use ethsync::{EthSync, SyncConfig}; //! use ethsync::{EthSync, SyncConfig};
//! use ethminer::Miner;
//! use ethcore::ethereum; //! use ethcore::ethereum;
//! //!
//! fn main() { //! fn main() {
//! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap(); //! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap();
//! let dir = env::temp_dir(); //! let dir = env::temp_dir();
//! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, service.io().channel()).unwrap(); //! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, service.io().channel()).unwrap();
//! EthSync::register(&mut service, SyncConfig::default(), client); //! let miner = Miner::new();
//! EthSync::register(&mut service, SyncConfig::default(), client, miner);
//! } //! }
//! ``` //! ```
@ -52,28 +55,27 @@ extern crate log;
#[macro_use] #[macro_use]
extern crate ethcore_util as util; extern crate ethcore_util as util;
extern crate ethcore; extern crate ethcore;
extern crate ethminer;
extern crate env_logger; extern crate env_logger;
extern crate time; extern crate time;
extern crate rand; extern crate rand;
extern crate rayon;
#[macro_use] #[macro_use]
extern crate heapsize; extern crate heapsize;
use std::ops::*; use std::ops::*;
use std::sync::*; use std::sync::*;
use ethcore::client::Client;
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId};
use util::TimerToken; use util::TimerToken;
use util::{U256, ONE_U256}; use util::{U256, ONE_U256};
use chain::ChainSync; use ethcore::client::Client;
use ethcore::service::SyncMessage; use ethcore::service::SyncMessage;
use ethminer::Miner;
use io::NetSyncIo; use io::NetSyncIo;
use chain::ChainSync;
mod chain; mod chain;
mod io; mod io;
mod range_collection; mod range_collection;
mod transaction_queue;
pub use transaction_queue::TransactionQueue;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
@ -99,8 +101,6 @@ impl Default for SyncConfig {
pub trait SyncProvider: Send + Sync { pub trait SyncProvider: Send + Sync {
/// Get sync status /// Get sync status
fn status(&self) -> SyncStatus; fn status(&self) -> SyncStatus;
/// Insert transaction in the sync transaction queue
fn insert_transaction(&self, transaction: ethcore::transaction::SignedTransaction);
} }
/// Ethereum network protocol handler /// Ethereum network protocol handler
@ -115,10 +115,10 @@ pub use self::chain::{SyncStatus, SyncState};
impl EthSync { impl EthSync {
/// Creates and register protocol with the network service /// Creates and register protocol with the network service
pub fn register(service: &mut NetworkService<SyncMessage>, config: SyncConfig, chain: Arc<Client>) -> Arc<EthSync> { pub fn register(service: &mut NetworkService<SyncMessage>, config: SyncConfig, chain: Arc<Client>, miner: Arc<Miner>) -> Arc<EthSync> {
let sync = Arc::new(EthSync { let sync = Arc::new(EthSync {
chain: chain, chain: chain,
sync: RwLock::new(ChainSync::new(config)), sync: RwLock::new(ChainSync::new(config, miner)),
}); });
service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler"); service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler");
sync sync
@ -140,16 +140,6 @@ impl SyncProvider for EthSync {
fn status(&self) -> SyncStatus { fn status(&self) -> SyncStatus {
self.sync.read().unwrap().status() self.sync.read().unwrap().status()
} }
/// Insert transaction in transaction queue
fn insert_transaction(&self, transaction: ethcore::transaction::SignedTransaction) {
use util::numbers::*;
let nonce_fn = |a: &Address| self.chain.state().nonce(a) + U256::one();
let sync = self.sync.write().unwrap();
sync.insert_transaction(transaction, &nonce_fn).unwrap_or_else(
|e| warn!(target: "sync", "Error inserting transaction to queue: {:?}", e));
}
} }
impl NetworkProtocolHandler<SyncMessage> for EthSync { impl NetworkProtocolHandler<SyncMessage> for EthSync {
@ -174,13 +164,16 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
self.sync.write().unwrap().maintain_sync(&mut NetSyncIo::new(io, self.chain.deref())); self.sync.write().unwrap().maintain_sync(&mut NetSyncIo::new(io, self.chain.deref()));
} }
#[allow(single_match)]
fn message(&self, io: &NetworkContext<SyncMessage>, message: &SyncMessage) { fn message(&self, io: &NetworkContext<SyncMessage>, message: &SyncMessage) {
match *message { match *message {
SyncMessage::NewChainBlocks { ref good, ref bad, ref retracted } => { SyncMessage::NewChainBlocks { ref imported, ref invalid, ref enacted, ref retracted } => {
let mut sync_io = NetSyncIo::new(io, self.chain.deref()); let mut sync_io = NetSyncIo::new(io, self.chain.deref());
self.sync.write().unwrap().chain_new_blocks(&mut sync_io, good, bad, retracted); self.sync.write().unwrap().chain_new_blocks(&mut sync_io, imported, invalid, enacted, retracted);
}, },
SyncMessage::NewChainHead => {
let mut sync_io = NetSyncIo::new(io, self.chain.deref());
self.sync.write().unwrap().chain_new_head(&mut sync_io);
}
_ => {/* Ignore other messages */}, _ => {/* Ignore other messages */},
} }
} }

View File

@ -18,6 +18,7 @@ use util::*;
use ethcore::client::{TestBlockChainClient, BlockChainClient}; use ethcore::client::{TestBlockChainClient, BlockChainClient};
use io::SyncIo; use io::SyncIo;
use chain::ChainSync; use chain::ChainSync;
use ethminer::Miner;
use ::SyncConfig; use ::SyncConfig;
pub struct TestIo<'p> { pub struct TestIo<'p> {
@ -92,7 +93,7 @@ impl TestNet {
for _ in 0..n { for _ in 0..n {
net.peers.push(TestPeer { net.peers.push(TestPeer {
chain: TestBlockChainClient::new(), chain: TestBlockChainClient::new(),
sync: ChainSync::new(SyncConfig::default()), sync: ChainSync::new(SyncConfig::default(), Miner::new()),
queue: VecDeque::new(), queue: VecDeque::new(),
}); });
} }
@ -167,6 +168,6 @@ impl TestNet {
pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) { pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) {
let mut peer = self.peer_mut(peer_id); let mut peer = self.peer_mut(peer_id);
peer.sync.chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[]); peer.sync.chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[]);
} }
} }

View File

@ -1,4 +1,5 @@
#!/bin/sh #!/bin/sh
# Running Parity Full Test Sute # Running Parity Full Test Sute
cargo test --features ethcore/json-tests $1 -p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity cargo test --features ethcore/json-tests $1 -p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity -p
ethminer

View File

@ -36,7 +36,6 @@
//! The functions here are designed to be fast. //! The functions here are designed to be fast.
//! //!
#[cfg(all(asm_available, target_arch="x86_64"))] #[cfg(all(asm_available, target_arch="x86_64"))]
use std::mem; use std::mem;
use std::fmt; use std::fmt;

View File

@ -21,12 +21,13 @@ use network::NetworkError;
use rlp::DecoderError; use rlp::DecoderError;
use io; use io;
use std::fmt; use std::fmt;
use hash::H256;
#[derive(Debug)] #[derive(Debug)]
/// Error in database subsystem. /// Error in database subsystem.
pub enum BaseDataError { pub enum BaseDataError {
/// An entry was removed more times than inserted. /// An entry was removed more times than inserted.
NegativelyReferencedHash, NegativelyReferencedHash(H256),
} }
#[derive(Debug)] #[derive(Debug)]

View File

@ -132,7 +132,7 @@ impl JournalDB for ArchiveDB {
Box::new(ArchiveDB { Box::new(ArchiveDB {
overlay: MemoryDB::new(), overlay: MemoryDB::new(),
backing: self.backing.clone(), backing: self.backing.clone(),
latest_era: None, latest_era: self.latest_era,
}) })
} }
@ -144,7 +144,7 @@ impl JournalDB for ArchiveDB {
self.latest_era.is_none() self.latest_era.is_none()
} }
fn commit(&mut self, _: u64, _: &H256, _: Option<(u64, H256)>) -> Result<u32, UtilError> { fn commit(&mut self, now: u64, _: &H256, _: Option<(u64, H256)>) -> Result<u32, UtilError> {
let batch = DBTransaction::new(); let batch = DBTransaction::new();
let mut inserts = 0usize; let mut inserts = 0usize;
let mut deletes = 0usize; let mut deletes = 0usize;
@ -160,6 +160,10 @@ impl JournalDB for ArchiveDB {
deletes += 1; deletes += 1;
} }
} }
if self.latest_era.map_or(true, |e| now > e) {
try!(batch.put(&LATEST_ERA_KEY, &encode(&now)));
self.latest_era = Some(now);
}
try!(self.backing.write(batch)); try!(self.backing.write(batch));
Ok((inserts + deletes) as u32) Ok((inserts + deletes) as u32)
} }

View File

@ -43,6 +43,7 @@ pub struct RefCountedDB {
const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ]; const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ];
const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ]; const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ];
const DB_VERSION : u32 = 512; const DB_VERSION : u32 = 512;
const PADDING : [u8; 10] = [ 0u8; 10 ];
impl RefCountedDB { impl RefCountedDB {
/// Create a new instance given a `backing` database. /// Create a new instance given a `backing` database.
@ -131,9 +132,10 @@ impl JournalDB for RefCountedDB {
let mut last; let mut last;
while try!(self.backing.get({ while try!(self.backing.get({
let mut r = RlpStream::new_list(2); let mut r = RlpStream::new_list(3);
r.append(&now); r.append(&now);
r.append(&index); r.append(&index);
r.append(&&PADDING[..]);
last = r.drain(); last = r.drain();
&last &last
})).is_some() { })).is_some() {
@ -144,7 +146,10 @@ impl JournalDB for RefCountedDB {
r.append(id); r.append(id);
r.append(&self.inserts); r.append(&self.inserts);
r.append(&self.removes); r.append(&self.removes);
try!(self.backing.put(&last, r.as_raw())); try!(batch.put(&last, r.as_raw()));
trace!(target: "rcdb", "new journal for time #{}.{} => {}: inserts={:?}, removes={:?}", now, index, id, self.inserts, self.removes);
self.inserts.clear(); self.inserts.clear();
self.removes.clear(); self.removes.clear();
@ -158,20 +163,25 @@ impl JournalDB for RefCountedDB {
if let Some((end_era, canon_id)) = end { if let Some((end_era, canon_id)) = end {
let mut index = 0usize; let mut index = 0usize;
let mut last; let mut last;
while let Some(rlp_data) = try!(self.backing.get({ while let Some(rlp_data) = {
let mut r = RlpStream::new_list(2); // trace!(target: "rcdb", "checking for journal #{}.{}", end_era, index);
r.append(&end_era); try!(self.backing.get({
r.append(&index); let mut r = RlpStream::new_list(3);
last = r.drain(); r.append(&end_era);
&last r.append(&index);
})) { r.append(&&PADDING[..]);
last = r.drain();
&last
}))
} {
let rlp = Rlp::new(&rlp_data); let rlp = Rlp::new(&rlp_data);
let to_remove: Vec<H256> = rlp.val_at(if canon_id == rlp.val_at(0) {2} else {1}); let our_id: H256 = rlp.val_at(0);
let to_remove: Vec<H256> = rlp.val_at(if canon_id == our_id {2} else {1});
trace!(target: "rcdb", "delete journal for time #{}.{}=>{}, (canon was {}): deleting {:?}", end_era, index, our_id, canon_id, to_remove);
for i in &to_remove { for i in &to_remove {
self.forward.remove(i); self.forward.remove(i);
} }
try!(self.backing.delete(&last)); try!(batch.delete(&last));
trace!("RefCountedDB: delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, to_remove.len());
index += 1; index += 1;
} }
} }

View File

@ -70,15 +70,15 @@ impl OverlayDB {
let (back_value, back_rc) = x; let (back_value, back_rc) = x;
let total_rc: i32 = back_rc as i32 + rc; let total_rc: i32 = back_rc as i32 + rc;
if total_rc < 0 { if total_rc < 0 {
return Err(From::from(BaseDataError::NegativelyReferencedHash)); return Err(From::from(BaseDataError::NegativelyReferencedHash(key)));
} }
deletes += if self.put_payload(batch, &key, (back_value, total_rc as u32)) {1} else {0}; deletes += if self.put_payload_in_batch(batch, &key, (back_value, total_rc as u32)) {1} else {0};
} }
None => { None => {
if rc < 0 { if rc < 0 {
return Err(From::from(BaseDataError::NegativelyReferencedHash)); return Err(From::from(BaseDataError::NegativelyReferencedHash(key)));
} }
self.put_payload(batch, &key, (value, rc as u32)); self.put_payload_in_batch(batch, &key, (value, rc as u32));
} }
}; };
ret += 1; ret += 1;
@ -116,10 +116,32 @@ impl OverlayDB {
/// } /// }
/// ``` /// ```
pub fn commit(&mut self) -> Result<u32, UtilError> { pub fn commit(&mut self) -> Result<u32, UtilError> {
let batch = DBTransaction::new(); let mut ret = 0u32;
let r = try!(self.commit_to_batch(&batch)); let mut deletes = 0usize;
try!(self.backing.write(batch)); for i in self.overlay.drain().into_iter() {
Ok(r) let (key, (value, rc)) = i;
if rc != 0 {
match self.payload(&key) {
Some(x) => {
let (back_value, back_rc) = x;
let total_rc: i32 = back_rc as i32 + rc;
if total_rc < 0 {
return Err(From::from(BaseDataError::NegativelyReferencedHash(key)));
}
deletes += if self.put_payload(&key, (back_value, total_rc as u32)) {1} else {0};
}
None => {
if rc < 0 {
return Err(From::from(BaseDataError::NegativelyReferencedHash(key)));
}
self.put_payload(&key, (value, rc as u32));
}
};
ret += 1;
}
}
trace!("OverlayDB::commit() deleted {} nodes", deletes);
Ok(ret)
} }
/// Revert all operations on this object (i.e. `insert()`s and `kill()`s) since the /// Revert all operations on this object (i.e. `insert()`s and `kill()`s) since the
@ -145,6 +167,9 @@ impl OverlayDB {
/// ``` /// ```
pub fn revert(&mut self) { self.overlay.clear(); } pub fn revert(&mut self) { self.overlay.clear(); }
/// Get the number of references that would be committed.
pub fn commit_refs(&self, key: &H256) -> i32 { self.overlay.raw(&key).map_or(0, |&(_, refs)| refs) }
/// Get the refs and value of the given key. /// Get the refs and value of the given key.
fn payload(&self, key: &H256) -> Option<(Bytes, u32)> { fn payload(&self, key: &H256) -> Option<(Bytes, u32)> {
self.backing.get(&key.bytes()) self.backing.get(&key.bytes())
@ -156,7 +181,7 @@ impl OverlayDB {
} }
/// Put the refs and value of the given key, possibly deleting it from the db. /// Put the refs and value of the given key, possibly deleting it from the db.
fn put_payload(&self, batch: &DBTransaction, key: &H256, payload: (Bytes, u32)) -> bool { fn put_payload_in_batch(&self, batch: &DBTransaction, key: &H256, payload: (Bytes, u32)) -> bool {
if payload.1 > 0 { if payload.1 > 0 {
let mut s = RlpStream::new_list(2); let mut s = RlpStream::new_list(2);
s.append(&payload.1); s.append(&payload.1);
@ -168,6 +193,20 @@ impl OverlayDB {
true true
} }
} }
/// Put the refs and value of the given key, possibly deleting it from the db.
fn put_payload(&self, key: &H256, payload: (Bytes, u32)) -> bool {
if payload.1 > 0 {
let mut s = RlpStream::new_list(2);
s.append(&payload.1);
s.append(&payload.0);
self.backing.put(&key.bytes(), s.as_raw()).expect("Low-level database error. Some issue with your hard disk?");
false
} else {
self.backing.delete(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?");
true
}
}
} }
impl HashDB for OverlayDB { impl HashDB for OverlayDB {

View File

@ -24,7 +24,7 @@ impl<'a> From<UntrustedRlp<'a>> for Rlp<'a> {
} }
/// Data-oriented view onto trusted rlp-slice. /// Data-oriented view onto trusted rlp-slice.
/// ///
/// Unlikely to `UntrustedRlp` doesn't bother you with error /// Unlikely to `UntrustedRlp` doesn't bother you with error
/// handling. It assumes that you know what you are doing. /// handling. It assumes that you know what you are doing.
#[derive(Debug)] #[derive(Debug)]
@ -44,7 +44,7 @@ impl<'a, 'view> View<'a, 'view> for Rlp<'a> where 'a: 'view {
type Data = &'a [u8]; type Data = &'a [u8];
type Item = Rlp<'a>; type Item = Rlp<'a>;
type Iter = RlpIterator<'a, 'view>; type Iter = RlpIterator<'a, 'view>;
/// Create a new instance of `Rlp` /// Create a new instance of `Rlp`
fn new(bytes: &'a [u8]) -> Rlp<'a> { fn new(bytes: &'a [u8]) -> Rlp<'a> {
Rlp { Rlp {
@ -116,7 +116,7 @@ impl<'a, 'view> View<'a, 'view> for Rlp<'a> where 'a: 'view {
impl <'a, 'view> Rlp<'a> where 'a: 'view { impl <'a, 'view> Rlp<'a> where 'a: 'view {
fn view_as_val<T, R>(r: &R) -> T where R: View<'a, 'view>, T: RlpDecodable { fn view_as_val<T, R>(r: &R) -> T where R: View<'a, 'view>, T: RlpDecodable {
let res: Result<T, DecoderError> = r.as_val(); let res: Result<T, DecoderError> = r.as_val();
res.unwrap_or_else(|_| panic!()) res.unwrap_or_else(|e| panic!("DecodeError: {}", e))
} }
/// Decode into an object /// Decode into an object