Merge branch 'master' into tx_queue_integration

Conflicts:
	sync/src/transaction_queue.rs
This commit is contained in:
Tomasz Drwięga 2016-03-05 16:44:35 +01:00
commit 5ac7b9f812
22 changed files with 273 additions and 253 deletions

View File

@ -13,6 +13,8 @@ matrix:
allow_failures:
- rust: nightly
include:
- rust: stable
env: FEATURES="--features travis-beta" KCOV_FEATURES="" TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity" ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}"
- rust: beta
env: FEATURES="--features travis-beta" KCOV_FEATURES="" TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity" ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}"
- rust: nightly
@ -52,7 +54,7 @@ after_success: |
./kcov-master/tmp/usr/local/bin/kcov --coveralls-id=${TRAVIS_JOB_ID} --exclude-pattern /usr/,/.cargo,/root/.multirust target/kcov target/debug/parity-* &&
[ $TRAVIS_BRANCH = master ] &&
[ $TRAVIS_PULL_REQUEST = false ] &&
[ $TRAVIS_RUST_VERSION = beta ] &&
[ $TRAVIS_RUST_VERSION = stable ] &&
cargo doc --no-deps --verbose ${KCOV_FEATURES} ${TARGETS} &&
echo '<meta http-equiv=refresh content=0;url=ethcore/index.html>' > target/doc/index.html &&
pip install --user ghp-import &&

19
Cargo.lock generated
View File

