Merge branch 'master' into tx_queue_integration

Conflicts:
	sync/src/transaction_queue.rs
This commit is contained in:
Tomasz Drwięga
2016-03-10 11:23:39 +01:00
46 changed files with 529 additions and 227 deletions

View File

@@ -4,9 +4,13 @@ name = "ethsync"
version = "0.9.99"
license = "GPL-3.0"
authors = ["Ethcore <admin@ethcore.io"]
build = "build.rs"
[lib]
[build-dependencies]
rustc_version = "0.1"
[dependencies]
ethcore-util = { path = "../util" }
ethcore = { path = "../ethcore" }
@@ -21,4 +25,4 @@ rayon = "0.3.1"
[features]
default = []
dev = ["clippy", "ethcore/dev", "ethcore-util/dev"]
dev = ["ethcore/dev", "ethcore-util/dev"]

25
sync/build.rs Normal file
View File

@@ -0,0 +1,25 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
extern crate rustc_version;
use rustc_version::{version_meta, Channel};
fn main() {
if let Channel::Nightly = version_meta().channel {
println!("cargo:rustc-cfg=nightly");
}
}

View File

@@ -274,7 +274,7 @@ impl ChainSync {
}
#[cfg_attr(feature="dev", allow(for_kv_map))] // Because it's not possible to get `values_mut()`
#[cfg_attr(all(nightly, feature="dev"), allow(for_kv_map))] // Because it's not possible to get `values_mut()`
/// Rest sync. Clear all downloaded data but keep the queue
fn reset(&mut self) {
self.downloading_headers.clear();
@@ -342,7 +342,7 @@ impl ChainSync {
Ok(())
}
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
#[cfg_attr(all(nightly, feature="dev"), allow(cyclomatic_complexity))]
/// Called by peer once it has new block headers during sync
fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
self.reset_peer_asking(peer_id, PeerAsking::BlockHeaders);
@@ -469,6 +469,7 @@ impl ChainSync {
}
/// Called by peer once it has new block bodies
#[cfg_attr(all(nightly, feature="dev"), allow(cyclomatic_complexity))]
fn on_peer_new_block(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
let block_rlp = try!(r.at(0));
let header_rlp = try!(block_rlp.at(0));
@@ -850,8 +851,8 @@ impl ChainSync {
self.downloading_bodies.remove(&n);
self.downloading_headers.remove(&n);
}
self.headers.remove_tail(&start);
self.bodies.remove_tail(&start);
self.headers.remove_from(&start);
self.bodies.remove_from(&start);
}
/// Request headers from a peer by block hash
@@ -935,7 +936,7 @@ impl ChainSync {
let mut transaction_queue = self.transaction_queue.lock().unwrap();
for i in 0..item_count {
let tx: SignedTransaction = try!(r.val_at(i));
transaction_queue.add(tx, &fetch_latest_nonce);
let _ = transaction_queue.add(tx, &fetch_latest_nonce);
}
Ok(())
}
@@ -1291,7 +1292,7 @@ impl ChainSync {
let _sender = tx.sender();
}
let mut transaction_queue = self.transaction_queue.lock().unwrap();
transaction_queue.add_all(txs, |a| chain.nonce(a));
let _ = transaction_queue.add_all(txs, |a| chain.nonce(a));
});
}

View File

@@ -15,11 +15,11 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
#![warn(missing_docs)]
#![cfg_attr(feature="dev", feature(plugin))]
#![cfg_attr(feature="dev", plugin(clippy))]
#![cfg_attr(all(nightly, feature="dev"), feature(plugin))]
#![cfg_attr(all(nightly, feature="dev"), plugin(clippy))]
// Keeps consistency (all lines with `.clone()`) and helpful when changing ref to non-ref.
#![cfg_attr(feature="dev", allow(clone_on_copy))]
#![cfg_attr(all(nightly, feature="dev"), allow(clone_on_copy))]
//! Blockchain sync module
//! Implements ethereum protocol version 63 as specified here:

View File

