Revert "Revert "Transaction Queue integration""

This reverts commit d330f0b7b7.

Conflicts:
	sync/src/transaction_queue.rs
This commit is contained in:
Tomasz Drwięga 2016-03-05 16:46:04 +01:00
parent 5ac7b9f812
commit cc3839ae57
9 changed files with 217 additions and 76 deletions

19
Cargo.lock generated
View File

@ -146,6 +146,14 @@ dependencies = [
"libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "deque"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "docopt"
version = "0.6.78"
@ -285,6 +293,7 @@ dependencies = [
"heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -655,6 +664,16 @@ dependencies = [
"libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rayon"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "regex"
version = "0.1.54"

View File

@ -138,6 +138,9 @@ pub trait BlockChainClient : Sync + Send {
/// Get block total difficulty.
fn block_total_difficulty(&self, id: BlockId) -> Option<U256>;
/// Get address nonce.
fn nonce(&self, address: &Address) -> U256;
/// Get block hash.
fn block_hash(&self, id: BlockId) -> Option<H256>;
@ -365,18 +368,14 @@ impl<V> Client<V> where V: Verifier {
bad_blocks.insert(header.hash());
continue;
}
let closed_block = self.check_and_close_block(&block);
if let Err(_) = closed_block {
bad_blocks.insert(header.hash());
break;
}
// Insert block
let closed_block = closed_block.unwrap();
self.chain.write().unwrap().insert_block(&block.bytes, closed_block.block().receipts().clone());
good_blocks.push(header.hash());
// Are we committing an era?
let ancient = if header.number() >= HISTORY {
let n = header.number() - HISTORY;
let chain = self.chain.read().unwrap();
@ -386,10 +385,16 @@ impl<V> Client<V> where V: Verifier {
};
// Commit results
let closed_block = closed_block.unwrap();
let receipts = closed_block.block().receipts().clone();
closed_block.drain()
.commit(header.number(), &header.hash(), ancient)
.expect("State DB commit failed.");
// And update the chain
self.chain.write().unwrap()
.insert_block(&block.bytes, receipts);
self.report.write().unwrap().accrue_block(&block);
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
}
@ -408,7 +413,7 @@ impl<V> Client<V> where V: Verifier {
if !good_blocks.is_empty() && block_queue.queue_info().is_empty() {
io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
good: good_blocks,
bad: bad_blocks,
retracted: bad_blocks,
})).unwrap();
}
}
@ -581,6 +586,10 @@ impl<V> BlockChainClient for Client<V> where V: Verifier {
Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty)
}
fn nonce(&self, address: &Address) -> U256 {
self.state().nonce(address)
}
fn block_hash(&self, id: BlockId) -> Option<H256> {
let chain = self.chain.read().unwrap();
Self::block_hash(&chain, id)

View File

@ -30,7 +30,7 @@ pub enum SyncMessage {
/// Hashes of blocks imported to blockchain
good: Vec<H256>,
/// Hashes of blocks not imported to blockchain
bad: Vec<H256>,
retracted: Vec<H256>,
},
/// A block is ready
BlockVerified,

View File

@ -17,6 +17,7 @@ time = "0.1.34"
rand = "0.3.13"
heapsize = "0.3"
rustc-serialize = "0.3"
rayon = "0.3.1"
[features]
default = []

View File

@ -30,14 +30,17 @@
///
use util::*;
use rayon::prelude::*;
use std::mem::{replace};
use ethcore::views::{HeaderView};
use ethcore::views::{HeaderView, BlockView};
use ethcore::header::{BlockNumber, Header as BlockHeader};
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo};
use range_collection::{RangeCollection, ToUsize, FromUsize};
use ethcore::error::*;
use ethcore::block::Block;
use ethcore::transaction::SignedTransaction;
use io::SyncIo;
use transaction_queue::TransactionQueue;
use time;
use super::SyncConfig;
@ -209,6 +212,8 @@ pub struct ChainSync {
max_download_ahead_blocks: usize,
/// Network ID
network_id: U256,
/// Transactions Queue
transaction_queue: Mutex<TransactionQueue>,
}
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
@ -234,6 +239,7 @@ impl ChainSync {
last_send_block_number: 0,
max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id,
transaction_queue: Mutex::new(TransactionQueue::new()),
}
}
@ -292,6 +298,7 @@ impl ChainSync {
self.starting_block = 0;
self.highest_block = None;
self.have_common_block = false;
self.transaction_queue.lock().unwrap().clear();
self.starting_block = io.chain().chain_info().best_block_number;
self.state = SyncState::NotSynced;
}
@ -484,7 +491,7 @@ impl ChainSync {
trace!(target: "sync", "New block already queued {:?}", h);
},
Ok(_) => {
if self.current_base_block() < header.number {
if self.current_base_block() < header.number {
self.last_imported_block = Some(header.number);
self.remove_downloaded_blocks(header.number);
}
@ -921,8 +928,16 @@ impl ChainSync {
}
}
/// Called when peer sends us new transactions
fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
Ok(())
fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
let chain = io.chain();
let item_count = r.item_count();
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
let fetch_latest_nonce = |a : &Address| chain.nonce(a);
for i in 0..item_count {
let tx: SignedTransaction = try!(r.val_at(i));
self.transaction_queue.lock().unwrap().add(tx, &fetch_latest_nonce);
}
Ok(())
}
/// Send Status message
@ -1248,6 +1263,37 @@ impl ChainSync {
}
self.last_send_block_number = chain.best_block_number;
}
/// called when block is imported to chain, updates transactions queue
pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], retracted: &[H256]) {
fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec<SignedTransaction> {
let block = chain
.block(BlockId::Hash(hash.clone()))
// Client should send message after commit to db and inserting to chain.
.expect("Expected in-chain blocks.");
let block = BlockView::new(&block);
block.transactions()
}
let chain = io.chain();
let good = good.par_iter().map(|h| fetch_transactions(chain, h));
let retracted = retracted.par_iter().map(|h| fetch_transactions(chain, h));
good.for_each(|txs| {
let mut transaction_queue = self.transaction_queue.lock().unwrap();
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
transaction_queue.remove_all(&hashes, |a| chain.nonce(a));
});
retracted.for_each(|txs| {
// populate sender
for tx in &txs {
let _sender = tx.sender();
}
let mut transaction_queue = self.transaction_queue.lock().unwrap();
transaction_queue.add_all(txs, |a| chain.nonce(a));
});
}
}
#[cfg(test)]
@ -1388,7 +1434,7 @@ mod tests {
#[test]
fn finds_lagging_peers() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, false);
client.add_blocks(100, EachBlockWith::Uncle);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10));
let chain_info = client.chain_info();
@ -1402,7 +1448,7 @@ mod tests {
#[test]
fn calculates_tree_for_lagging_peer() {
let mut client = TestBlockChainClient::new();
client.add_blocks(15, false);
client.add_blocks(15, EachBlockWith::Uncle);
let start = client.block_hash_delta_minus(4);
let end = client.block_hash_delta_minus(2);
@ -1419,7 +1465,7 @@ mod tests {
#[test]
fn sends_new_hashes_to_lagging_peer() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, false);
client.add_blocks(100, EachBlockWith::Uncle);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info();
@ -1438,7 +1484,7 @@ mod tests {
#[test]
fn sends_latest_block_to_lagging_peer() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, false);
client.add_blocks(100, EachBlockWith::Uncle);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info();
@ -1456,7 +1502,7 @@ mod tests {
#[test]
fn handles_peer_new_block_mallformed() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, false);
client.add_blocks(10, EachBlockWith::Uncle);
let block_data = get_dummy_block(11, client.chain_info().best_block_hash);
@ -1474,7 +1520,7 @@ mod tests {
#[test]
fn handles_peer_new_block() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, false);
client.add_blocks(10, EachBlockWith::Uncle);
let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash);
@ -1492,7 +1538,7 @@ mod tests {
#[test]
fn handles_peer_new_block_empty() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, false);
client.add_blocks(10, EachBlockWith::Uncle);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let mut io = TestIo::new(&mut client, &mut queue, None);
@ -1508,7 +1554,7 @@ mod tests {
#[test]
fn handles_peer_new_hashes() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, false);
client.add_blocks(10, EachBlockWith::Uncle);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let mut io = TestIo::new(&mut client, &mut queue, None);
@ -1524,7 +1570,7 @@ mod tests {
#[test]
fn handles_peer_new_hashes_empty() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, false);
client.add_blocks(10, EachBlockWith::Uncle);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let mut io = TestIo::new(&mut client, &mut queue, None);
@ -1542,7 +1588,7 @@ mod tests {
#[test]
fn hashes_rlp_mutually_acceptable() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, false);
client.add_blocks(100, EachBlockWith::Uncle);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info();
@ -1560,7 +1606,7 @@ mod tests {
#[test]
fn block_rlp_mutually_acceptable() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, false);
client.add_blocks(100, EachBlockWith::Uncle);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info();
@ -1573,10 +1619,37 @@ mod tests {
assert!(result.is_ok());
}
#[test]
fn should_add_transactions_to_queue() {
// given
let mut client = TestBlockChainClient::new();
client.add_blocks(98, EachBlockWith::Uncle);
client.add_blocks(1, EachBlockWith::UncleAndTransaction);
client.add_blocks(1, EachBlockWith::Transaction);
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let good_blocks = vec![client.block_hash_delta_minus(2)];
let retracted_blocks = vec![client.block_hash_delta_minus(1)];
let mut queue = VecDeque::new();
let io = TestIo::new(&mut client, &mut queue, None);
// when
sync.chain_new_blocks(&io, &[], &good_blocks);
assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0);
assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1);
sync.chain_new_blocks(&io, &good_blocks, &retracted_blocks);
// then
let status = sync.transaction_queue.lock().unwrap().status();
assert_eq!(status.pending, 1);
assert_eq!(status.future, 0);
}
#[test]
fn returns_requested_block_headers() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, false);
client.add_blocks(100, EachBlockWith::Uncle);
let mut queue = VecDeque::new();
let io = TestIo::new(&mut client, &mut queue, None);
@ -1600,7 +1673,7 @@ mod tests {
#[test]
fn returns_requested_block_headers_reverse() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, false);
client.add_blocks(100, EachBlockWith::Uncle);
let mut queue = VecDeque::new();
let io = TestIo::new(&mut client, &mut queue, None);

