Integrating TransactionQueue with client

This commit is contained in:
Tomasz Drwięga 2016-03-01 22:30:23 +01:00
parent 725e894f9b
commit 7565625ce0
6 changed files with 120 additions and 14 deletions

19
Cargo.lock generated
View File

@ -125,6 +125,14 @@ dependencies = [
"libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "deque"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "docopt" name = "docopt"
version = "0.6.78" version = "0.6.78"
@ -259,6 +267,7 @@ dependencies = [
"heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
@ -629,6 +638,16 @@ dependencies = [
"libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "rayon"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "regex" name = "regex"
version = "0.1.54" version = "0.1.54"

View File

@ -123,6 +123,9 @@ pub trait BlockChainClient : Sync + Send {
/// Get block total difficulty. /// Get block total difficulty.
fn block_total_difficulty(&self, id: BlockId) -> Option<U256>; fn block_total_difficulty(&self, id: BlockId) -> Option<U256>;
/// Get address nonce.
fn nonce(&self, address: &Address) -> U256;
/// Get address code. /// Get address code.
fn code(&self, address: &Address) -> Option<Bytes>; fn code(&self, address: &Address) -> Option<Bytes>;
@ -445,6 +448,10 @@ impl BlockChainClient for Client {
Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty) Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty)
} }
fn nonce(&self, address: &Address) -> U256 {
self.state().nonce(address)
}
fn code(&self, address: &Address) -> Option<Bytes> { fn code(&self, address: &Address) -> Option<Bytes> {
self.state().code(address) self.state().code(address)
} }

View File

@ -17,6 +17,7 @@ time = "0.1.34"
rand = "0.3.13" rand = "0.3.13"
heapsize = "0.3" heapsize = "0.3"
rustc-serialize = "0.3" rustc-serialize = "0.3"
rayon = "0.3.1"
[features] [features]
default = [] default = []

View File

@ -30,14 +30,17 @@
/// ///
use util::*; use util::*;
use rayon::prelude::*;
use std::mem::{replace}; use std::mem::{replace};
use ethcore::views::{HeaderView}; use ethcore::views::{HeaderView, BlockView};
use ethcore::header::{BlockNumber, Header as BlockHeader}; use ethcore::header::{BlockNumber, Header as BlockHeader};
use ethcore::client::{BlockChainClient, BlockStatus, BlockId}; use ethcore::client::{BlockChainClient, BlockStatus, BlockId};
use range_collection::{RangeCollection, ToUsize, FromUsize}; use range_collection::{RangeCollection, ToUsize, FromUsize};
use ethcore::error::*; use ethcore::error::*;
use ethcore::block::Block; use ethcore::block::Block;
use ethcore::transaction::SignedTransaction;
use io::SyncIo; use io::SyncIo;
use transaction_queue::TransactionQueue;
use time; use time;
use super::SyncConfig; use super::SyncConfig;
@ -209,6 +212,8 @@ pub struct ChainSync {
max_download_ahead_blocks: usize, max_download_ahead_blocks: usize,
/// Network ID /// Network ID
network_id: U256, network_id: U256,
/// Transactions Queue
transaction_queue: Mutex<TransactionQueue>,
} }
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>; type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
@ -234,6 +239,7 @@ impl ChainSync {
last_send_block_number: 0, last_send_block_number: 0,
max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id, network_id: config.network_id,
transaction_queue: Mutex::new(TransactionQueue::new()),
} }
} }
@ -249,14 +255,14 @@ impl ChainSync {
blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 }, blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 },
num_peers: self.peers.len(), num_peers: self.peers.len(),
num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(), num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(),
mem_used: mem_used:
// TODO: https://github.com/servo/heapsize/pull/50 // TODO: https://github.com/servo/heapsize/pull/50
// self.downloading_hashes.heap_size_of_children() // self.downloading_hashes.heap_size_of_children()
//+ self.downloading_bodies.heap_size_of_children() //+ self.downloading_bodies.heap_size_of_children()
//+ self.downloading_hashes.heap_size_of_children() //+ self.downloading_hashes.heap_size_of_children()
self.headers.heap_size_of_children() self.headers.heap_size_of_children()
+ self.bodies.heap_size_of_children() + self.bodies.heap_size_of_children()
+ self.peers.heap_size_of_children() + self.peers.heap_size_of_children()
+ self.header_ids.heap_size_of_children(), + self.header_ids.heap_size_of_children(),
} }
} }
@ -292,6 +298,7 @@ impl ChainSync {
self.starting_block = 0; self.starting_block = 0;
self.highest_block = None; self.highest_block = None;
self.have_common_block = false; self.have_common_block = false;
self.transaction_queue.lock().unwrap().clear();
self.starting_block = io.chain().chain_info().best_block_number; self.starting_block = io.chain().chain_info().best_block_number;
self.state = SyncState::NotSynced; self.state = SyncState::NotSynced;
} }
@ -913,8 +920,16 @@ impl ChainSync {
} }
} }
/// Called when peer sends us new transactions /// Called when peer sends us new transactions
fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> { fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
Ok(()) let chain = io.chain();
let item_count = r.item_count();
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
let fetch_latest_nonce = |a : &Address| chain.nonce(a);
for i in 0..item_count {
let tx: SignedTransaction = try!(r.val_at(i));
self.transaction_queue.lock().unwrap().add(tx, &fetch_latest_nonce);
}
Ok(())
} }
/// Send Status message /// Send Status message
@ -1242,6 +1257,34 @@ impl ChainSync {
} }
self.last_send_block_number = chain.best_block_number; self.last_send_block_number = chain.best_block_number;
} }
/// called when block is imported to chain, updates transactions queue
pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], bad: &[H256]) {
fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec<SignedTransaction> {
let block = chain
.block(BlockId::Hash(hash.clone()))
.expect("Expected in-chain blocks.");
let block = BlockView::new(&block);
block.transactions()
};
let chain = io.chain();
let good = good.par_iter().map(|h| fetch_transactions(chain, h));
let bad = bad.par_iter().map(|h| fetch_transactions(chain, h));
good.for_each(|txs| {
let mut transaction_queue = self.transaction_queue.lock().unwrap();
transaction_queue.remove_all(&txs);
});
bad.for_each(|txs| {
// populate sender
for tx in &txs {
let _sender = tx.sender();
}
let mut transaction_queue = self.transaction_queue.lock().unwrap();
transaction_queue.add_all(txs, |a| chain.nonce(a));
});
}
} }
#[cfg(test)] #[cfg(test)]
@ -1571,6 +1614,32 @@ mod tests {
assert!(result.is_ok()); assert!(result.is_ok());
} }
#[test]
fn should_add_transactions_to_queue() {
// given
let mut client = TestBlockChainClient::new();
// client.add_blocks(98, BlocksWith::Uncle);
// client.add_blocks(1, BlocksWith::UncleAndTransaction);
// client.add_blocks(1, BlocksWith::Transaction);
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let good_blocks = vec![client.block_hash_delta_minus(2)];
let bad_blocks = vec![client.block_hash_delta_minus(1)];
let mut queue = VecDeque::new();
let io = TestIo::new(&mut client, &mut queue, None);
// when
sync.chain_new_blocks(&io, &[], &good_blocks);
assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1);
sync.chain_new_blocks(&io, &good_blocks, &bad_blocks);
// then
let status = sync.transaction_queue.lock().unwrap().status();
assert_eq!(status.pending, 1);
assert_eq!(status.future, 0);
}
#[test] #[test]
fn returns_requested_block_headers() { fn returns_requested_block_headers() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();

View File

@ -54,6 +54,7 @@ extern crate ethcore;
extern crate env_logger; extern crate env_logger;
extern crate time; extern crate time;
extern crate rand; extern crate rand;
extern crate rayon;
#[macro_use] #[macro_use]
extern crate heapsize; extern crate heapsize;
@ -70,8 +71,7 @@ use io::NetSyncIo;
mod chain; mod chain;
mod io; mod io;
mod range_collection; mod range_collection;
// TODO [todr] Made public to suppress dead code warnings mod transaction_queue;
pub mod transaction_queue;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
@ -153,8 +153,14 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
} }
fn message(&self, io: &NetworkContext<SyncMessage>, message: &SyncMessage) { fn message(&self, io: &NetworkContext<SyncMessage>, message: &SyncMessage) {
if let SyncMessage::BlockVerified = *message { match *message {
self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref())); SyncMessage::BlockVerified => {
self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref()));
},
SyncMessage::NewChainBlocks { ref good, ref bad } => {
let sync_io = NetSyncIo::new(io, self.chain.deref());
self.sync.write().unwrap().chain_new_blocks(&sync_io, good, bad);
}
} }
} }
} }

View File

@ -105,6 +105,10 @@ impl BlockChainClient for TestBlockChainClient {
Some(U256::zero()) Some(U256::zero())
} }
fn nonce(&self, _address: &Address) -> U256 {
U256::zero()
}
fn code(&self, _address: &Address) -> Option<Bytes> { fn code(&self, _address: &Address) -> Option<Bytes> {
unimplemented!(); unimplemented!();
} }