Merge branch 'master' into import_route

This commit is contained in:
debris 2016-03-10 01:04:05 +01:00
commit 4750d2f667
14 changed files with 528 additions and 145 deletions

39
Cargo.lock generated
View File

@ -15,6 +15,7 @@ dependencies = [
"fdlimit 0.1.0", "fdlimit 0.1.0",
"log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"number_prefix 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", "number_prefix 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rpassword 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
@ -146,6 +147,14 @@ dependencies = [
"libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "deque"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "docopt" name = "docopt"
version = "0.6.78" version = "0.6.78"
@ -285,6 +294,7 @@ dependencies = [
"heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
@ -655,6 +665,16 @@ dependencies = [
"libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "rayon"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "regex" name = "regex"
version = "0.1.54" version = "0.1.54"
@ -680,6 +700,17 @@ dependencies = [
"librocksdb-sys 0.2.1 (git+https://github.com/arkpar/rust-rocksdb.git)", "librocksdb-sys 0.2.1 (git+https://github.com/arkpar/rust-rocksdb.git)",
] ]
[[package]]
name = "rpassword"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"kernel32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
"termios 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "rust-crypto" name = "rust-crypto"
version = "0.2.34" version = "0.2.34"
@ -813,6 +844,14 @@ dependencies = [
"winapi 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "termios"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "time" name = "time"
version = "0.1.34" version = "0.1.34"

View File

@ -21,6 +21,7 @@ fdlimit = { path = "util/fdlimit" }
daemonize = "0.2" daemonize = "0.2"
ethcore-devtools = { path = "devtools" } ethcore-devtools = { path = "devtools" }
number_prefix = "0.2" number_prefix = "0.2"
rpassword = "0.1"
[features] [features]
default = ["rpc"] default = ["rpc"]

View File

@ -141,6 +141,9 @@ pub trait BlockChainClient : Sync + Send {
/// Get block total difficulty. /// Get block total difficulty.
fn block_total_difficulty(&self, id: BlockId) -> Option<U256>; fn block_total_difficulty(&self, id: BlockId) -> Option<U256>;
/// Get address nonce.
fn nonce(&self, address: &Address) -> U256;
/// Get block hash. /// Get block hash.
fn block_hash(&self, id: BlockId) -> Option<H256>; fn block_hash(&self, id: BlockId) -> Option<H256>;
@ -370,18 +373,14 @@ impl<V> Client<V> where V: Verifier {
bad_blocks.insert(header.hash()); bad_blocks.insert(header.hash());
continue; continue;
} }
let closed_block = self.check_and_close_block(&block); let closed_block = self.check_and_close_block(&block);
if let Err(_) = closed_block { if let Err(_) = closed_block {
bad_blocks.insert(header.hash()); bad_blocks.insert(header.hash());
break; break;
} }
// Insert block
let closed_block = closed_block.unwrap();
self.chain.write().unwrap().insert_block(&block.bytes, closed_block.block().receipts().clone());
good_blocks.push(header.hash()); good_blocks.push(header.hash());
// Are we committing an era?
let ancient = if header.number() >= HISTORY { let ancient = if header.number() >= HISTORY {
let n = header.number() - HISTORY; let n = header.number() - HISTORY;
let chain = self.chain.read().unwrap(); let chain = self.chain.read().unwrap();
@ -391,10 +390,16 @@ impl<V> Client<V> where V: Verifier {
}; };
// Commit results // Commit results
let closed_block = closed_block.unwrap();
let receipts = closed_block.block().receipts().clone();
closed_block.drain() closed_block.drain()
.commit(header.number(), &header.hash(), ancient) .commit(header.number(), &header.hash(), ancient)
.expect("State DB commit failed."); .expect("State DB commit failed.");
// And update the chain
self.chain.write().unwrap()
.insert_block(&block.bytes, receipts);
self.report.write().unwrap().accrue_block(&block); self.report.write().unwrap().accrue_block(&block);
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
} }
@ -414,6 +419,8 @@ impl<V> Client<V> where V: Verifier {
io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
good: good_blocks, good: good_blocks,
bad: bad_blocks, bad: bad_blocks,
// TODO [todr] were to take those from?
retracted: vec![],
})).unwrap(); })).unwrap();
} }
} }
@ -588,6 +595,10 @@ impl<V> BlockChainClient for Client<V> where V: Verifier {
Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty) Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty)
} }
fn nonce(&self, address: &Address) -> U256 {
self.state().nonce(address)
}
fn block_hash(&self, id: BlockId) -> Option<H256> { fn block_hash(&self, id: BlockId) -> Option<H256> {
let chain = self.chain.read().unwrap(); let chain = self.chain.read().unwrap();
Self::block_hash(&chain, id) Self::block_hash(&chain, id)

View File

@ -31,6 +31,8 @@ pub enum SyncMessage {
good: Vec<H256>, good: Vec<H256>,
/// Hashes of blocks not imported to blockchain /// Hashes of blocks not imported to blockchain
bad: Vec<H256>, bad: Vec<H256>,
/// Hashes of blocks that were removed from canonical chain
retracted: Vec<H256>,
}, },
/// A block is ready /// A block is ready
BlockVerified, BlockVerified,

View File

@ -17,9 +17,11 @@
pub mod verification; pub mod verification;
pub mod verifier; pub mod verifier;
mod canon_verifier; mod canon_verifier;
#[cfg(test)]
mod noop_verifier; mod noop_verifier;
pub use self::verification::*; pub use self::verification::*;
pub use self::verifier::Verifier; pub use self::verifier::Verifier;
pub use self::canon_verifier::CanonVerifier; pub use self::canon_verifier::CanonVerifier;
#[cfg(test)]
pub use self::noop_verifier::NoopVerifier; pub use self::noop_verifier::NoopVerifier;

View File

@ -32,6 +32,7 @@ extern crate fdlimit;
extern crate daemonize; extern crate daemonize;
extern crate time; extern crate time;
extern crate number_prefix; extern crate number_prefix;
extern crate rpassword;
#[cfg(feature = "rpc")] #[cfg(feature = "rpc")]
extern crate ethcore_rpc as rpc; extern crate ethcore_rpc as rpc;
@ -70,6 +71,7 @@ Parity. Ethereum Client.
Usage: Usage:
parity daemon <pid-file> [options] [ --no-bootstrap | <enode>... ] parity daemon <pid-file> [options] [ --no-bootstrap | <enode>... ]
parity account (new | list)
parity [options] [ --no-bootstrap | <enode>... ] parity [options] [ --no-bootstrap | <enode>... ]
Protocol Options: Protocol Options:
@ -126,6 +128,9 @@ Miscellaneous Options:
#[derive(Debug, RustcDecodable)] #[derive(Debug, RustcDecodable)]
struct Args { struct Args {
cmd_daemon: bool, cmd_daemon: bool,
cmd_account: bool,
cmd_new: bool,
cmd_list: bool,
arg_pid_file: String, arg_pid_file: String,
arg_enode: Vec<String>, arg_enode: Vec<String>,
flag_chain: String, flag_chain: String,
@ -337,9 +342,40 @@ impl Configuration {
.start() .start()
.unwrap_or_else(|e| die!("Couldn't daemonize; {}", e)); .unwrap_or_else(|e| die!("Couldn't daemonize; {}", e));
} }
if self.args.cmd_account {
self.execute_account_cli();
return;
}
self.execute_client(); self.execute_client();
} }
fn execute_account_cli(&self) {
use util::keys::store::SecretStore;
use rpassword::read_password;
let mut secret_store = SecretStore::new();
if self.args.cmd_new {
println!("Please note that password is NOT RECOVERABLE.");
println!("Type password: ");
let password = read_password().unwrap();
println!("Repeat password: ");
let password_repeat = read_password().unwrap();
if password != password_repeat {
println!("Passwords do not match!");
return;
}
println!("New account address:");
let new_address = secret_store.new_account(&password).unwrap();
println!("{:?}", new_address);
return;
}
if self.args.cmd_list {
println!("Known addresses:");
for &(addr, _) in secret_store.accounts().unwrap().iter() {
println!("{:?}", addr);
}
}
}
fn execute_client(&self) { fn execute_client(&self) {
// Setup panic handler // Setup panic handler
let panic_handler = PanicHandler::new_in_arc(); let panic_handler = PanicHandler::new_in_arc();

View File

@ -17,6 +17,7 @@ time = "0.1.34"
rand = "0.3.13" rand = "0.3.13"
heapsize = "0.3" heapsize = "0.3"
rustc-serialize = "0.3" rustc-serialize = "0.3"
rayon = "0.3.1"
[features] [features]
default = [] default = []

View File

@ -30,14 +30,17 @@
/// ///
use util::*; use util::*;
use rayon::prelude::*;
use std::mem::{replace}; use std::mem::{replace};
use ethcore::views::{HeaderView}; use ethcore::views::{HeaderView, BlockView};
use ethcore::header::{BlockNumber, Header as BlockHeader}; use ethcore::header::{BlockNumber, Header as BlockHeader};
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo}; use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo};
use range_collection::{RangeCollection, ToUsize, FromUsize}; use range_collection::{RangeCollection, ToUsize, FromUsize};
use ethcore::error::*; use ethcore::error::*;
use ethcore::block::Block; use ethcore::block::Block;
use ethcore::transaction::SignedTransaction;
use io::SyncIo; use io::SyncIo;
use transaction_queue::TransactionQueue;
use time; use time;
use super::SyncConfig; use super::SyncConfig;
@ -204,11 +207,13 @@ pub struct ChainSync {
/// True if common block for our and remote chain has been found /// True if common block for our and remote chain has been found
have_common_block: bool, have_common_block: bool,
/// Last propagated block number /// Last propagated block number
last_send_block_number: BlockNumber, last_sent_block_number: BlockNumber,
/// Max blocks to download ahead /// Max blocks to download ahead
max_download_ahead_blocks: usize, max_download_ahead_blocks: usize,
/// Network ID /// Network ID
network_id: U256, network_id: U256,
/// Transactions Queue
transaction_queue: Mutex<TransactionQueue>,
} }
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>; type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
@ -231,9 +236,10 @@ impl ChainSync {
last_imported_hash: None, last_imported_hash: None,
syncing_difficulty: U256::from(0u64), syncing_difficulty: U256::from(0u64),
have_common_block: false, have_common_block: false,
last_send_block_number: 0, last_sent_block_number: 0,
max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id, network_id: config.network_id,
transaction_queue: Mutex::new(TransactionQueue::new()),
} }
} }
@ -292,6 +298,7 @@ impl ChainSync {
self.starting_block = 0; self.starting_block = 0;
self.highest_block = None; self.highest_block = None;
self.have_common_block = false; self.have_common_block = false;
self.transaction_queue.lock().unwrap().clear();
self.starting_block = io.chain().chain_info().best_block_number; self.starting_block = io.chain().chain_info().best_block_number;
self.state = SyncState::NotSynced; self.state = SyncState::NotSynced;
} }
@ -484,7 +491,7 @@ impl ChainSync {
trace!(target: "sync", "New block already queued {:?}", h); trace!(target: "sync", "New block already queued {:?}", h);
}, },
Ok(_) => { Ok(_) => {
if self.current_base_block() < header.number { if self.current_base_block() < header.number {
self.last_imported_block = Some(header.number); self.last_imported_block = Some(header.number);
self.remove_downloaded_blocks(header.number); self.remove_downloaded_blocks(header.number);
} }
@ -843,8 +850,8 @@ impl ChainSync {
self.downloading_bodies.remove(&n); self.downloading_bodies.remove(&n);
self.downloading_headers.remove(&n); self.downloading_headers.remove(&n);
} }
self.headers.remove_tail(&start); self.headers.remove_from(&start);
self.bodies.remove_tail(&start); self.bodies.remove_from(&start);
} }
/// Request headers from a peer by block hash /// Request headers from a peer by block hash
@ -919,8 +926,18 @@ impl ChainSync {
} }
} }
/// Called when peer sends us new transactions /// Called when peer sends us new transactions
fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> { fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
Ok(()) let chain = io.chain();
let item_count = r.item_count();
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
let fetch_latest_nonce = |a : &Address| chain.nonce(a);
let mut transaction_queue = self.transaction_queue.lock().unwrap();
for i in 0..item_count {
let tx: SignedTransaction = try!(r.val_at(i));
transaction_queue.add(tx, &fetch_latest_nonce);
}
Ok(())
} }
/// Send Status message /// Send Status message
@ -1229,23 +1246,60 @@ impl ChainSync {
sent sent
} }
fn propagate_latest_blocks(&mut self, io: &mut SyncIo) {
let chain_info = io.chain().chain_info();
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
let blocks = self.propagate_blocks(&chain_info, io);
let hashes = self.propagate_new_hashes(&chain_info, io);
if blocks != 0 || hashes != 0 {
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
}
}
self.last_sent_block_number = chain_info.best_block_number;
}
/// Maintain other peers. Send out any new blocks and transactions /// Maintain other peers. Send out any new blocks and transactions
pub fn maintain_sync(&mut self, io: &mut SyncIo) { pub fn maintain_sync(&mut self, io: &mut SyncIo) {
self.check_resume(io); self.check_resume(io);
} }
/// should be called once chain has new block, triggers the latest block propagation /// called when block is imported to chain, updates transactions queue and propagates the blocks
pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) { pub fn chain_new_blocks(&mut self, io: &mut SyncIo, good: &[H256], bad: &[H256], _retracted: &[H256]) {
let chain = io.chain().chain_info(); fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec<SignedTransaction> {
if (((chain.best_block_number as i64) - (self.last_send_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { let block = chain
let blocks = self.propagate_blocks(&chain, io); .block(BlockId::Hash(hash.clone()))
let hashes = self.propagate_new_hashes(&chain, io); // Client should send message after commit to db and inserting to chain.
if blocks != 0 || hashes != 0 { .expect("Expected in-chain blocks.");
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); let block = BlockView::new(&block);
} block.transactions()
} }
self.last_send_block_number = chain.best_block_number;
{
let chain = io.chain();
let good = good.par_iter().map(|h| fetch_transactions(chain, h));
let bad = bad.par_iter().map(|h| fetch_transactions(chain, h));
good.for_each(|txs| {
let mut transaction_queue = self.transaction_queue.lock().unwrap();
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
transaction_queue.remove_all(&hashes, |a| chain.nonce(a));
});
bad.for_each(|txs| {
// populate sender
for tx in &txs {
let _sender = tx.sender();
}
let mut transaction_queue = self.transaction_queue.lock().unwrap();
transaction_queue.add_all(txs, |a| chain.nonce(a));
});
}
// Propagate latests blocks
self.propagate_latest_blocks(io);
// TODO [todr] propagate transactions?
} }
} }
#[cfg(test)] #[cfg(test)]
@ -1386,7 +1440,7 @@ mod tests {
#[test] #[test]
fn finds_lagging_peers() { fn finds_lagging_peers() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
client.add_blocks(100, false); client.add_blocks(100, EachBlockWith::Uncle);
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10)); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10));
let chain_info = client.chain_info(); let chain_info = client.chain_info();
@ -1400,7 +1454,7 @@ mod tests {
#[test] #[test]
fn calculates_tree_for_lagging_peer() { fn calculates_tree_for_lagging_peer() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
client.add_blocks(15, false); client.add_blocks(15, EachBlockWith::Uncle);
let start = client.block_hash_delta_minus(4); let start = client.block_hash_delta_minus(4);
let end = client.block_hash_delta_minus(2); let end = client.block_hash_delta_minus(2);
@ -1417,7 +1471,7 @@ mod tests {
#[test] #[test]
fn sends_new_hashes_to_lagging_peer() { fn sends_new_hashes_to_lagging_peer() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
client.add_blocks(100, false); client.add_blocks(100, EachBlockWith::Uncle);
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info(); let chain_info = client.chain_info();
@ -1436,7 +1490,7 @@ mod tests {
#[test] #[test]
fn sends_latest_block_to_lagging_peer() { fn sends_latest_block_to_lagging_peer() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
client.add_blocks(100, false); client.add_blocks(100, EachBlockWith::Uncle);
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info(); let chain_info = client.chain_info();
@ -1454,7 +1508,7 @@ mod tests {
#[test] #[test]
fn handles_peer_new_block_mallformed() { fn handles_peer_new_block_mallformed() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
client.add_blocks(10, false); client.add_blocks(10, EachBlockWith::Uncle);
let block_data = get_dummy_block(11, client.chain_info().best_block_hash); let block_data = get_dummy_block(11, client.chain_info().best_block_hash);
@ -1472,7 +1526,7 @@ mod tests {
#[test] #[test]
fn handles_peer_new_block() { fn handles_peer_new_block() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
client.add_blocks(10, false); client.add_blocks(10, EachBlockWith::Uncle);
let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash); let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash);
@ -1490,7 +1544,7 @@ mod tests {
#[test] #[test]
fn handles_peer_new_block_empty() { fn handles_peer_new_block_empty() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
client.add_blocks(10, false); client.add_blocks(10, EachBlockWith::Uncle);
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let mut io = TestIo::new(&mut client, &mut queue, None); let mut io = TestIo::new(&mut client, &mut queue, None);
@ -1506,7 +1560,7 @@ mod tests {
#[test] #[test]
fn handles_peer_new_hashes() { fn handles_peer_new_hashes() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
client.add_blocks(10, false); client.add_blocks(10, EachBlockWith::Uncle);
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let mut io = TestIo::new(&mut client, &mut queue, None); let mut io = TestIo::new(&mut client, &mut queue, None);
@ -1522,7 +1576,7 @@ mod tests {
#[test] #[test]
fn handles_peer_new_hashes_empty() { fn handles_peer_new_hashes_empty() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
client.add_blocks(10, false); client.add_blocks(10, EachBlockWith::Uncle);
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let mut io = TestIo::new(&mut client, &mut queue, None); let mut io = TestIo::new(&mut client, &mut queue, None);
@ -1540,7 +1594,7 @@ mod tests {
#[test] #[test]
fn hashes_rlp_mutually_acceptable() { fn hashes_rlp_mutually_acceptable() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
client.add_blocks(100, false); client.add_blocks(100, EachBlockWith::Uncle);
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info(); let chain_info = client.chain_info();
@ -1558,7 +1612,7 @@ mod tests {
#[test] #[test]
fn block_rlp_mutually_acceptable() { fn block_rlp_mutually_acceptable() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
client.add_blocks(100, false); client.add_blocks(100, EachBlockWith::Uncle);
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info(); let chain_info = client.chain_info();
@ -1571,10 +1625,37 @@ mod tests {
assert!(result.is_ok()); assert!(result.is_ok());
} }
#[test]
fn should_add_transactions_to_queue() {
// given
let mut client = TestBlockChainClient::new();
client.add_blocks(98, EachBlockWith::Uncle);
client.add_blocks(1, EachBlockWith::UncleAndTransaction);
client.add_blocks(1, EachBlockWith::Transaction);
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let good_blocks = vec![client.block_hash_delta_minus(2)];
let retracted_blocks = vec![client.block_hash_delta_minus(1)];
let mut queue = VecDeque::new();
let mut io = TestIo::new(&mut client, &mut queue, None);
// when
sync.chain_new_blocks(&mut io, &[], &good_blocks, &[]);
assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0);
assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1);
sync.chain_new_blocks(&mut io, &good_blocks, &retracted_blocks, &[]);
// then
let status = sync.transaction_queue.lock().unwrap().status();
assert_eq!(status.pending, 1);
assert_eq!(status.future, 0);
}
#[test] #[test]
fn returns_requested_block_headers() { fn returns_requested_block_headers() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
client.add_blocks(100, false); client.add_blocks(100, EachBlockWith::Uncle);
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
let io = TestIo::new(&mut client, &mut queue, None); let io = TestIo::new(&mut client, &mut queue, None);
@ -1598,7 +1679,7 @@ mod tests {
#[test] #[test]
fn returns_requested_block_headers_reverse() { fn returns_requested_block_headers_reverse() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
client.add_blocks(100, false); client.add_blocks(100, EachBlockWith::Uncle);
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
let io = TestIo::new(&mut client, &mut queue, None); let io = TestIo::new(&mut client, &mut queue, None);

View File

@ -54,6 +54,7 @@ extern crate ethcore;
extern crate env_logger; extern crate env_logger;
extern crate time; extern crate time;
extern crate rand; extern crate rand;
extern crate rayon;
#[macro_use] #[macro_use]
extern crate heapsize; extern crate heapsize;
@ -70,8 +71,7 @@ use io::NetSyncIo;
mod chain; mod chain;
mod io; mod io;
mod range_collection; mod range_collection;
// TODO [todr] Made public to suppress dead code warnings mod transaction_queue;
pub mod transaction_queue;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
@ -153,8 +153,12 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
} }
fn message(&self, io: &NetworkContext<SyncMessage>, message: &SyncMessage) { fn message(&self, io: &NetworkContext<SyncMessage>, message: &SyncMessage) {
if let SyncMessage::BlockVerified = *message { match *message {
self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref())); SyncMessage::NewChainBlocks { ref good, ref bad, ref retracted } => {
let mut sync_io = NetSyncIo::new(io, self.chain.deref());
self.sync.write().unwrap().chain_new_blocks(&mut sync_io, good, bad, retracted);
},
_ => {/* Ignore other messages */},
} }
} }
} }

View File

@ -42,6 +42,8 @@ pub trait RangeCollection<K, V> {
fn remove_head(&mut self, start: &K); fn remove_head(&mut self, start: &K);
/// Remove all elements >= `start` in the range that contains `start` /// Remove all elements >= `start` in the range that contains `start`
fn remove_tail(&mut self, start: &K); fn remove_tail(&mut self, start: &K);
/// Remove all elements >= `start`
fn remove_from(&mut self, start: &K);
/// Remove all elements >= `tail` /// Remove all elements >= `tail`
fn insert_item(&mut self, key: K, value: V); fn insert_item(&mut self, key: K, value: V);
/// Get an iterator over ranges /// Get an iterator over ranges
@ -137,6 +139,28 @@ impl<K, V> RangeCollection<K, V> for Vec<(K, Vec<V>)> where K: Ord + PartialEq +
} }
} }
/// Remove the element and all following it.
fn remove_from(&mut self, key: &K) {
match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) {
Ok(index) => { self.drain(.. index + 1); },
Err(index) =>{
let mut empty = false;
match self.get_mut(index) {
Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => {
v.truncate((*key - *k).to_usize());
empty = v.is_empty();
}
_ => {}
}
if empty {
self.drain(.. index + 1);
} else {
self.drain(.. index);
}
},
}
}
/// Remove range elements up to key /// Remove range elements up to key
fn remove_head(&mut self, key: &K) { fn remove_head(&mut self, key: &K) {
if *key == FromUsize::from_usize(0) { if *key == FromUsize::from_usize(0) {
@ -272,5 +296,17 @@ fn test_range() {
assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal); assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal);
r.remove_tail(&2); r.remove_tail(&2);
assert_eq!(r.range_iter().next(), None); assert_eq!(r.range_iter().next(), None);
let mut r = ranges.clone();
r.remove_from(&20);
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
r.remove_from(&17);
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p'][..])]), Ordering::Equal);
r.remove_from(&15);
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal);
r.remove_from(&3);
assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal);
r.remove_from(&2);
assert_eq!(r.range_iter().next(), None);
} }

View File

@ -24,8 +24,8 @@ use super::helpers::*;
fn two_peers() { fn two_peers() {
::env_logger::init().ok(); ::env_logger::init().ok();
let mut net = TestNet::new(3); let mut net = TestNet::new(3);
net.peer_mut(1).chain.add_blocks(1000, false); net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(1000, false); net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
net.sync(); net.sync();
assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some()); assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some());
assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref());
@ -35,8 +35,8 @@ fn two_peers() {
fn status_after_sync() { fn status_after_sync() {
::env_logger::init().ok(); ::env_logger::init().ok();
let mut net = TestNet::new(3); let mut net = TestNet::new(3);
net.peer_mut(1).chain.add_blocks(1000, false); net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(1000, false); net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
net.sync(); net.sync();
let status = net.peer(0).sync.status(); let status = net.peer(0).sync.status();
assert_eq!(status.state, SyncState::Idle); assert_eq!(status.state, SyncState::Idle);
@ -45,8 +45,8 @@ fn status_after_sync() {
#[test] #[test]
fn takes_few_steps() { fn takes_few_steps() {
let mut net = TestNet::new(3); let mut net = TestNet::new(3);
net.peer_mut(1).chain.add_blocks(100, false); net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(100, false); net.peer_mut(2).chain.add_blocks(100, EachBlockWith::Uncle);
let total_steps = net.sync(); let total_steps = net.sync();
assert!(total_steps < 7); assert!(total_steps < 7);
} }
@ -56,8 +56,9 @@ fn empty_blocks() {
::env_logger::init().ok(); ::env_logger::init().ok();
let mut net = TestNet::new(3); let mut net = TestNet::new(3);
for n in 0..200 { for n in 0..200 {
net.peer_mut(1).chain.add_blocks(5, n % 2 == 0); let with = if n % 2 == 0 { EachBlockWith::Nothing } else { EachBlockWith::Uncle };
net.peer_mut(2).chain.add_blocks(5, n % 2 == 0); net.peer_mut(1).chain.add_blocks(5, with.clone());
net.peer_mut(2).chain.add_blocks(5, with);
} }
net.sync(); net.sync();
assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some()); assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some());
@ -68,14 +69,14 @@ fn empty_blocks() {
fn forked() { fn forked() {
::env_logger::init().ok(); ::env_logger::init().ok();
let mut net = TestNet::new(3); let mut net = TestNet::new(3);
net.peer_mut(0).chain.add_blocks(300, false); net.peer_mut(0).chain.add_blocks(300, EachBlockWith::Uncle);
net.peer_mut(1).chain.add_blocks(300, false); net.peer_mut(1).chain.add_blocks(300, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(300, false); net.peer_mut(2).chain.add_blocks(300, EachBlockWith::Uncle);
net.peer_mut(0).chain.add_blocks(100, true); //fork net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Nothing); //fork
net.peer_mut(1).chain.add_blocks(200, false); net.peer_mut(1).chain.add_blocks(200, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(200, false); net.peer_mut(2).chain.add_blocks(200, EachBlockWith::Uncle);
net.peer_mut(1).chain.add_blocks(100, false); //fork between 1 and 2 net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle); //fork between 1 and 2
net.peer_mut(2).chain.add_blocks(10, true); net.peer_mut(2).chain.add_blocks(10, EachBlockWith::Nothing);
// peer 1 has the best chain of 601 blocks // peer 1 has the best chain of 601 blocks
let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone(); let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone();
net.sync(); net.sync();
@ -87,8 +88,8 @@ fn forked() {
#[test] #[test]
fn restart() { fn restart() {
let mut net = TestNet::new(3); let mut net = TestNet::new(3);
net.peer_mut(1).chain.add_blocks(1000, false); net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(1000, false); net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
net.sync_steps(8); net.sync_steps(8);
@ -109,8 +110,8 @@ fn status_empty() {
#[test] #[test]
fn status_packet() { fn status_packet() {
let mut net = TestNet::new(2); let mut net = TestNet::new(2);
net.peer_mut(0).chain.add_blocks(100, false); net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Uncle);
net.peer_mut(1).chain.add_blocks(1, false); net.peer_mut(1).chain.add_blocks(1, EachBlockWith::Uncle);
net.start(); net.start();
@ -123,13 +124,13 @@ fn status_packet() {
#[test] #[test]
fn propagate_hashes() { fn propagate_hashes() {
let mut net = TestNet::new(6); let mut net = TestNet::new(6);
net.peer_mut(1).chain.add_blocks(10, false); net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
net.sync(); net.sync();
net.peer_mut(0).chain.add_blocks(10, false); net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle);
net.sync(); net.sync();
net.trigger_block_verified(0); //first event just sets the marker net.trigger_chain_new_blocks(0); //first event just sets the marker
net.trigger_block_verified(0); net.trigger_chain_new_blocks(0);
// 5 peers to sync // 5 peers to sync
assert_eq!(5, net.peer(0).queue.len()); assert_eq!(5, net.peer(0).queue.len());
@ -149,12 +150,12 @@ fn propagate_hashes() {
#[test] #[test]
fn propagate_blocks() { fn propagate_blocks() {
let mut net = TestNet::new(2); let mut net = TestNet::new(2);
net.peer_mut(1).chain.add_blocks(10, false); net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
net.sync(); net.sync();
net.peer_mut(0).chain.add_blocks(10, false); net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle);
net.trigger_block_verified(0); //first event just sets the marker net.trigger_chain_new_blocks(0); //first event just sets the marker
net.trigger_block_verified(0); net.trigger_chain_new_blocks(0);
assert!(!net.peer(0).queue.is_empty()); assert!(!net.peer(0).queue.is_empty());
// NEW_BLOCK_PACKET // NEW_BLOCK_PACKET
@ -164,7 +165,7 @@ fn propagate_blocks() {
#[test] #[test]
fn restart_on_malformed_block() { fn restart_on_malformed_block() {
let mut net = TestNet::new(2); let mut net = TestNet::new(2);
net.peer_mut(1).chain.add_blocks(10, false); net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
net.peer_mut(1).chain.corrupt_block(6); net.peer_mut(1).chain.corrupt_block(6);
net.sync_steps(10); net.sync_steps(10);

View File

@ -22,7 +22,7 @@ use io::SyncIo;
use chain::ChainSync; use chain::ChainSync;
use ::SyncConfig; use ::SyncConfig;
use ethcore::receipt::Receipt; use ethcore::receipt::Receipt;
use ethcore::transaction::LocalizedTransaction; use ethcore::transaction::{LocalizedTransaction, Transaction, Action};
use ethcore::filter::Filter; use ethcore::filter::Filter;
use ethcore::log_entry::LocalizedLogEntry; use ethcore::log_entry::LocalizedLogEntry;
@ -34,6 +34,14 @@ pub struct TestBlockChainClient {
pub difficulty: RwLock<U256>, pub difficulty: RwLock<U256>,
} }
#[derive(Clone)]
pub enum EachBlockWith {
Nothing,
Uncle,
Transaction,
UncleAndTransaction
}
impl TestBlockChainClient { impl TestBlockChainClient {
pub fn new() -> TestBlockChainClient { pub fn new() -> TestBlockChainClient {
@ -44,30 +52,53 @@ impl TestBlockChainClient {
last_hash: RwLock::new(H256::new()), last_hash: RwLock::new(H256::new()),
difficulty: RwLock::new(From::from(0)), difficulty: RwLock::new(From::from(0)),
}; };
client.add_blocks(1, true); // add genesis block client.add_blocks(1, EachBlockWith::Nothing); // add genesis block
client.genesis_hash = client.last_hash.read().unwrap().clone(); client.genesis_hash = client.last_hash.read().unwrap().clone();
client client
} }
pub fn add_blocks(&mut self, count: usize, empty: bool) { pub fn add_blocks(&mut self, count: usize, with: EachBlockWith) {
let len = self.numbers.read().unwrap().len(); let len = self.numbers.read().unwrap().len();
for n in len..(len + count) { for n in len..(len + count) {
let mut header = BlockHeader::new(); let mut header = BlockHeader::new();
header.difficulty = From::from(n); header.difficulty = From::from(n);
header.parent_hash = self.last_hash.read().unwrap().clone(); header.parent_hash = self.last_hash.read().unwrap().clone();
header.number = n as BlockNumber; header.number = n as BlockNumber;
let mut uncles = RlpStream::new_list(if empty {0} else {1}); let uncles = match with {
if !empty { EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => {
let mut uncle_header = BlockHeader::new(); let mut uncles = RlpStream::new_list(1);
uncle_header.difficulty = From::from(n); let mut uncle_header = BlockHeader::new();
uncle_header.parent_hash = self.last_hash.read().unwrap().clone(); uncle_header.difficulty = From::from(n);
uncle_header.number = n as BlockNumber; uncle_header.parent_hash = self.last_hash.read().unwrap().clone();
uncles.append(&uncle_header); uncle_header.number = n as BlockNumber;
header.uncles_hash = uncles.as_raw().sha3(); uncles.append(&uncle_header);
} header.uncles_hash = uncles.as_raw().sha3();
uncles
},
_ => RlpStream::new_list(0)
};
let txs = match with {
EachBlockWith::Transaction | EachBlockWith::UncleAndTransaction => {
let mut txs = RlpStream::new_list(1);
let keypair = KeyPair::create().unwrap();
let tx = Transaction {
action: Action::Create,
value: U256::from(100),
data: "3331600055".from_hex().unwrap(),
gas: U256::from(100_000),
gas_price: U256::one(),
nonce: U256::zero()
};
let signed_tx = tx.sign(&keypair.secret());
txs.append(&signed_tx);
txs.out()
},
_ => rlp::NULL_RLP.to_vec()
};
let mut rlp = RlpStream::new_list(3); let mut rlp = RlpStream::new_list(3);
rlp.append(&header); rlp.append(&header);
rlp.append_raw(&rlp::NULL_RLP, 1); rlp.append_raw(&txs, 1);
rlp.append_raw(uncles.as_raw(), 1); rlp.append_raw(uncles.as_raw(), 1);
self.import_block(rlp.as_raw().to_vec()).unwrap(); self.import_block(rlp.as_raw().to_vec()).unwrap();
} }
@ -109,6 +140,10 @@ impl BlockChainClient for TestBlockChainClient {
unimplemented!(); unimplemented!();
} }
fn nonce(&self, _address: &Address) -> U256 {
U256::zero()
}
fn code(&self, _address: &Address) -> Option<Bytes> { fn code(&self, _address: &Address) -> Option<Bytes> {
unimplemented!(); unimplemented!();
} }
@ -420,8 +455,8 @@ impl TestNet {
self.peers.iter().all(|p| p.queue.is_empty()) self.peers.iter().all(|p| p.queue.is_empty())
} }
pub fn trigger_block_verified(&mut self, peer_id: usize) { pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) {
let mut peer = self.peer_mut(peer_id); let mut peer = self.peer_mut(peer_id);
peer.sync.chain_blocks_verified(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); peer.sync.chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[]);
} }
} }

View File

@ -108,27 +108,29 @@ struct TransactionSet {
} }
impl TransactionSet { impl TransactionSet {
fn insert(&mut self, sender: Address, nonce: U256, order: TransactionOrder) { fn insert(&mut self, sender: Address, nonce: U256, order: TransactionOrder) -> Option<TransactionOrder> {
self.by_priority.insert(order.clone()); self.by_priority.insert(order.clone());
self.by_address.insert(sender, nonce, order); self.by_address.insert(sender, nonce, order)
} }
fn enforce_limit(&mut self, by_hash: &HashMap<H256, VerifiedTransaction>) { fn enforce_limit(&mut self, by_hash: &mut HashMap<H256, VerifiedTransaction>) {
let len = self.by_priority.len(); let len = self.by_priority.len();
if len <= self.limit { if len <= self.limit {
return; return;
} }
let to_drop : Vec<&VerifiedTransaction> = { let to_drop : Vec<(Address, U256)> = {
self.by_priority self.by_priority
.iter() .iter()
.skip(self.limit) .skip(self.limit)
.map(|order| by_hash.get(&order.hash).expect("Inconsistency in queue detected.")) .map(|order| by_hash.get(&order.hash).expect("Inconsistency in queue detected."))
.map(|tx| (tx.sender(), tx.nonce()))
.collect() .collect()
}; };
for tx in to_drop { for (sender, nonce) in to_drop {
self.drop(&tx.sender(), &tx.nonce()); let order = self.drop(&sender, &nonce).expect("Dropping transaction found in priority queue failed.");
by_hash.remove(&order.hash).expect("Inconsistency in queue.");
} }
} }
@ -219,44 +221,67 @@ impl TransactionQueue {
/// Removes all transactions identified by hashes given in slice /// Removes all transactions identified by hashes given in slice
/// ///
/// If gap is introduced marks subsequent transactions as future /// If gap is introduced marks subsequent transactions as future
pub fn remove_all<T>(&mut self, txs: &[H256], fetch_nonce: T) pub fn remove_all<T>(&mut self, transaction_hashes: &[H256], fetch_nonce: T)
where T: Fn(&Address) -> U256 { where T: Fn(&Address) -> U256 {
for tx in txs { for hash in transaction_hashes {
self.remove(&tx, &fetch_nonce); self.remove(&hash, &fetch_nonce);
} }
} }
/// Removes transaction identified by hashes from queue. /// Removes transaction identified by hashes from queue.
/// ///
/// If gap is introduced marks subsequent transactions as future /// If gap is introduced marks subsequent transactions as future
pub fn remove<T>(&mut self, hash: &H256, fetch_nonce: &T) pub fn remove<T>(&mut self, transaction_hash: &H256, fetch_nonce: &T)
where T: Fn(&Address) -> U256 { where T: Fn(&Address) -> U256 {
let transaction = self.by_hash.remove(hash); let transaction = self.by_hash.remove(transaction_hash);
if transaction.is_none() { if transaction.is_none() {
// We don't know this transaction // We don't know this transaction
return; return;
} }
let transaction = transaction.unwrap(); let transaction = transaction.unwrap();
let sender = transaction.sender(); let sender = transaction.sender();
let nonce = transaction.nonce(); let nonce = transaction.nonce();
let current_nonce = fetch_nonce(&sender);
println!("Removing tx: {:?}", transaction.transaction);
// Remove from future // Remove from future
self.future.drop(&sender, &nonce); let order = self.future.drop(&sender, &nonce);
if order.is_some() {
// Remove from current self.update_future(&sender, current_nonce);
let order = self.current.drop(&sender, &nonce); // And now lets check if there is some chain of transactions in future
if order.is_none() { // that should be placed in current
self.move_matching_future_to_current(sender.clone(), current_nonce, current_nonce);
return; return;
} }
// Let's remove transactions where tx.nonce < current_nonce // Remove from current
// and if there are any future transactions matching current_nonce+1 - move to current let order = self.current.drop(&sender, &nonce);
let current_nonce = fetch_nonce(&sender); if order.is_some() {
// We will either move transaction to future or remove it completely // We will either move transaction to future or remove it completely
// so there will be no transactions from this sender in current // so there will be no transactions from this sender in current
self.last_nonces.remove(&sender); self.last_nonces.remove(&sender);
// This should move all current transactions to future and remove old transactions
self.move_all_to_future(&sender, current_nonce);
// And now lets check if there is some chain of transactions in future
// that should be placed in current. It should also update last_nonces.
self.move_matching_future_to_current(sender.clone(), current_nonce, current_nonce);
return;
}
}
fn update_future(&mut self, sender: &Address, current_nonce: U256) {
// We need to drain all transactions for current sender from future and reinsert them with updated height
let all_nonces_from_sender = match self.future.by_address.row(&sender) {
Some(row_map) => row_map.keys().cloned().collect::<Vec<U256>>(),
None => vec![],
};
for k in all_nonces_from_sender {
let order = self.future.drop(&sender, &k).unwrap();
self.future.insert(sender.clone(), k, order.update_height(k, current_nonce));
}
}
fn move_all_to_future(&mut self, sender: &Address, current_nonce: U256) {
let all_nonces_from_sender = match self.current.by_address.row(&sender) { let all_nonces_from_sender = match self.current.by_address.row(&sender) {
Some(row_map) => row_map.keys().cloned().collect::<Vec<U256>>(), Some(row_map) => row_map.keys().cloned().collect::<Vec<U256>>(),
None => vec![], None => vec![],
@ -266,21 +291,15 @@ impl TransactionQueue {
// Goes to future or is removed // Goes to future or is removed
let order = self.current.drop(&sender, &k).unwrap(); let order = self.current.drop(&sender, &k).unwrap();
if k >= current_nonce { if k >= current_nonce {
println!("Moving to future: {:?}", order);
self.future.insert(sender.clone(), k, order.update_height(k, current_nonce)); self.future.insert(sender.clone(), k, order.update_height(k, current_nonce));
} else { } else {
self.by_hash.remove(&order.hash); self.by_hash.remove(&order.hash);
} }
} }
self.future.enforce_limit(&self.by_hash); self.future.enforce_limit(&mut self.by_hash);
// And now lets check if there is some chain of transactions in future
// that should be placed in current
if let Some(new_current_top) = self.move_future_txs(sender.clone(), current_nonce - U256::one(), current_nonce) {
self.last_nonces.insert(sender, new_current_top);
}
} }
/// Returns top transactions from the queue /// Returns top transactions from the queue
pub fn top_transactions(&self, size: usize) -> Vec<SignedTransaction> { pub fn top_transactions(&self, size: usize) -> Vec<SignedTransaction> {
self.current.by_priority self.current.by_priority
@ -299,67 +318,88 @@ impl TransactionQueue {
self.last_nonces.clear(); self.last_nonces.clear();
} }
fn move_future_txs(&mut self, address: Address, current_nonce: U256, first_nonce: U256) -> Option<U256> { fn move_matching_future_to_current(&mut self, address: Address, mut current_nonce: U256, first_nonce: U256) {
println!("Moving from future for: {:?} base: {:?}", current_nonce, first_nonce);
let mut current_nonce = current_nonce + U256::one();
{ {
let by_nonce = self.future.by_address.row_mut(&address); let by_nonce = self.future.by_address.row_mut(&address);
if let None = by_nonce { if let None = by_nonce {
return None; return;
} }
let mut by_nonce = by_nonce.unwrap(); let mut by_nonce = by_nonce.unwrap();
while let Some(order) = by_nonce.remove(&current_nonce) { while let Some(order) = by_nonce.remove(&current_nonce) {
// remove also from priority and hash // remove also from priority and hash
self.future.by_priority.remove(&order); self.future.by_priority.remove(&order);
// Put to current // Put to current
println!("Moved: {:?}", order);
let order = order.update_height(current_nonce.clone(), first_nonce); let order = order.update_height(current_nonce.clone(), first_nonce);
self.current.insert(address.clone(), current_nonce, order); self.current.insert(address.clone(), current_nonce, order);
current_nonce = current_nonce + U256::one(); current_nonce = current_nonce + U256::one();
} }
} }
self.future.by_address.clear_if_empty(&address); self.future.by_address.clear_if_empty(&address);
// Returns last inserted nonce // Update last inserted nonce
Some(current_nonce - U256::one()) self.last_nonces.insert(address, current_nonce - U256::one());
} }
fn import_tx<T>(&mut self, tx: VerifiedTransaction, fetch_nonce: &T) fn import_tx<T>(&mut self, tx: VerifiedTransaction, fetch_nonce: &T)
where T: Fn(&Address) -> U256 { where T: Fn(&Address) -> U256 {
let nonce = tx.nonce();
let address = tx.sender();
if self.by_hash.get(&tx.hash()).is_some() {
// Transaction is already imported.
trace!(target: "sync", "Dropping already imported transaction with hash: {:?}", tx.hash());
return;
}
let address = tx.sender();
let nonce = tx.nonce();
let state_nonce = fetch_nonce(&address);
let next_nonce = self.last_nonces let next_nonce = self.last_nonces
.get(&address) .get(&address)
.cloned() .cloned()
.map_or_else(|| fetch_nonce(&address), |n| n + U256::one()); .map_or(state_nonce, |n| n + U256::one());
println!("Expected next: {:?}, got: {:?}", next_nonce, nonce);
// Check height // Check height
if nonce > next_nonce { if nonce > next_nonce {
let order = TransactionOrder::for_transaction(&tx, next_nonce);
// Insert to by_hash
self.by_hash.insert(tx.hash(), tx);
// We have a gap - put to future // We have a gap - put to future
self.future.insert(address, nonce, order); Self::replace_transaction(tx, next_nonce, &mut self.future, &mut self.by_hash);
self.future.enforce_limit(&self.by_hash); self.future.enforce_limit(&mut self.by_hash);
return; return;
} else if next_nonce > nonce { } else if nonce < state_nonce {
// Droping transaction // Droping transaction
trace!(target: "sync", "Dropping transaction with nonce: {} - expecting: {}", nonce, next_nonce);
return; return;
} }
let base_nonce = fetch_nonce(&address); let base_nonce = fetch_nonce(&address);
let order = TransactionOrder::for_transaction(&tx, base_nonce); Self::replace_transaction(tx, base_nonce.clone(), &mut self.current, &mut self.by_hash);
// Insert to by_hash self.last_nonces.insert(address.clone(), nonce);
self.by_hash.insert(tx.hash(), tx);
// Insert to current
self.current.insert(address.clone(), nonce, order);
// But maybe there are some more items waiting in future? // But maybe there are some more items waiting in future?
let new_last_nonce = self.move_future_txs(address.clone(), nonce, base_nonce); self.move_matching_future_to_current(address.clone(), nonce + U256::one(), base_nonce);
self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce)); self.current.enforce_limit(&mut self.by_hash);
// Enforce limit }
self.current.enforce_limit(&self.by_hash);
fn replace_transaction(tx: VerifiedTransaction, base_nonce: U256, set: &mut TransactionSet, by_hash: &mut HashMap<H256, VerifiedTransaction>) {
let order = TransactionOrder::for_transaction(&tx, base_nonce);
let hash = tx.hash();
let address = tx.sender();
let nonce = tx.nonce();
by_hash.insert(hash.clone(), tx);
if let Some(old) = set.insert(address, nonce, order.clone()) {
// There was already transaction in queue. Let's check which one should stay
let old_fee = old.gas_price;
let new_fee = order.gas_price;
if old_fee.cmp(&new_fee) == Ordering::Greater {
// Put back old transaction since it has greater priority (higher gas_price)
set.by_address.insert(address, nonce, old);
// and remove new one
set.by_priority.remove(&order);
by_hash.remove(&hash);
} else {
// Make sure we remove old transaction entirely
set.by_priority.remove(&old);
by_hash.remove(&old.hash);
}
}
} }
} }
@ -368,6 +408,7 @@ impl TransactionQueue {
mod test { mod test {
extern crate rustc_serialize; extern crate rustc_serialize;
use self::rustc_serialize::hex::FromHex; use self::rustc_serialize::hex::FromHex;
use std::ops::Deref;
use std::collections::{HashMap, BTreeSet}; use std::collections::{HashMap, BTreeSet};
use util::crypto::KeyPair; use util::crypto::KeyPair;
use util::numbers::{U256, Uint}; use util::numbers::{U256, Uint};
@ -418,7 +459,7 @@ mod test {
let (tx1, tx2) = new_txs(U256::from(1)); let (tx1, tx2) = new_txs(U256::from(1));
let tx1 = VerifiedTransaction::new(tx1); let tx1 = VerifiedTransaction::new(tx1);
let tx2 = VerifiedTransaction::new(tx2); let tx2 = VerifiedTransaction::new(tx2);
let by_hash = { let mut by_hash = {
let mut x = HashMap::new(); let mut x = HashMap::new();
let tx1 = VerifiedTransaction::new(tx1.transaction.clone()); let tx1 = VerifiedTransaction::new(tx1.transaction.clone());
let tx2 = VerifiedTransaction::new(tx2.transaction.clone()); let tx2 = VerifiedTransaction::new(tx2.transaction.clone());
@ -435,9 +476,10 @@ mod test {
assert_eq!(set.by_address.len(), 2); assert_eq!(set.by_address.len(), 2);
// when // when
set.enforce_limit(&by_hash); set.enforce_limit(&mut by_hash);
// then // then
assert_eq!(by_hash.len(), 1);
assert_eq!(set.by_priority.len(), 1); assert_eq!(set.by_priority.len(), 1);
assert_eq!(set.by_address.len(), 1); assert_eq!(set.by_address.len(), 1);
assert_eq!(set.by_priority.iter().next().unwrap().clone(), order1); assert_eq!(set.by_priority.iter().next().unwrap().clone(), order1);
@ -638,7 +680,26 @@ mod test {
} }
#[test] #[test]
fn should_accept_same_transaction_twice() { fn should_not_insert_same_transaction_twice() {
// given
let nonce = |a: &Address| default_nonce(a) + U256::one();
let mut txq = TransactionQueue::new();
let (_tx1, tx2) = new_txs(U256::from(1));
txq.add(tx2.clone(), &default_nonce);
assert_eq!(txq.status().future, 1);
assert_eq!(txq.status().pending, 0);
// when
txq.add(tx2.clone(), &nonce);
// then
let stats = txq.status();
assert_eq!(stats.future, 1);
assert_eq!(stats.pending, 0);
}
#[test]
fn should_accept_same_transaction_twice_if_removed() {
// given // given
let mut txq = TransactionQueue::new(); let mut txq = TransactionQueue::new();
let (tx1, tx2) = new_txs(U256::from(1)); let (tx1, tx2) = new_txs(U256::from(1));
@ -680,4 +741,76 @@ mod test {
assert_eq!(stats.pending, 2); assert_eq!(stats.pending, 2);
} }
#[test]
fn should_replace_same_transaction_when_has_higher_fee() {
// given
let mut txq = TransactionQueue::new();
let keypair = KeyPair::create().unwrap();
let tx = new_unsigned_tx(U256::from(123)).sign(&keypair.secret());
let tx2 = {
let mut tx2 = tx.deref().clone();
tx2.gas_price = U256::from(200);
tx2.sign(&keypair.secret())
};
// when
txq.add(tx, &default_nonce);
txq.add(tx2, &default_nonce);
// then
let stats = txq.status();
assert_eq!(stats.pending, 1);
assert_eq!(stats.future, 0);
assert_eq!(txq.top_transactions(1)[0].gas_price, U256::from(200));
}
#[test]
fn should_replace_same_transaction_when_importing_to_futures() {
// given
let mut txq = TransactionQueue::new();
let keypair = KeyPair::create().unwrap();
let tx0 = new_unsigned_tx(U256::from(123)).sign(&keypair.secret());
let tx1 = {
let mut tx1 = tx0.deref().clone();
tx1.nonce = U256::from(124);
tx1.sign(&keypair.secret())
};
let tx2 = {
let mut tx2 = tx1.deref().clone();
tx2.gas_price = U256::from(200);
tx2.sign(&keypair.secret())
};
// when
txq.add(tx1, &default_nonce);
txq.add(tx2, &default_nonce);
assert_eq!(txq.status().future, 1);
txq.add(tx0, &default_nonce);
// then
let stats = txq.status();
assert_eq!(stats.future, 0);
assert_eq!(stats.pending, 2);
assert_eq!(txq.top_transactions(2)[1].gas_price, U256::from(200));
}
#[test]
fn should_recalculate_height_when_removing_from_future() {
// given
let previous_nonce = |a: &Address| default_nonce(a) - U256::one();
let next_nonce = |a: &Address| default_nonce(a) + U256::one();
let mut txq = TransactionQueue::new();
let (tx1, tx2) = new_txs(U256::one());
txq.add(tx1.clone(), &previous_nonce);
txq.add(tx2, &previous_nonce);
assert_eq!(txq.status().future, 2);
// when
txq.remove(&tx1.hash(), &next_nonce);
// then
let stats = txq.status();
assert_eq!(stats.future, 0);
assert_eq!(stats.pending, 1);
}
} }

View File

@ -84,6 +84,7 @@ impl SecretStore {
let mut path = ::std::env::home_dir().expect("Failed to get home dir"); let mut path = ::std::env::home_dir().expect("Failed to get home dir");
path.push(".parity"); path.push(".parity");
path.push("keys"); path.push("keys");
::std::fs::create_dir_all(&path).expect("Should panic since it is critical to be able to access home dir");
Self::new_in(&path) Self::new_in(&path)
} }