@ -146,14 +146,6 @@ dependencies = [
"libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "deque"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "docopt"
version = "0.6.78"
@ -293,7 +285,6 @@ dependencies = [
"heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -664,16 +655,6 @@ dependencies = [
"libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rayon"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "regex"
version = "0.1.54"

View File

@ -220,8 +220,8 @@ impl<'x> OpenBlock<'x> {
/// NOTE Will check chain constraints and the uncle number but will NOT check
/// that the header itself is actually valid.
pub fn push_uncle(&mut self, valid_uncle_header: Header) -> Result<(), BlockError> {
if self.block.base.uncles.len() >= self.engine.maximum_uncle_count() {
return Err(BlockError::TooManyUncles(OutOfBounds{min: None, max: Some(self.engine.maximum_uncle_count()), found: self.block.base.uncles.len()}));
if self.block.base.uncles.len() + 1 > self.engine.maximum_uncle_count() {
return Err(BlockError::TooManyUncles(OutOfBounds{min: None, max: Some(self.engine.maximum_uncle_count()), found: self.block.base.uncles.len() + 1}));
}
// TODO: check number
// TODO: check not a direct ancestor (use last_hashes for that)

View File

@ -78,7 +78,7 @@ pub trait BlockProvider {
}
/// Get a list of uncles for a given block.
/// Returns None if block deos not exist.
/// Returns None if block does not exist.
fn uncles(&self, hash: &H256) -> Option<Vec<Header>> {
self.block(hash).map(|bytes| BlockView::new(&bytes).uncles())
}
@ -227,6 +227,24 @@ impl BlockProvider for BlockChain {
const COLLECTION_QUEUE_SIZE: usize = 8;
pub struct AncestryIter<'a> {
current: H256,
chain: &'a BlockChain,
}
impl<'a> Iterator for AncestryIter<'a> {
type Item = H256;
fn next(&mut self) -> Option<H256> {
if self.current.is_zero() {
Option::None
} else {
let mut n = self.chain.block_details(&self.current).unwrap().parent;
mem::swap(&mut self.current, &mut n);
Some(n)
}
}
}
impl BlockChain {
/// Create new instance of blockchain from given Genesis
pub fn new(config: BlockChainConfig, genesis: &[u8], path: &Path) -> BlockChain {
@ -474,10 +492,35 @@ impl BlockChain {
self.extras_db.write(batch).unwrap();
}
/// Given a block's `parent`, find every block header which represents a valid uncle.
pub fn find_uncle_headers(&self, _parent: &H256) -> Vec<Header> {
// TODO
Vec::new()
/// Iterator that lists `first` and then all of `first`'s ancestors, by hash.
pub fn ancestry_iter(&self, first: H256) -> Option<AncestryIter> {
if self.is_known(&first) {
Some(AncestryIter {
current: first,
chain: &self,
})
} else {
None
}
}
/// Given a block's `parent`, find every block header which represents a valid possible uncle.
pub fn find_uncle_headers(&self, parent: &H256, uncle_generations: usize) -> Option<Vec<Header>> {
if !self.is_known(parent) { return None; }
let mut excluded = HashSet::new();
for a in self.ancestry_iter(parent.clone()).unwrap().take(uncle_generations) {
excluded.extend(self.uncle_hashes(&a).unwrap().into_iter());
excluded.insert(a);
}
let mut ret = Vec::new();
for a in self.ancestry_iter(parent.clone()).unwrap().skip(1).take(uncle_generations) {
ret.extend(self.block_details(&a).unwrap().children.iter()
.filter_map(|h| if excluded.contains(h) { None } else { self.block_header(h) })
);
}
Some(ret)
}
/// Get inserted block info which is critical to preapre extras updates.
@ -818,6 +861,66 @@ mod tests {
assert_eq!(bc.block_hash(2), None);
}
#[test]
fn check_ancestry_iter() {
let mut canon_chain = ChainGenerator::default();
let mut finalizer = BlockFinalizer::default();
let genesis = canon_chain.generate(&mut finalizer).unwrap();
let genesis_hash = BlockView::new(&genesis).header_view().sha3();
let temp = RandomTempPath::new();
let bc = BlockChain::new(BlockChainConfig::default(), &genesis, temp.as_path());
let mut block_hashes = vec![genesis_hash.clone()];
for _ in 0..10 {
let block = canon_chain.generate(&mut finalizer).unwrap();
block_hashes.push(BlockView::new(&block).header_view().sha3());
bc.insert_block(&block, vec![]);
}
block_hashes.reverse();
assert_eq!(bc.ancestry_iter(block_hashes[0].clone()).unwrap().collect::<Vec<_>>(), block_hashes)
}
#[test]
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
fn test_find_uncles() {
let mut canon_chain = ChainGenerator::default();
let mut finalizer = BlockFinalizer::default();
let genesis = canon_chain.generate(&mut finalizer).unwrap();
let b1b = canon_chain.fork(1).generate(&mut finalizer.fork()).unwrap();
let b1a = canon_chain.generate(&mut finalizer).unwrap();
let b2b = canon_chain.fork(1).generate(&mut finalizer.fork()).unwrap();
let b2a = canon_chain.generate(&mut finalizer).unwrap();
let b3b = canon_chain.fork(1).generate(&mut finalizer.fork()).unwrap();
let b3a = canon_chain.generate(&mut finalizer).unwrap();
let b4b = canon_chain.fork(1).generate(&mut finalizer.fork()).unwrap();
let b4a = canon_chain.generate(&mut finalizer).unwrap();
let b5b = canon_chain.fork(1).generate(&mut finalizer.fork()).unwrap();
let b5a = canon_chain.generate(&mut finalizer).unwrap();
let temp = RandomTempPath::new();
let bc = BlockChain::new(BlockChainConfig::default(), &genesis, temp.as_path());
bc.insert_block(&b1a, vec![]);
bc.insert_block(&b1b, vec![]);
bc.insert_block(&b2a, vec![]);
bc.insert_block(&b2b, vec![]);
bc.insert_block(&b3a, vec![]);
bc.insert_block(&b3b, vec![]);
bc.insert_block(&b4a, vec![]);
bc.insert_block(&b4b, vec![]);
bc.insert_block(&b5a, vec![]);
bc.insert_block(&b5b, vec![]);
assert_eq!(
[&b4b, &b3b, &b2b].iter().map(|b| BlockView::new(b).header()).collect::<Vec<_>>(),
bc.find_uncle_headers(&BlockView::new(&b4a).header_view().sha3(), 3).unwrap()
);
// TODO: insert block that already includes one of them as an uncle to check it's not allowed.
}
#[test]
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
fn test_small_fork() {

View File

@ -17,6 +17,7 @@
//! Blockchain database client.
use std::marker::PhantomData;
use std::sync::atomic::AtomicBool;
use util::*;
use util::panics::*;
use blockchain::{BlockChain, BlockProvider};
@ -137,9 +138,6 @@ pub trait BlockChainClient : Sync + Send {
/// Get block total difficulty.
fn block_total_difficulty(&self, id: BlockId) -> Option<U256>;
/// Get address nonce.
fn nonce(&self, address: &Address) -> U256;
/// Get block hash.
fn block_hash(&self, id: BlockId) -> Option<H256>;
@ -215,6 +213,7 @@ pub struct Client<V = CanonVerifier> where V: Verifier {
panic_handler: Arc<PanicHandler>,
// for sealing...
sealing_enabled: AtomicBool,
sealing_block: Mutex<Option<ClosedBlock>>,
author: RwLock<Address>,
extra_data: RwLock<Bytes>,
@ -234,7 +233,7 @@ impl Client<CanonVerifier> {
impl<V> Client<V> where V: Verifier {
/// Create a new client with given spec and DB path and custom verifier.
pub fn new_with_verifier(config: ClientConfig, spec: Spec, path: &Path, message_channel: IoChannel<NetSyncMessage> ) -> Result<Arc<Client>, Error> {
pub fn new_with_verifier(config: ClientConfig, spec: Spec, path: &Path, message_channel: IoChannel<NetSyncMessage> ) -> Result<Arc<Client<V>>, Error> {
let mut dir = path.to_path_buf();
dir.push(H64::from(spec.genesis_header().hash()).hex());
//TODO: sec/fat: pruned/full versioning
@ -266,6 +265,7 @@ impl<V> Client<V> where V: Verifier {
report: RwLock::new(Default::default()),
import_lock: Mutex::new(()),
panic_handler: panic_handler,
sealing_enabled: AtomicBool::new(false),
sealing_block: Mutex::new(None),
author: RwLock::new(Address::new()),
extra_data: RwLock::new(Vec::new()),
@ -312,7 +312,7 @@ impl<V> Client<V> where V: Verifier {
}
// Verify Block Family
let verify_family_result = verify_block_family(&header, &block.bytes, engine, self.chain.read().unwrap().deref());
let verify_family_result = V::verify_block_family(&header, &block.bytes, engine, self.chain.read().unwrap().deref());
if let Err(e) = verify_family_result {
warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
return Err(());
@ -365,14 +365,18 @@ impl<V> Client<V> where V: Verifier {
bad_blocks.insert(header.hash());
continue;
}
let closed_block = self.check_and_close_block(&block);
if let Err(_) = closed_block {
bad_blocks.insert(header.hash());
break;
}
// Insert block
let closed_block = closed_block.unwrap();
self.chain.write().unwrap().insert_block(&block.bytes, closed_block.block().receipts().clone());
good_blocks.push(header.hash());
// Are we committing an era?
let ancient = if header.number() >= HISTORY {
let n = header.number() - HISTORY;
let chain = self.chain.read().unwrap();
@ -382,16 +386,10 @@ impl<V> Client<V> where V: Verifier {
};
// Commit results
let closed_block = closed_block.unwrap();
let receipts = closed_block.block().receipts().clone();
closed_block.drain()
.commit(header.number(), &header.hash(), ancient)
.expect("State DB commit failed.");
// And update the chain
self.chain.write().unwrap()
.insert_block(&block.bytes, receipts);
self.report.write().unwrap().accrue_block(&block);
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
}
@ -410,12 +408,12 @@ impl<V> Client<V> where V: Verifier {
if !good_blocks.is_empty() && block_queue.queue_info().is_empty() {
io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
good: good_blocks,
retracted: bad_blocks,
bad: bad_blocks,
})).unwrap();
}
}
if self.chain_info().best_block_hash != original_best {
if self.chain_info().best_block_hash != original_best && self.sealing_enabled.load(atomic::Ordering::Relaxed) {
self.prepare_sealing();
}
@ -498,7 +496,7 @@ impl<V> Client<V> where V: Verifier {
self.extra_data()
);
self.chain.read().unwrap().find_uncle_headers(&h).into_iter().foreach(|h| { b.push_uncle(h).unwrap(); });
self.chain.read().unwrap().find_uncle_headers(&h, self.engine.deref().deref().maximum_uncle_age()).unwrap().into_iter().take(self.engine.deref().deref().maximum_uncle_count()).foreach(|h| { b.push_uncle(h).unwrap(); });
// TODO: push transactions.
@ -510,6 +508,8 @@ impl<V> Client<V> where V: Verifier {
/// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock.
pub fn sealing_block(&self) -> &Mutex<Option<ClosedBlock>> {
if self.sealing_block.lock().unwrap().is_none() {
self.sealing_enabled.store(true, atomic::Ordering::Relaxed);
// TODO: Above should be on a timer that resets after two blocks have arrived without being asked for.
self.prepare_sealing();
}
&self.sealing_block
@ -581,10 +581,6 @@ impl<V> BlockChainClient for Client<V> where V: Verifier {
Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty)
}
fn nonce(&self, address: &Address) -> U256 {
self.state().nonce(address)
}
fn block_hash(&self, id: BlockId) -> Option<H256> {
let chain = self.chain.read().unwrap();
Self::block_hash(&chain, id)

View File

@ -47,6 +47,8 @@ pub trait Engine : Sync + Send {
fn maximum_extra_data_size(&self) -> usize { decode(&self.spec().engine_params.get("maximumExtraDataSize").unwrap()) }
/// Maximum number of uncles a block is allowed to declare.
fn maximum_uncle_count(&self) -> usize { 2 }
/// The number of generations back that uncles can be.
fn maximum_uncle_age(&self) -> usize { 6 }
/// The nonce with which accounts begin.
fn account_start_nonce(&self) -> U256 { decode(&self.spec().engine_params.get("accountStartNonce").unwrap()) }

View File

@ -29,7 +29,7 @@ pub type BlockNumber = u64;
/// which is non-specific.
///
/// Doesn't do all that much on its own.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq)]
pub struct Header {
// TODO: make all private.
/// Parent hash.
@ -70,6 +70,25 @@ pub struct Header {
pub bare_hash: RefCell<Option<H256>>,
}
impl PartialEq for Header {
fn eq(&self, c: &Header) -> bool {
self.parent_hash == c.parent_hash &&
self.timestamp == c.timestamp &&
self.number == c.number &&
self.author == c.author &&
self.transactions_root == c.transactions_root &&
self.uncles_hash == c.uncles_hash &&
self.extra_data == c.extra_data &&
self.state_root == c.state_root &&
self.receipts_root == c.receipts_root &&
self.log_bloom == c.log_bloom &&
self.gas_used == c.gas_used &&
self.gas_limit == c.gas_limit &&
self.difficulty == c.difficulty &&
self.seal == c.seal
}
}
impl Default for Header {
fn default() -> Self {
Header {

View File

@ -30,7 +30,7 @@ pub enum SyncMessage {
/// Hashes of blocks imported to blockchain
good: Vec<H256>,
/// Hashes of blocks not imported to blockchain
retracted: Vec<H256>,
bad: Vec<H256>,
},
/// A block is ready
BlockVerified,

View File

@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use blockchain::BlockProvider;
use engine::Engine;
use error::Error;
use header::Header;
use super::Verifier;
@ -22,6 +24,10 @@ use super::verification;
pub struct CanonVerifier;
impl Verifier for CanonVerifier {
fn verify_block_family(header: &Header, bytes: &[u8], engine: &Engine, bc: &BlockProvider) -> Result<(), Error> {
verification::verify_block_family(header, bytes, engine, bc)
}
fn verify_block_final(expected: &Header, got: &Header) -> Result<(), Error> {
verification::verify_block_final(expected, got)
}

View File

@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use blockchain::BlockProvider;
use engine::Engine;
use error::Error;
use header::Header;
use super::Verifier;
@ -21,6 +23,10 @@ use super::Verifier;
pub struct NoopVerifier;
impl Verifier for NoopVerifier {
fn verify_block_family(_header: &Header, _bytes: &[u8], _engine: &Engine, _bc: &BlockProvider) -> Result<(), Error> {
Ok(())
}
fn verify_block_final(_expected: &Header, _got: &Header) -> Result<(), Error> {
Ok(())
}

View File

@ -78,7 +78,7 @@ pub fn verify_block_unordered(header: Header, bytes: Bytes, engine: &Engine) ->
}
/// Phase 3 verification. Check block information against parent and uncles.
pub fn verify_block_family<BC>(header: &Header, bytes: &[u8], engine: &Engine, bc: &BC) -> Result<(), Error> where BC: BlockProvider {
pub fn verify_block_family(header: &Header, bytes: &[u8], engine: &Engine, bc: &BlockProvider) -> Result<(), Error> {
// TODO: verify timestamp
let parent = try!(bc.block_header(&header.parent_hash).ok_or_else(|| Error::from(BlockError::UnknownParent(header.parent_hash.clone()))));
try!(verify_parent(&header, &parent));
@ -94,7 +94,7 @@ pub fn verify_block_family<BC>(header: &Header, bytes: &[u8], engine: &Engine, b
excluded.insert(header.hash());
let mut hash = header.parent_hash.clone();
excluded.insert(hash.clone());
for _ in 0..6 {
for _ in 0..engine.maximum_uncle_age() {
match bc.block_details(&hash) {
Some(details) => {
excluded.insert(details.parent.clone());
@ -121,7 +121,7 @@ pub fn verify_block_family<BC>(header: &Header, bytes: &[u8], engine: &Engine, b
// (8 Invalid)
let depth = if header.number > uncle.number { header.number - uncle.number } else { 0 };
if depth > 6 {
if depth > engine.maximum_uncle_age() as u64 {
return Err(From::from(BlockError::UncleTooOld(OutOfBounds { min: Some(header.number - depth), max: Some(header.number - 1), found: uncle.number })));
}
else if depth < 1 {

View File

@ -14,10 +14,13 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use blockchain::BlockProvider;
use engine::Engine;
use error::Error;
use header::Header;
/// Should be used to verify blocks.
pub trait Verifier: Send + Sync {
fn verify_block_family(header: &Header, bytes: &[u8], engine: &Engine, bc: &BlockProvider) -> Result<(), Error>;
fn verify_block_final(expected: &Header, got: &Header) -> Result<(), Error>;
}

View File

@ -17,7 +17,6 @@ time = "0.1.34"
rand = "0.3.13"
heapsize = "0.3"
rustc-serialize = "0.3"
rayon = "0.3.1"
[features]
default = []

View File

@ -30,17 +30,14 @@
///
use util::*;
use rayon::prelude::*;
use std::mem::{replace};
use ethcore::views::{HeaderView, BlockView};
use ethcore::views::{HeaderView};
use ethcore::header::{BlockNumber, Header as BlockHeader};
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo};
use range_collection::{RangeCollection, ToUsize, FromUsize};
use ethcore::error::*;
use ethcore::block::Block;
use ethcore::transaction::SignedTransaction;
use io::SyncIo;
use transaction_queue::TransactionQueue;
use time;
use super::SyncConfig;
@ -212,8 +209,6 @@ pub struct ChainSync {
max_download_ahead_blocks: usize,
/// Network ID
network_id: U256,
/// Transactions Queue
transaction_queue: Mutex<TransactionQueue>,
}
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
@ -239,7 +234,6 @@ impl ChainSync {
last_send_block_number: 0,
max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id,
transaction_queue: Mutex::new(TransactionQueue::new()),
}
}
@ -298,7 +292,6 @@ impl ChainSync {
self.starting_block = 0;
self.highest_block = None;
self.have_common_block = false;
self.transaction_queue.lock().unwrap().clear();
self.starting_block = io.chain().chain_info().best_block_number;
self.state = SyncState::NotSynced;
}
@ -491,7 +484,7 @@ impl ChainSync {
trace!(target: "sync", "New block already queued {:?}", h);
},
Ok(_) => {
if self.current_base_block() < header.number {
if self.current_base_block() < header.number {
self.last_imported_block = Some(header.number);
self.remove_downloaded_blocks(header.number);
}
@ -928,16 +921,8 @@ impl ChainSync {
}
}
/// Called when peer sends us new transactions
fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
let chain = io.chain();
let item_count = r.item_count();
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
let fetch_latest_nonce = |a : &Address| chain.nonce(a);
for i in 0..item_count {
let tx: SignedTransaction = try!(r.val_at(i));
self.transaction_queue.lock().unwrap().add(tx, &fetch_latest_nonce);
}
Ok(())
fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
Ok(())
}
/// Send Status message
@ -1263,37 +1248,6 @@ impl ChainSync {
}
self.last_send_block_number = chain.best_block_number;
}
/// called when block is imported to chain, updates transactions queue
pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], retracted: &[H256]) {
fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec<SignedTransaction> {
let block = chain
.block(BlockId::Hash(hash.clone()))
// Client should send message after commit to db and inserting to chain.
.expect("Expected in-chain blocks.");
let block = BlockView::new(&block);
block.transactions()
}
let chain = io.chain();
let good = good.par_iter().map(|h| fetch_transactions(chain, h));
let retracted = retracted.par_iter().map(|h| fetch_transactions(chain, h));
good.for_each(|txs| {
let mut transaction_queue = self.transaction_queue.lock().unwrap();
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
transaction_queue.remove_all(&hashes, |a| chain.nonce(a));
});
retracted.for_each(|txs| {
// populate sender
for tx in &txs {
let _sender = tx.sender();
}
let mut transaction_queue = self.transaction_queue.lock().unwrap();
transaction_queue.add_all(txs, |a| chain.nonce(a));
});
}
}
#[cfg(test)]
@ -1434,7 +1388,7 @@ mod tests {
#[test]
fn finds_lagging_peers() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10));
let chain_info = client.chain_info();
@ -1448,7 +1402,7 @@ mod tests {
#[test]
fn calculates_tree_for_lagging_peer() {
let mut client = TestBlockChainClient::new();
client.add_blocks(15, EachBlockWith::Uncle);
client.add_blocks(15, false);
let start = client.block_hash_delta_minus(4);
let end = client.block_hash_delta_minus(2);
@ -1465,7 +1419,7 @@ mod tests {
#[test]
fn sends_new_hashes_to_lagging_peer() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info();
@ -1484,7 +1438,7 @@ mod tests {
#[test]
fn sends_latest_block_to_lagging_peer() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info();
@ -1502,7 +1456,7 @@ mod tests {
#[test]
fn handles_peer_new_block_mallformed() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, EachBlockWith::Uncle);
client.add_blocks(10, false);
let block_data = get_dummy_block(11, client.chain_info().best_block_hash);
@ -1520,7 +1474,7 @@ mod tests {
#[test]
fn handles_peer_new_block() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, EachBlockWith::Uncle);
client.add_blocks(10, false);
let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash);
@ -1538,7 +1492,7 @@ mod tests {
#[test]
fn handles_peer_new_block_empty() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, EachBlockWith::Uncle);
client.add_blocks(10, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let mut io = TestIo::new(&mut client, &mut queue, None);
@ -1554,7 +1508,7 @@ mod tests {
#[test]
fn handles_peer_new_hashes() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, EachBlockWith::Uncle);
client.add_blocks(10, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let mut io = TestIo::new(&mut client, &mut queue, None);
@ -1570,7 +1524,7 @@ mod tests {
#[test]
fn handles_peer_new_hashes_empty() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, EachBlockWith::Uncle);
client.add_blocks(10, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let mut io = TestIo::new(&mut client, &mut queue, None);
@ -1588,7 +1542,7 @@ mod tests {
#[test]
fn hashes_rlp_mutually_acceptable() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info();
@ -1606,7 +1560,7 @@ mod tests {
#[test]
fn block_rlp_mutually_acceptable() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info();
@ -1619,37 +1573,10 @@ mod tests {
assert!(result.is_ok());
}
#[test]
fn should_add_transactions_to_queue() {
// given
let mut client = TestBlockChainClient::new();
client.add_blocks(98, EachBlockWith::Uncle);
client.add_blocks(1, EachBlockWith::UncleAndTransaction);
client.add_blocks(1, EachBlockWith::Transaction);
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let good_blocks = vec![client.block_hash_delta_minus(2)];
let retracted_blocks = vec![client.block_hash_delta_minus(1)];
let mut queue = VecDeque::new();
let io = TestIo::new(&mut client, &mut queue, None);
// when
sync.chain_new_blocks(&io, &[], &good_blocks);
assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0);
assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1);
sync.chain_new_blocks(&io, &good_blocks, &retracted_blocks);
// then
let status = sync.transaction_queue.lock().unwrap().status();
assert_eq!(status.pending, 1);
assert_eq!(status.future, 0);
}
#[test]
fn returns_requested_block_headers() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let io = TestIo::new(&mut client, &mut queue, None);
@ -1673,7 +1600,7 @@ mod tests {
#[test]
fn returns_requested_block_headers_reverse() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let io = TestIo::new(&mut client, &mut queue, None);

View File

@ -54,7 +54,6 @@ extern crate ethcore;
extern crate env_logger;
extern crate time;
extern crate rand;
extern crate rayon;
#[macro_use]
extern crate heapsize;
@ -71,7 +70,8 @@ use io::NetSyncIo;
mod chain;
mod io;
mod range_collection;
mod transaction_queue;
// TODO [todr] Made public to suppress dead code warnings
pub mod transaction_queue;
#[cfg(test)]
mod tests;
@ -153,14 +153,8 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
}
fn message(&self, io: &NetworkContext<SyncMessage>, message: &SyncMessage) {
match *message {
SyncMessage::BlockVerified => {
self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref()));
},
SyncMessage::NewChainBlocks { ref good, ref retracted } => {
let sync_io = NetSyncIo::new(io, self.chain.deref());
self.sync.write().unwrap().chain_new_blocks(&sync_io, good, retracted);
}
if let SyncMessage::BlockVerified = *message {
self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref()));
}
}
}

View File

@ -24,8 +24,8 @@ use super::helpers::*;
fn two_peers() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
net.peer_mut(1).chain.add_blocks(1000, false);
net.peer_mut(2).chain.add_blocks(1000, false);
net.sync();
assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some());
assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref());
@ -35,8 +35,8 @@ fn two_peers() {
fn status_after_sync() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
net.peer_mut(1).chain.add_blocks(1000, false);
net.peer_mut(2).chain.add_blocks(1000, false);
net.sync();
let status = net.peer(0).sync.status();
assert_eq!(status.state, SyncState::Idle);
@ -45,8 +45,8 @@ fn status_after_sync() {
#[test]
fn takes_few_steps() {
let mut net = TestNet::new(3);
net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(100, EachBlockWith::Uncle);
net.peer_mut(1).chain.add_blocks(100, false);
net.peer_mut(2).chain.add_blocks(100, false);
let total_steps = net.sync();
assert!(total_steps < 7);
}
@ -56,9 +56,8 @@ fn empty_blocks() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
for n in 0..200 {
let with = if n % 2 == 0 { EachBlockWith::Nothing } else { EachBlockWith::Uncle };
net.peer_mut(1).chain.add_blocks(5, with.clone());
net.peer_mut(2).chain.add_blocks(5, with);
net.peer_mut(1).chain.add_blocks(5, n % 2 == 0);
net.peer_mut(2).chain.add_blocks(5, n % 2 == 0);
}
net.sync();
assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some());
@ -69,14 +68,14 @@ fn empty_blocks() {
fn forked() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
net.peer_mut(0).chain.add_blocks(300, EachBlockWith::Uncle);
net.peer_mut(1).chain.add_blocks(300, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(300, EachBlockWith::Uncle);
net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Nothing); //fork
net.peer_mut(1).chain.add_blocks(200, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(200, EachBlockWith::Uncle);
net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle); //fork between 1 and 2
net.peer_mut(2).chain.add_blocks(10, EachBlockWith::Nothing);
net.peer_mut(0).chain.add_blocks(300, false);
net.peer_mut(1).chain.add_blocks(300, false);
net.peer_mut(2).chain.add_blocks(300, false);
net.peer_mut(0).chain.add_blocks(100, true); //fork
net.peer_mut(1).chain.add_blocks(200, false);
net.peer_mut(2).chain.add_blocks(200, false);
net.peer_mut(1).chain.add_blocks(100, false); //fork between 1 and 2
net.peer_mut(2).chain.add_blocks(10, true);
// peer 1 has the best chain of 601 blocks
let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone();
net.sync();
@ -88,8 +87,8 @@ fn forked() {
#[test]
fn restart() {
let mut net = TestNet::new(3);
net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
net.peer_mut(1).chain.add_blocks(1000, false);
net.peer_mut(2).chain.add_blocks(1000, false);
net.sync_steps(8);
@ -110,8 +109,8 @@ fn status_empty() {
#[test]
fn status_packet() {
let mut net = TestNet::new(2);
net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Uncle);
net.peer_mut(1).chain.add_blocks(1, EachBlockWith::Uncle);
net.peer_mut(0).chain.add_blocks(100, false);
net.peer_mut(1).chain.add_blocks(1, false);
net.start();
@ -124,10 +123,10 @@ fn status_packet() {
#[test]
fn propagate_hashes() {
let mut net = TestNet::new(6);
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
net.peer_mut(1).chain.add_blocks(10, false);
net.sync();
net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle);
net.peer_mut(0).chain.add_blocks(10, false);
net.sync();
net.trigger_block_verified(0); //first event just sets the marker
net.trigger_block_verified(0);
@ -150,10 +149,10 @@ fn propagate_hashes() {
#[test]
fn propagate_blocks() {
let mut net = TestNet::new(2);
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
net.peer_mut(1).chain.add_blocks(10, false);
net.sync();
net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle);
net.peer_mut(0).chain.add_blocks(10, false);
net.trigger_block_verified(0); //first event just sets the marker
net.trigger_block_verified(0);
@ -165,7 +164,7 @@ fn propagate_blocks() {
#[test]
fn restart_on_malformed_block() {
let mut net = TestNet::new(2);
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
net.peer_mut(1).chain.add_blocks(10, false);
net.peer_mut(1).chain.corrupt_block(6);
net.sync_steps(10);

View File

@ -22,7 +22,7 @@ use io::SyncIo;
use chain::ChainSync;
use ::SyncConfig;
use ethcore::receipt::Receipt;
use ethcore::transaction::{LocalizedTransaction, Transaction, Action};
use ethcore::transaction::LocalizedTransaction;
use ethcore::filter::Filter;
use ethcore::log_entry::LocalizedLogEntry;
@ -34,14 +34,6 @@ pub struct TestBlockChainClient {
pub difficulty: RwLock<U256>,
}
#[derive(Clone)]
pub enum EachBlockWith {
Nothing,
Uncle,
Transaction,
UncleAndTransaction
}
impl TestBlockChainClient {
pub fn new() -> TestBlockChainClient {
@ -52,53 +44,30 @@ impl TestBlockChainClient {
last_hash: RwLock::new(H256::new()),
difficulty: RwLock::new(From::from(0)),
};
client.add_blocks(1, EachBlockWith::Nothing); // add genesis block
client.add_blocks(1, true); // add genesis block
client.genesis_hash = client.last_hash.read().unwrap().clone();
client
}
pub fn add_blocks(&mut self, count: usize, with: EachBlockWith) {
pub fn add_blocks(&mut self, count: usize, empty: bool) {
let len = self.numbers.read().unwrap().len();
for n in len..(len + count) {
let mut header = BlockHeader::new();
header.difficulty = From::from(n);
header.parent_hash = self.last_hash.read().unwrap().clone();
header.number = n as BlockNumber;
let uncles = match with {
EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => {
let mut uncles = RlpStream::new_list(1);
let mut uncle_header = BlockHeader::new();
uncle_header.difficulty = From::from(n);
uncle_header.parent_hash = self.last_hash.read().unwrap().clone();
uncle_header.number = n as BlockNumber;
uncles.append(&uncle_header);
header.uncles_hash = uncles.as_raw().sha3();
uncles
},
_ => RlpStream::new_list(0)
};
let txs = match with {
EachBlockWith::Transaction | EachBlockWith::UncleAndTransaction => {
let mut txs = RlpStream::new_list(1);
let keypair = KeyPair::create().unwrap();
let tx = Transaction {
action: Action::Create,
value: U256::from(100),
data: "3331600055".from_hex().unwrap(),
gas: U256::from(100_000),
gas_price: U256::one(),
nonce: U256::zero()
};
let signed_tx = tx.sign(&keypair.secret());
txs.append(&signed_tx);
txs.out()
},
_ => rlp::NULL_RLP.to_vec()
};
let mut uncles = RlpStream::new_list(if empty {0} else {1});
if !empty {
let mut uncle_header = BlockHeader::new();
uncle_header.difficulty = From::from(n);
uncle_header.parent_hash = self.last_hash.read().unwrap().clone();
uncle_header.number = n as BlockNumber;
uncles.append(&uncle_header);
header.uncles_hash = uncles.as_raw().sha3();
}
let mut rlp = RlpStream::new_list(3);
rlp.append(&header);
rlp.append_raw(&txs, 1);
rlp.append_raw(&rlp::NULL_RLP, 1);
rlp.append_raw(uncles.as_raw(), 1);
self.import_block(rlp.as_raw().to_vec()).unwrap();
}
@ -140,10 +109,6 @@ impl BlockChainClient for TestBlockChainClient {
unimplemented!();
}
fn nonce(&self, _address: &Address) -> U256 {
U256::zero()
}
fn code(&self, _address: &Address) -> Option<Bytes> {
unimplemented!();
}

View File

@ -221,19 +221,19 @@ impl TransactionQueue {
/// Removes all transactions identified by hashes given in slice
///
/// If gap is introduced marks subsequent transactions as future
pub fn remove_all<T>(&mut self, transaction_hashes: &[H256], fetch_nonce: T)
pub fn remove_all<T>(&mut self, txs: &[H256], fetch_nonce: T)
where T: Fn(&Address) -> U256 {
for hash in transaction_hashes {
self.remove(&hash, &fetch_nonce);
for tx in txs {
self.remove(&tx, &fetch_nonce);
}
}
/// Removes transaction identified by hashes from queue.
///
/// If gap is introduced marks subsequent transactions as future
pub fn remove<T>(&mut self, transaction_hash: &H256, fetch_nonce: &T)
pub fn remove<T>(&mut self, hash: &H256, fetch_nonce: &T)
where T: Fn(&Address) -> U256 {
let transaction = self.by_hash.remove(transaction_hash);
let transaction = self.by_hash.remove(hash);
if transaction.is_none() {
// We don't know this transaction
return;
@ -244,6 +244,7 @@ impl TransactionQueue {
let nonce = transaction.nonce();
let current_nonce = fetch_nonce(&sender);
println!("Removing tx: {:?}", transaction.transaction);
// Remove from future
let order = self.future.drop(&sender, &nonce);
if order.is_some() {
@ -291,12 +292,19 @@ impl TransactionQueue {
// Goes to future or is removed
let order = self.current.drop(&sender, &k).unwrap();
if k >= current_nonce {
println!("Moving to future: {:?}", order);
self.future.insert(sender.clone(), k, order.update_height(k, current_nonce));
} else {
self.by_hash.remove(&order.hash);
}
}
self.future.enforce_limit(&mut self.by_hash);
// And now lets check if there is some chain of transactions in future
// that should be placed in current
if let Some(new_current_top) = self.move_future_txs(sender.clone(), current_nonce - U256::one(), current_nonce) {
self.last_nonces.insert(sender, new_current_top);
}
}
@ -329,6 +337,7 @@ impl TransactionQueue {
// remove also from priority and hash
self.future.by_priority.remove(&order);
// Put to current
println!("Moved: {:?}", order);
let order = order.update_height(current_nonce.clone(), first_nonce);
self.current.insert(address.clone(), current_nonce, order);
current_nonce = current_nonce + U256::one();
@ -357,6 +366,7 @@ impl TransactionQueue {
.cloned()
.map_or(state_nonce, |n| n + U256::one());
println!("Expected next: {:?}, got: {:?}", next_nonce, nonce);
// Check height
if nonce > next_nonce {
// We have a gap - put to future
@ -365,7 +375,6 @@ impl TransactionQueue {
return;
} else if nonce < state_nonce {
// Droping transaction
trace!(target: "sync", "Dropping transaction with nonce: {} - expecting: {}", nonce, next_nonce);
return;
}

View File

@ -28,7 +28,7 @@ extern crate ethcore_util;
extern crate rand;
use test::{Bencher, black_box};
use ethcore_util::uint::*;
use ethcore_util::numbers::*;
#[bench]
fn u256_add(b: &mut Bencher) {

View File

@ -28,7 +28,7 @@ extern crate ethcore_util;
use test::Bencher;
use std::str::FromStr;
use ethcore_util::rlp::*;
use ethcore_util::uint::U256;
use ethcore_util::numbers::U256;
#[bench]
fn bench_stream_u64_value(b: &mut Bencher) {

View File

@ -122,29 +122,29 @@ impl JournalDB {
}
/// Drain the overlay and place it into a batch for the DB.
fn batch_overlay_insertions(overlay: &mut MemoryDB, batch: &DBTransaction) -> (usize, usize) {
let mut ret = 0usize;
fn batch_overlay_insertions(overlay: &mut MemoryDB, batch: &DBTransaction) -> usize {
let mut inserts = 0usize;
let mut deletes = 0usize;
for i in overlay.drain().into_iter() {
let (key, (value, rc)) = i;
if rc > 0 {
assert!(rc == 1);
batch.put(&key.bytes(), &value).expect("Low-level database error. Some issue with your hard disk?");
ret += 1;
inserts += 1;
}
if rc < 0 {
assert!(rc == -1);
ret += 1;
deletes += 1;
}
}
(ret, deletes)
trace!("commit: Inserted {}, Deleted {} nodes", inserts, deletes);
inserts + deletes
}
/// Just commit the overlay into the backing DB.
fn commit_without_counters(&mut self) -> Result<u32, UtilError> {
let batch = DBTransaction::new();
let (ret, _) = Self::batch_overlay_insertions(&mut self.overlay, &batch);
let ret = Self::batch_overlay_insertions(&mut self.overlay, &batch);
try!(self.backing.write(batch));
Ok(ret as u32)
}
@ -183,14 +183,23 @@ impl JournalDB {
let mut index = 0usize;
let mut last;
while try!(self.backing.get({
let mut r = RlpStream::new_list(3);
r.append(&now);
r.append(&index);
r.append(&&PADDING[..]);
last = r.drain();
&last
})).is_some() {
while {
let record = try!(self.backing.get({
let mut r = RlpStream::new_list(3);
r.append(&now);
r.append(&index);
r.append(&&PADDING[..]);
last = r.drain();
&last
}));
match record {
Some(r) => {
assert!(&Rlp::new(&r).val_at::<H256>(0) != id);
true
},
None => false,
}
} {
index += 1;
}
@ -236,6 +245,7 @@ impl JournalDB {
trace!("Purging nodes inserted in non-canon: {:?}", inserts);
to_remove.append(&mut inserts);
}
trace!("commit: Delete journal for time #{}.{}: {}, (canon was {}): {} entries", end_era, index, rlp.val_at::<H256>(0), canon_id, to_remove.len());
try!(batch.delete(&last));
index += 1;
}
@ -243,18 +253,17 @@ impl JournalDB {
let canon_inserts = canon_inserts.drain(..).collect::<HashSet<_>>();
// Purge removed keys if they are not referenced and not re-inserted in the canon commit
let mut deletes = 0;
trace!("Purging filtered nodes: {:?}", to_remove.iter().filter(|h| !counters.contains_key(h) && !canon_inserts.contains(h)).collect::<Vec<_>>());
for h in to_remove.iter().filter(|h| !counters.contains_key(h) && !canon_inserts.contains(h)) {
try!(batch.delete(&h));
deletes += 1;
}
trace!("commit: Delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, deletes);
trace!("Total nodes purged: {}", deletes);
}
// Commit overlay insertions
let (ret, deletes) = Self::batch_overlay_insertions(&mut self.overlay, &batch);
let ret = Self::batch_overlay_insertions(&mut self.overlay, &batch);
try!(self.backing.write(batch));
trace!("commit: Deleted {} nodes", deletes);
Ok(ret as u32)
}

View File

@ -105,7 +105,7 @@ impl SecretStore {
import_path.push(".ethereum");
import_path.push("keystore");
if let Err(e) = geth_import::import_geth_keys(self, &import_path) {
warn!(target: "sstore", "Error retrieving geth keys: {:?}", e)
trace!(target: "sstore", "Geth key not imported: {:?}", e);
}
}