View File

@ -54,6 +54,7 @@ extern crate ethcore;
extern crate env_logger;
extern crate time;
extern crate rand;
extern crate rayon;
#[macro_use]
extern crate heapsize;
@ -70,8 +71,7 @@ use io::NetSyncIo;
mod chain;
mod io;
mod range_collection;
// TODO [todr] Made public to suppress dead code warnings
pub mod transaction_queue;
mod transaction_queue;
#[cfg(test)]
mod tests;
@ -153,8 +153,14 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
}
fn message(&self, io: &NetworkContext<SyncMessage>, message: &SyncMessage) {
if let SyncMessage::BlockVerified = *message {
self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref()));
match *message {
SyncMessage::BlockVerified => {
self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref()));
},
SyncMessage::NewChainBlocks { ref good, ref retracted } => {
let sync_io = NetSyncIo::new(io, self.chain.deref());
self.sync.write().unwrap().chain_new_blocks(&sync_io, good, retracted);
}
}
}
}

View File

@ -24,8 +24,8 @@ use super::helpers::*;
fn two_peers() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
net.peer_mut(1).chain.add_blocks(1000, false);
net.peer_mut(2).chain.add_blocks(1000, false);
net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
net.sync();
assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some());
assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref());
@ -35,8 +35,8 @@ fn two_peers() {
fn status_after_sync() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
net.peer_mut(1).chain.add_blocks(1000, false);
net.peer_mut(2).chain.add_blocks(1000, false);
net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
net.sync();
let status = net.peer(0).sync.status();
assert_eq!(status.state, SyncState::Idle);
@ -45,8 +45,8 @@ fn status_after_sync() {
#[test]
fn takes_few_steps() {
let mut net = TestNet::new(3);
net.peer_mut(1).chain.add_blocks(100, false);
net.peer_mut(2).chain.add_blocks(100, false);
net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(100, EachBlockWith::Uncle);
let total_steps = net.sync();
assert!(total_steps < 7);
}
@ -56,8 +56,9 @@ fn empty_blocks() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
for n in 0..200 {
net.peer_mut(1).chain.add_blocks(5, n % 2 == 0);
net.peer_mut(2).chain.add_blocks(5, n % 2 == 0);
let with = if n % 2 == 0 { EachBlockWith::Nothing } else { EachBlockWith::Uncle };
net.peer_mut(1).chain.add_blocks(5, with.clone());
net.peer_mut(2).chain.add_blocks(5, with);
}
net.sync();
assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some());
@ -68,14 +69,14 @@ fn empty_blocks() {
fn forked() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
net.peer_mut(0).chain.add_blocks(300, false);
net.peer_mut(1).chain.add_blocks(300, false);
net.peer_mut(2).chain.add_blocks(300, false);
net.peer_mut(0).chain.add_blocks(100, true); //fork
net.peer_mut(1).chain.add_blocks(200, false);
net.peer_mut(2).chain.add_blocks(200, false);
net.peer_mut(1).chain.add_blocks(100, false); //fork between 1 and 2
net.peer_mut(2).chain.add_blocks(10, true);
net.peer_mut(0).chain.add_blocks(300, EachBlockWith::Uncle);
net.peer_mut(1).chain.add_blocks(300, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(300, EachBlockWith::Uncle);
net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Nothing); //fork
net.peer_mut(1).chain.add_blocks(200, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(200, EachBlockWith::Uncle);
net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle); //fork between 1 and 2
net.peer_mut(2).chain.add_blocks(10, EachBlockWith::Nothing);
// peer 1 has the best chain of 601 blocks
let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone();
net.sync();
@ -87,8 +88,8 @@ fn forked() {
#[test]
fn restart() {
let mut net = TestNet::new(3);
net.peer_mut(1).chain.add_blocks(1000, false);
net.peer_mut(2).chain.add_blocks(1000, false);
net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
net.sync_steps(8);
@ -109,8 +110,8 @@ fn status_empty() {
#[test]
fn status_packet() {
let mut net = TestNet::new(2);
net.peer_mut(0).chain.add_blocks(100, false);
net.peer_mut(1).chain.add_blocks(1, false);
net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Uncle);
net.peer_mut(1).chain.add_blocks(1, EachBlockWith::Uncle);
net.start();
@ -123,10 +124,10 @@ fn status_packet() {
#[test]
fn propagate_hashes() {
let mut net = TestNet::new(6);
net.peer_mut(1).chain.add_blocks(10, false);
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
net.sync();
net.peer_mut(0).chain.add_blocks(10, false);
net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle);
net.sync();
net.trigger_block_verified(0); //first event just sets the marker
net.trigger_block_verified(0);
@ -149,10 +150,10 @@ fn propagate_hashes() {
#[test]
fn propagate_blocks() {
let mut net = TestNet::new(2);
net.peer_mut(1).chain.add_blocks(10, false);
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
net.sync();
net.peer_mut(0).chain.add_blocks(10, false);
net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle);
net.trigger_block_verified(0); //first event just sets the marker
net.trigger_block_verified(0);
@ -164,7 +165,7 @@ fn propagate_blocks() {
#[test]
fn restart_on_malformed_block() {
let mut net = TestNet::new(2);
net.peer_mut(1).chain.add_blocks(10, false);
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
net.peer_mut(1).chain.corrupt_block(6);
net.sync_steps(10);

View File

@ -22,7 +22,7 @@ use io::SyncIo;
use chain::ChainSync;
use ::SyncConfig;
use ethcore::receipt::Receipt;
use ethcore::transaction::LocalizedTransaction;
use ethcore::transaction::{LocalizedTransaction, Transaction, Action};
use ethcore::filter::Filter;
use ethcore::log_entry::LocalizedLogEntry;
@ -34,6 +34,14 @@ pub struct TestBlockChainClient {
pub difficulty: RwLock<U256>,
}
#[derive(Clone)]
pub enum EachBlockWith {
Nothing,
Uncle,
Transaction,
UncleAndTransaction
}
impl TestBlockChainClient {
pub fn new() -> TestBlockChainClient {
@ -44,30 +52,53 @@ impl TestBlockChainClient {
last_hash: RwLock::new(H256::new()),
difficulty: RwLock::new(From::from(0)),
};
client.add_blocks(1, true); // add genesis block
client.add_blocks(1, EachBlockWith::Nothing); // add genesis block
client.genesis_hash = client.last_hash.read().unwrap().clone();
client
}
pub fn add_blocks(&mut self, count: usize, empty: bool) {
pub fn add_blocks(&mut self, count: usize, with: EachBlockWith) {
let len = self.numbers.read().unwrap().len();
for n in len..(len + count) {
let mut header = BlockHeader::new();
header.difficulty = From::from(n);
header.parent_hash = self.last_hash.read().unwrap().clone();
header.number = n as BlockNumber;
let mut uncles = RlpStream::new_list(if empty {0} else {1});
if !empty {
let mut uncle_header = BlockHeader::new();
uncle_header.difficulty = From::from(n);
uncle_header.parent_hash = self.last_hash.read().unwrap().clone();
uncle_header.number = n as BlockNumber;
uncles.append(&uncle_header);
header.uncles_hash = uncles.as_raw().sha3();
}
let uncles = match with {
EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => {
let mut uncles = RlpStream::new_list(1);
let mut uncle_header = BlockHeader::new();
uncle_header.difficulty = From::from(n);
uncle_header.parent_hash = self.last_hash.read().unwrap().clone();
uncle_header.number = n as BlockNumber;
uncles.append(&uncle_header);
header.uncles_hash = uncles.as_raw().sha3();
uncles
},
_ => RlpStream::new_list(0)
};
let txs = match with {
EachBlockWith::Transaction | EachBlockWith::UncleAndTransaction => {
let mut txs = RlpStream::new_list(1);
let keypair = KeyPair::create().unwrap();
let tx = Transaction {
action: Action::Create,
value: U256::from(100),
data: "3331600055".from_hex().unwrap(),
gas: U256::from(100_000),
gas_price: U256::one(),
nonce: U256::zero()
};
let signed_tx = tx.sign(&keypair.secret());
txs.append(&signed_tx);
txs.out()
},
_ => rlp::NULL_RLP.to_vec()
};
let mut rlp = RlpStream::new_list(3);
rlp.append(&header);
rlp.append_raw(&rlp::NULL_RLP, 1);
rlp.append_raw(&txs, 1);
rlp.append_raw(uncles.as_raw(), 1);
self.import_block(rlp.as_raw().to_vec()).unwrap();
}
@ -109,6 +140,10 @@ impl BlockChainClient for TestBlockChainClient {
unimplemented!();
}
fn nonce(&self, _address: &Address) -> U256 {
U256::zero()
}
fn code(&self, _address: &Address) -> Option<Bytes> {
unimplemented!();
}

View File

@ -221,19 +221,19 @@ impl TransactionQueue {
/// Removes all transactions identified by hashes given in slice
///
/// If gap is introduced marks subsequent transactions as future
pub fn remove_all<T>(&mut self, txs: &[H256], fetch_nonce: T)
pub fn remove_all<T>(&mut self, transaction_hashes: &[H256], fetch_nonce: T)
where T: Fn(&Address) -> U256 {
for tx in txs {
self.remove(&tx, &fetch_nonce);
for hash in transaction_hashes {
self.remove(&hash, &fetch_nonce);
}
}
/// Removes transaction identified by hashes from queue.
///
/// If gap is introduced marks subsequent transactions as future
pub fn remove<T>(&mut self, hash: &H256, fetch_nonce: &T)
pub fn remove<T>(&mut self, transaction_hash: &H256, fetch_nonce: &T)
where T: Fn(&Address) -> U256 {
let transaction = self.by_hash.remove(hash);
let transaction = self.by_hash.remove(transaction_hash);
if transaction.is_none() {
// We don't know this transaction
return;
@ -244,7 +244,6 @@ impl TransactionQueue {
let nonce = transaction.nonce();
let current_nonce = fetch_nonce(&sender);
println!("Removing tx: {:?}", transaction.transaction);
// Remove from future
let order = self.future.drop(&sender, &nonce);
if order.is_some() {
@ -292,7 +291,6 @@ impl TransactionQueue {
// Goes to future or is removed
let order = self.current.drop(&sender, &k).unwrap();
if k >= current_nonce {
println!("Moving to future: {:?}", order);
self.future.insert(sender.clone(), k, order.update_height(k, current_nonce));
} else {
self.by_hash.remove(&order.hash);
@ -302,7 +300,7 @@ impl TransactionQueue {
// And now lets check if there is some chain of transactions in future
// that should be placed in current
if let Some(new_current_top) = self.move_future_txs(sender.clone(), current_nonce - U256::one(), current_nonce) {
if let Some(new_current_top) = self.move_future_txs(sender.clone(), current_nonce, current_nonce) {
self.last_nonces.insert(sender, new_current_top);
}
}
@ -337,7 +335,6 @@ impl TransactionQueue {
// remove also from priority and hash
self.future.by_priority.remove(&order);
// Put to current
println!("Moved: {:?}", order);
let order = order.update_height(current_nonce.clone(), first_nonce);
self.current.insert(address.clone(), current_nonce, order);
current_nonce = current_nonce + U256::one();
@ -366,7 +363,6 @@ impl TransactionQueue {
.cloned()
.map_or(state_nonce, |n| n + U256::one());
println!("Expected next: {:?}, got: {:?}", next_nonce, nonce);
// Check height
if nonce > next_nonce {
// We have a gap - put to future
@ -375,6 +371,7 @@ impl TransactionQueue {
return;
} else if nonce < state_nonce {
// Droping transaction
trace!(target: "sync", "Dropping transaction with nonce: {} - expecting: {}", nonce, next_nonce);
return;
}