@@ -42,6 +42,8 @@ pub trait RangeCollection<K, V> {
fn remove_head(&mut self, start: &K);
/// Remove all elements >= `start` in the range that contains `start`
fn remove_tail(&mut self, start: &K);
/// Remove all elements >= `start`
fn remove_from(&mut self, start: &K);
/// Remove all elements >= `tail`
fn insert_item(&mut self, key: K, value: V);
/// Get an iterator over ranges
@@ -137,6 +139,28 @@ impl<K, V> RangeCollection<K, V> for Vec<(K, Vec<V>)> where K: Ord + PartialEq +
}
}
/// Remove the element and all following it.
fn remove_from(&mut self, key: &K) {
match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) {
Ok(index) => { self.drain(.. index + 1); },
Err(index) =>{
let mut empty = false;
match self.get_mut(index) {
Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => {
v.truncate((*key - *k).to_usize());
empty = v.is_empty();
}
_ => {}
}
if empty {
self.drain(.. index + 1);
} else {
self.drain(.. index);
}
},
}
}
/// Remove range elements up to key
fn remove_head(&mut self, key: &K) {
if *key == FromUsize::from_usize(0) {
@@ -207,7 +231,7 @@ impl<K, V> RangeCollection<K, V> for Vec<(K, Vec<V>)> where K: Ord + PartialEq +
}
#[test]
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
#[cfg_attr(all(nightly, feature="dev"), allow(cyclomatic_complexity))]
fn test_range() {
use std::cmp::{Ordering};
@@ -272,5 +296,17 @@ fn test_range() {
assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal);
r.remove_tail(&2);
assert_eq!(r.range_iter().next(), None);
let mut r = ranges.clone();
r.remove_from(&20);
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
r.remove_from(&17);
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p'][..])]), Ordering::Equal);
r.remove_from(&15);
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal);
r.remove_from(&3);
assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal);
r.remove_from(&2);
assert_eq!(r.range_iter().next(), None);
}

View File

@@ -85,6 +85,7 @@ use util::numbers::{Uint, U256};
use util::hash::{Address, H256};
use util::table::*;
use ethcore::transaction::*;
use ethcore::error::Error;
#[derive(Clone, Debug)]
@@ -151,10 +152,11 @@ struct VerifiedTransaction {
transaction: SignedTransaction
}
impl VerifiedTransaction {
fn new(transaction: SignedTransaction) -> Self {
VerifiedTransaction {
fn new(transaction: SignedTransaction) -> Result<Self, Error> {
try!(transaction.sender());
Ok(VerifiedTransaction {
transaction: transaction
}
})
}
fn hash(&self) -> H256 {
@@ -228,6 +230,8 @@ impl TransactionSet {
}
}
// Will be used when rpc merged
#[allow(dead_code)]
#[derive(Debug)]
/// Current status of the queue
pub struct TransactionQueueStatus {
@@ -276,6 +280,8 @@ impl TransactionQueue {
}
}
// Will be used when rpc merged
#[allow(dead_code)]
/// Returns current status for this queue
pub fn status(&self) -> TransactionQueueStatus {
TransactionQueueStatus {
@@ -285,17 +291,19 @@ impl TransactionQueue {
}
/// Adds all signed transactions to queue to be verified and imported
pub fn add_all<T>(&mut self, txs: Vec<SignedTransaction>, fetch_nonce: T)
pub fn add_all<T>(&mut self, txs: Vec<SignedTransaction>, fetch_nonce: T) -> Result<(), Error>
where T: Fn(&Address) -> U256 {
for tx in txs.into_iter() {
self.add(tx, &fetch_nonce);
try!(self.add(tx, &fetch_nonce));
}
Ok(())
}
/// Add signed transaction to queue to be verified and imported
pub fn add<T>(&mut self, tx: SignedTransaction, fetch_nonce: &T)
pub fn add<T>(&mut self, tx: SignedTransaction, fetch_nonce: &T) -> Result<(), Error>
where T: Fn(&Address) -> U256 {
self.import_tx(VerifiedTransaction::new(tx), fetch_nonce);
self.import_tx(try!(VerifiedTransaction::new(tx)), fetch_nonce);
Ok(())
}
/// Removes all transactions identified by hashes given in slice
@@ -384,7 +392,8 @@ impl TransactionQueue {
self.future.enforce_limit(&mut self.by_hash);
}
// Will be used when mining merged
#[allow(dead_code)]
/// Returns top transactions from the queue ordered by priority.
pub fn top_transactions(&self, size: usize) -> Vec<SignedTransaction> {
self.current.by_priority
@@ -506,13 +515,8 @@ impl TransactionQueue {
#[cfg(test)]
mod test {
extern crate rustc_serialize;
use self::rustc_serialize::hex::FromHex;
use std::ops::Deref;
use std::collections::{HashMap, BTreeSet};
use util::crypto::KeyPair;
use util::numbers::{U256, Uint};
use util::hash::{Address};
use util::table::*;
use util::*;
use ethcore::transaction::*;
use super::*;
use super::{TransactionSet, TransactionOrder, VerifiedTransaction};
@@ -556,12 +560,12 @@ mod test {
limit: 1
};
let (tx1, tx2) = new_txs(U256::from(1));
let tx1 = VerifiedTransaction::new(tx1);
let tx2 = VerifiedTransaction::new(tx2);
let tx1 = VerifiedTransaction::new(tx1).unwrap();
let tx2 = VerifiedTransaction::new(tx2).unwrap();
let mut by_hash = {
let mut x = HashMap::new();
let tx1 = VerifiedTransaction::new(tx1.transaction.clone());
let tx2 = VerifiedTransaction::new(tx2.transaction.clone());
let tx1 = VerifiedTransaction::new(tx1.transaction.clone()).unwrap();
let tx2 = VerifiedTransaction::new(tx2.transaction.clone()).unwrap();
x.insert(tx1.hash(), tx1);
x.insert(tx2.hash(), tx2);
x
@@ -595,13 +599,39 @@ mod test {
let tx = new_tx();
// when
txq.add(tx, &default_nonce);
let res = txq.add(tx, &default_nonce);
// then
assert!(res.is_ok());
let stats = txq.status();
assert_eq!(stats.pending, 1);
}
#[test]
fn should_reject_incorectly_signed_transaction() {
// given
let mut txq = TransactionQueue::new();
let tx = new_unsigned_tx(U256::from(123));
let stx = {
let mut s = RlpStream::new_list(9);
s.append(&tx.nonce);
s.append(&tx.gas_price);
s.append(&tx.gas);
s.append_empty_data(); // action=create
s.append(&tx.value);
s.append(&tx.data);
s.append(&0u64); // v
s.append(&U256::zero()); // r
s.append(&U256::zero()); // s
decode(s.as_raw())
};
// when
let res = txq.add(stx, &default_nonce);
// then
assert!(res.is_err());
}
#[test]
fn should_import_txs_from_same_sender() {
// given
@@ -610,8 +640,8 @@ mod test {
let (tx, tx2) = new_txs(U256::from(1));
// when
txq.add(tx.clone(), &default_nonce);
txq.add(tx2.clone(), &default_nonce);
txq.add(tx.clone(), &default_nonce).unwrap();
txq.add(tx2.clone(), &default_nonce).unwrap();
// then
let top = txq.top_transactions(5);
@@ -628,8 +658,8 @@ mod test {
let (tx, tx2) = new_txs(U256::from(2));
// when
txq.add(tx.clone(), &default_nonce);
txq.add(tx2.clone(), &default_nonce);
txq.add(tx.clone(), &default_nonce).unwrap();
txq.add(tx2.clone(), &default_nonce).unwrap();
// then
let stats = txq.status();
@@ -650,13 +680,13 @@ mod test {
let tx1 = new_unsigned_tx(U256::from(124)).sign(&secret);
let tx2 = new_unsigned_tx(U256::from(125)).sign(&secret);
txq.add(tx, &default_nonce);
txq.add(tx, &default_nonce).unwrap();
assert_eq!(txq.status().pending, 1);
txq.add(tx2, &default_nonce);
txq.add(tx2, &default_nonce).unwrap();
assert_eq!(txq.status().future, 1);
// when
txq.add(tx1, &default_nonce);
txq.add(tx1, &default_nonce).unwrap();
// then
let stats = txq.status();
@@ -669,8 +699,8 @@ mod test {
// given
let mut txq2 = TransactionQueue::new();
let (tx, tx2) = new_txs(U256::from(3));
txq2.add(tx.clone(), &default_nonce);
txq2.add(tx2.clone(), &default_nonce);
txq2.add(tx.clone(), &default_nonce).unwrap();
txq2.add(tx2.clone(), &default_nonce).unwrap();
assert_eq!(txq2.status().pending, 1);
assert_eq!(txq2.status().future, 1);
@@ -691,10 +721,10 @@ mod test {
let mut txq = TransactionQueue::new();
let (tx, tx2) = new_txs(U256::from(1));
let tx3 = new_tx();
txq.add(tx2.clone(), &default_nonce);
txq.add(tx2.clone(), &default_nonce).unwrap();
assert_eq!(txq.status().future, 1);
txq.add(tx3.clone(), &default_nonce);
txq.add(tx.clone(), &default_nonce);
txq.add(tx3.clone(), &default_nonce).unwrap();
txq.add(tx.clone(), &default_nonce).unwrap();
assert_eq!(txq.status().pending, 3);
// when
@@ -713,8 +743,8 @@ mod test {
let (tx, tx2) = new_txs(U256::one());
// add
txq.add(tx2.clone(), &default_nonce);
txq.add(tx.clone(), &default_nonce);
txq.add(tx2.clone(), &default_nonce).unwrap();
txq.add(tx.clone(), &default_nonce).unwrap();
let stats = txq.status();
assert_eq!(stats.pending, 2);
@@ -731,11 +761,11 @@ mod test {
// given
let mut txq = TransactionQueue::with_limits(1, 1);
let (tx, tx2) = new_txs(U256::one());
txq.add(tx.clone(), &default_nonce);
txq.add(tx.clone(), &default_nonce).unwrap();
assert_eq!(txq.status().pending, 1);
// when
txq.add(tx2.clone(), &default_nonce);
txq.add(tx2.clone(), &default_nonce).unwrap();
// then
let t = txq.top_transactions(2);
@@ -749,14 +779,14 @@ mod test {
let mut txq = TransactionQueue::with_limits(10, 1);
let (tx1, tx2) = new_txs(U256::from(4));
let (tx3, tx4) = new_txs(U256::from(4));
txq.add(tx1.clone(), &default_nonce);
txq.add(tx3.clone(), &default_nonce);
txq.add(tx1.clone(), &default_nonce).unwrap();
txq.add(tx3.clone(), &default_nonce).unwrap();
assert_eq!(txq.status().pending, 2);
// when
txq.add(tx2.clone(), &default_nonce);
txq.add(tx2.clone(), &default_nonce).unwrap();
assert_eq!(txq.status().future, 1);
txq.add(tx4.clone(), &default_nonce);
txq.add(tx4.clone(), &default_nonce).unwrap();
// then
assert_eq!(txq.status().future, 1);
@@ -770,7 +800,7 @@ mod test {
let fetch_last_nonce = |_a: &Address| last_nonce;
// when
txq.add(tx, &fetch_last_nonce);
txq.add(tx, &fetch_last_nonce).unwrap();
// then
let stats = txq.status();
@@ -784,12 +814,12 @@ mod test {
let nonce = |a: &Address| default_nonce(a) + U256::one();
let mut txq = TransactionQueue::new();
let (_tx1, tx2) = new_txs(U256::from(1));
txq.add(tx2.clone(), &default_nonce);
txq.add(tx2.clone(), &default_nonce).unwrap();
assert_eq!(txq.status().future, 1);
assert_eq!(txq.status().pending, 0);
// when
txq.add(tx2.clone(), &nonce);
txq.add(tx2.clone(), &nonce).unwrap();
// then
let stats = txq.status();
@@ -802,15 +832,15 @@ mod test {
// given
let mut txq = TransactionQueue::new();
let (tx1, tx2) = new_txs(U256::from(1));
txq.add(tx1.clone(), &default_nonce);
txq.add(tx2.clone(), &default_nonce);
txq.add(tx1.clone(), &default_nonce).unwrap();
txq.add(tx2.clone(), &default_nonce).unwrap();
assert_eq!(txq.status().pending, 2);
// when
txq.remove(&tx1.hash(), &default_nonce);
assert_eq!(txq.status().pending, 0);
assert_eq!(txq.status().future, 1);
txq.add(tx1.clone(), &default_nonce);
txq.add(tx1.clone(), &default_nonce).unwrap();
// then
let stats = txq.status();
@@ -825,10 +855,10 @@ mod test {
let mut txq = TransactionQueue::new();
let (tx, tx2) = new_txs(U256::from(1));
let tx3 = new_tx();
txq.add(tx2.clone(), &default_nonce);
txq.add(tx2.clone(), &default_nonce).unwrap();
assert_eq!(txq.status().future, 1);
txq.add(tx3.clone(), &default_nonce);
txq.add(tx.clone(), &default_nonce);
txq.add(tx3.clone(), &default_nonce).unwrap();
txq.add(tx.clone(), &default_nonce).unwrap();
assert_eq!(txq.status().pending, 3);
// when
@@ -853,8 +883,8 @@ mod test {
};
// when
txq.add(tx, &default_nonce);
txq.add(tx2, &default_nonce);
txq.add(tx, &default_nonce).unwrap();
txq.add(tx2, &default_nonce).unwrap();
// then
let stats = txq.status();
@@ -881,10 +911,10 @@ mod test {
};
// when
txq.add(tx1, &default_nonce);
txq.add(tx2, &default_nonce);
txq.add(tx1, &default_nonce).unwrap();
txq.add(tx2, &default_nonce).unwrap();
assert_eq!(txq.status().future, 1);
txq.add(tx0, &default_nonce);
txq.add(tx0, &default_nonce).unwrap();
// then
let stats = txq.status();
@@ -900,8 +930,8 @@ mod test {
let next_nonce = |a: &Address| default_nonce(a) + U256::one();
let mut txq = TransactionQueue::new();
let (tx1, tx2) = new_txs(U256::one());
txq.add(tx1.clone(), &previous_nonce);
txq.add(tx2, &previous_nonce);
txq.add(tx1.clone(), &previous_nonce).unwrap();
txq.add(tx2, &previous_nonce).unwrap();
assert_eq!(txq.status().future, 2);
// when