Merge pull request #607 from ethcore/tx_queue_integration
Transaction Queue Integration
This commit is contained in:
commit
50c8d7f633
19
Cargo.lock
generated
19
Cargo.lock
generated
@ -147,6 +147,14 @@ dependencies = [
|
|||||||
"libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
"libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "deque"
|
||||||
|
version = "0.3.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
dependencies = [
|
||||||
|
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "docopt"
|
name = "docopt"
|
||||||
version = "0.6.78"
|
version = "0.6.78"
|
||||||
@ -286,6 +294,7 @@ dependencies = [
|
|||||||
"heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
"heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
"log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
|
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"rayon 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)",
|
"rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)",
|
"time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
@ -656,6 +665,16 @@ dependencies = [
|
|||||||
"libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
"libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rayon"
|
||||||
|
version = "0.3.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
dependencies = [
|
||||||
|
"deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "regex"
|
name = "regex"
|
||||||
version = "0.1.54"
|
version = "0.1.54"
|
||||||
|
@ -141,6 +141,9 @@ pub trait BlockChainClient : Sync + Send {
|
|||||||
/// Get block total difficulty.
|
/// Get block total difficulty.
|
||||||
fn block_total_difficulty(&self, id: BlockId) -> Option<U256>;
|
fn block_total_difficulty(&self, id: BlockId) -> Option<U256>;
|
||||||
|
|
||||||
|
/// Get address nonce.
|
||||||
|
fn nonce(&self, address: &Address) -> U256;
|
||||||
|
|
||||||
/// Get block hash.
|
/// Get block hash.
|
||||||
fn block_hash(&self, id: BlockId) -> Option<H256>;
|
fn block_hash(&self, id: BlockId) -> Option<H256>;
|
||||||
|
|
||||||
@ -370,18 +373,14 @@ impl<V> Client<V> where V: Verifier {
|
|||||||
bad_blocks.insert(header.hash());
|
bad_blocks.insert(header.hash());
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let closed_block = self.check_and_close_block(&block);
|
let closed_block = self.check_and_close_block(&block);
|
||||||
if let Err(_) = closed_block {
|
if let Err(_) = closed_block {
|
||||||
bad_blocks.insert(header.hash());
|
bad_blocks.insert(header.hash());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Insert block
|
|
||||||
let closed_block = closed_block.unwrap();
|
|
||||||
self.chain.write().unwrap().insert_block(&block.bytes, closed_block.block().receipts().clone());
|
|
||||||
good_blocks.push(header.hash());
|
good_blocks.push(header.hash());
|
||||||
|
|
||||||
|
// Are we committing an era?
|
||||||
let ancient = if header.number() >= HISTORY {
|
let ancient = if header.number() >= HISTORY {
|
||||||
let n = header.number() - HISTORY;
|
let n = header.number() - HISTORY;
|
||||||
let chain = self.chain.read().unwrap();
|
let chain = self.chain.read().unwrap();
|
||||||
@ -391,10 +390,16 @@ impl<V> Client<V> where V: Verifier {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Commit results
|
// Commit results
|
||||||
|
let closed_block = closed_block.unwrap();
|
||||||
|
let receipts = closed_block.block().receipts().clone();
|
||||||
closed_block.drain()
|
closed_block.drain()
|
||||||
.commit(header.number(), &header.hash(), ancient)
|
.commit(header.number(), &header.hash(), ancient)
|
||||||
.expect("State DB commit failed.");
|
.expect("State DB commit failed.");
|
||||||
|
|
||||||
|
// And update the chain
|
||||||
|
self.chain.write().unwrap()
|
||||||
|
.insert_block(&block.bytes, receipts);
|
||||||
|
|
||||||
self.report.write().unwrap().accrue_block(&block);
|
self.report.write().unwrap().accrue_block(&block);
|
||||||
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
|
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
|
||||||
}
|
}
|
||||||
@ -414,6 +419,8 @@ impl<V> Client<V> where V: Verifier {
|
|||||||
io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
|
io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
|
||||||
good: good_blocks,
|
good: good_blocks,
|
||||||
bad: bad_blocks,
|
bad: bad_blocks,
|
||||||
|
// TODO [todr] were to take those from?
|
||||||
|
retracted: vec![],
|
||||||
})).unwrap();
|
})).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -588,6 +595,10 @@ impl<V> BlockChainClient for Client<V> where V: Verifier {
|
|||||||
Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty)
|
Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn nonce(&self, address: &Address) -> U256 {
|
||||||
|
self.state().nonce(address)
|
||||||
|
}
|
||||||
|
|
||||||
fn block_hash(&self, id: BlockId) -> Option<H256> {
|
fn block_hash(&self, id: BlockId) -> Option<H256> {
|
||||||
let chain = self.chain.read().unwrap();
|
let chain = self.chain.read().unwrap();
|
||||||
Self::block_hash(&chain, id)
|
Self::block_hash(&chain, id)
|
||||||
|
@ -31,6 +31,8 @@ pub enum SyncMessage {
|
|||||||
good: Vec<H256>,
|
good: Vec<H256>,
|
||||||
/// Hashes of blocks not imported to blockchain
|
/// Hashes of blocks not imported to blockchain
|
||||||
bad: Vec<H256>,
|
bad: Vec<H256>,
|
||||||
|
/// Hashes of blocks that were removed from canonical chain
|
||||||
|
retracted: Vec<H256>,
|
||||||
},
|
},
|
||||||
/// A block is ready
|
/// A block is ready
|
||||||
BlockVerified,
|
BlockVerified,
|
||||||
|
@ -17,6 +17,7 @@ time = "0.1.34"
|
|||||||
rand = "0.3.13"
|
rand = "0.3.13"
|
||||||
heapsize = "0.3"
|
heapsize = "0.3"
|
||||||
rustc-serialize = "0.3"
|
rustc-serialize = "0.3"
|
||||||
|
rayon = "0.3.1"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = []
|
default = []
|
||||||
|
@ -30,14 +30,17 @@
|
|||||||
///
|
///
|
||||||
|
|
||||||
use util::*;
|
use util::*;
|
||||||
|
use rayon::prelude::*;
|
||||||
use std::mem::{replace};
|
use std::mem::{replace};
|
||||||
use ethcore::views::{HeaderView};
|
use ethcore::views::{HeaderView, BlockView};
|
||||||
use ethcore::header::{BlockNumber, Header as BlockHeader};
|
use ethcore::header::{BlockNumber, Header as BlockHeader};
|
||||||
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo};
|
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo};
|
||||||
use range_collection::{RangeCollection, ToUsize, FromUsize};
|
use range_collection::{RangeCollection, ToUsize, FromUsize};
|
||||||
use ethcore::error::*;
|
use ethcore::error::*;
|
||||||
use ethcore::block::Block;
|
use ethcore::block::Block;
|
||||||
|
use ethcore::transaction::SignedTransaction;
|
||||||
use io::SyncIo;
|
use io::SyncIo;
|
||||||
|
use transaction_queue::TransactionQueue;
|
||||||
use time;
|
use time;
|
||||||
use super::SyncConfig;
|
use super::SyncConfig;
|
||||||
|
|
||||||
@ -204,11 +207,13 @@ pub struct ChainSync {
|
|||||||
/// True if common block for our and remote chain has been found
|
/// True if common block for our and remote chain has been found
|
||||||
have_common_block: bool,
|
have_common_block: bool,
|
||||||
/// Last propagated block number
|
/// Last propagated block number
|
||||||
last_send_block_number: BlockNumber,
|
last_sent_block_number: BlockNumber,
|
||||||
/// Max blocks to download ahead
|
/// Max blocks to download ahead
|
||||||
max_download_ahead_blocks: usize,
|
max_download_ahead_blocks: usize,
|
||||||
/// Network ID
|
/// Network ID
|
||||||
network_id: U256,
|
network_id: U256,
|
||||||
|
/// Transactions Queue
|
||||||
|
transaction_queue: Mutex<TransactionQueue>,
|
||||||
}
|
}
|
||||||
|
|
||||||
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
|
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
|
||||||
@ -231,9 +236,10 @@ impl ChainSync {
|
|||||||
last_imported_hash: None,
|
last_imported_hash: None,
|
||||||
syncing_difficulty: U256::from(0u64),
|
syncing_difficulty: U256::from(0u64),
|
||||||
have_common_block: false,
|
have_common_block: false,
|
||||||
last_send_block_number: 0,
|
last_sent_block_number: 0,
|
||||||
max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
|
max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
|
||||||
network_id: config.network_id,
|
network_id: config.network_id,
|
||||||
|
transaction_queue: Mutex::new(TransactionQueue::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -292,6 +298,7 @@ impl ChainSync {
|
|||||||
self.starting_block = 0;
|
self.starting_block = 0;
|
||||||
self.highest_block = None;
|
self.highest_block = None;
|
||||||
self.have_common_block = false;
|
self.have_common_block = false;
|
||||||
|
self.transaction_queue.lock().unwrap().clear();
|
||||||
self.starting_block = io.chain().chain_info().best_block_number;
|
self.starting_block = io.chain().chain_info().best_block_number;
|
||||||
self.state = SyncState::NotSynced;
|
self.state = SyncState::NotSynced;
|
||||||
}
|
}
|
||||||
@ -919,8 +926,18 @@ impl ChainSync {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
/// Called when peer sends us new transactions
|
/// Called when peer sends us new transactions
|
||||||
fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
||||||
Ok(())
|
let chain = io.chain();
|
||||||
|
let item_count = r.item_count();
|
||||||
|
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
|
||||||
|
let fetch_latest_nonce = |a : &Address| chain.nonce(a);
|
||||||
|
|
||||||
|
let mut transaction_queue = self.transaction_queue.lock().unwrap();
|
||||||
|
for i in 0..item_count {
|
||||||
|
let tx: SignedTransaction = try!(r.val_at(i));
|
||||||
|
transaction_queue.add(tx, &fetch_latest_nonce);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send Status message
|
/// Send Status message
|
||||||
@ -1229,23 +1246,60 @@ impl ChainSync {
|
|||||||
sent
|
sent
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn propagate_latest_blocks(&mut self, io: &mut SyncIo) {
|
||||||
|
let chain_info = io.chain().chain_info();
|
||||||
|
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
|
||||||
|
let blocks = self.propagate_blocks(&chain_info, io);
|
||||||
|
let hashes = self.propagate_new_hashes(&chain_info, io);
|
||||||
|
if blocks != 0 || hashes != 0 {
|
||||||
|
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.last_sent_block_number = chain_info.best_block_number;
|
||||||
|
}
|
||||||
|
|
||||||
/// Maintain other peers. Send out any new blocks and transactions
|
/// Maintain other peers. Send out any new blocks and transactions
|
||||||
pub fn maintain_sync(&mut self, io: &mut SyncIo) {
|
pub fn maintain_sync(&mut self, io: &mut SyncIo) {
|
||||||
self.check_resume(io);
|
self.check_resume(io);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// should be called once chain has new block, triggers the latest block propagation
|
/// called when block is imported to chain, updates transactions queue and propagates the blocks
|
||||||
pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) {
|
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, good: &[H256], bad: &[H256], _retracted: &[H256]) {
|
||||||
let chain = io.chain().chain_info();
|
fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec<SignedTransaction> {
|
||||||
if (((chain.best_block_number as i64) - (self.last_send_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
|
let block = chain
|
||||||
let blocks = self.propagate_blocks(&chain, io);
|
.block(BlockId::Hash(hash.clone()))
|
||||||
let hashes = self.propagate_new_hashes(&chain, io);
|
// Client should send message after commit to db and inserting to chain.
|
||||||
if blocks != 0 || hashes != 0 {
|
.expect("Expected in-chain blocks.");
|
||||||
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
|
let block = BlockView::new(&block);
|
||||||
}
|
block.transactions()
|
||||||
}
|
}
|
||||||
self.last_send_block_number = chain.best_block_number;
|
|
||||||
|
|
||||||
|
{
|
||||||
|
let chain = io.chain();
|
||||||
|
let good = good.par_iter().map(|h| fetch_transactions(chain, h));
|
||||||
|
let bad = bad.par_iter().map(|h| fetch_transactions(chain, h));
|
||||||
|
|
||||||
|
good.for_each(|txs| {
|
||||||
|
let mut transaction_queue = self.transaction_queue.lock().unwrap();
|
||||||
|
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
|
||||||
|
transaction_queue.remove_all(&hashes, |a| chain.nonce(a));
|
||||||
|
});
|
||||||
|
bad.for_each(|txs| {
|
||||||
|
// populate sender
|
||||||
|
for tx in &txs {
|
||||||
|
let _sender = tx.sender();
|
||||||
|
}
|
||||||
|
let mut transaction_queue = self.transaction_queue.lock().unwrap();
|
||||||
|
transaction_queue.add_all(txs, |a| chain.nonce(a));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Propagate latests blocks
|
||||||
|
self.propagate_latest_blocks(io);
|
||||||
|
// TODO [todr] propagate transactions?
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@ -1386,7 +1440,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn finds_lagging_peers() {
|
fn finds_lagging_peers() {
|
||||||
let mut client = TestBlockChainClient::new();
|
let mut client = TestBlockChainClient::new();
|
||||||
client.add_blocks(100, false);
|
client.add_blocks(100, EachBlockWith::Uncle);
|
||||||
let mut queue = VecDeque::new();
|
let mut queue = VecDeque::new();
|
||||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10));
|
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10));
|
||||||
let chain_info = client.chain_info();
|
let chain_info = client.chain_info();
|
||||||
@ -1400,7 +1454,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn calculates_tree_for_lagging_peer() {
|
fn calculates_tree_for_lagging_peer() {
|
||||||
let mut client = TestBlockChainClient::new();
|
let mut client = TestBlockChainClient::new();
|
||||||
client.add_blocks(15, false);
|
client.add_blocks(15, EachBlockWith::Uncle);
|
||||||
|
|
||||||
let start = client.block_hash_delta_minus(4);
|
let start = client.block_hash_delta_minus(4);
|
||||||
let end = client.block_hash_delta_minus(2);
|
let end = client.block_hash_delta_minus(2);
|
||||||
@ -1417,7 +1471,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn sends_new_hashes_to_lagging_peer() {
|
fn sends_new_hashes_to_lagging_peer() {
|
||||||
let mut client = TestBlockChainClient::new();
|
let mut client = TestBlockChainClient::new();
|
||||||
client.add_blocks(100, false);
|
client.add_blocks(100, EachBlockWith::Uncle);
|
||||||
let mut queue = VecDeque::new();
|
let mut queue = VecDeque::new();
|
||||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
||||||
let chain_info = client.chain_info();
|
let chain_info = client.chain_info();
|
||||||
@ -1436,7 +1490,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn sends_latest_block_to_lagging_peer() {
|
fn sends_latest_block_to_lagging_peer() {
|
||||||
let mut client = TestBlockChainClient::new();
|
let mut client = TestBlockChainClient::new();
|
||||||
client.add_blocks(100, false);
|
client.add_blocks(100, EachBlockWith::Uncle);
|
||||||
let mut queue = VecDeque::new();
|
let mut queue = VecDeque::new();
|
||||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
||||||
let chain_info = client.chain_info();
|
let chain_info = client.chain_info();
|
||||||
@ -1454,7 +1508,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn handles_peer_new_block_mallformed() {
|
fn handles_peer_new_block_mallformed() {
|
||||||
let mut client = TestBlockChainClient::new();
|
let mut client = TestBlockChainClient::new();
|
||||||
client.add_blocks(10, false);
|
client.add_blocks(10, EachBlockWith::Uncle);
|
||||||
|
|
||||||
let block_data = get_dummy_block(11, client.chain_info().best_block_hash);
|
let block_data = get_dummy_block(11, client.chain_info().best_block_hash);
|
||||||
|
|
||||||
@ -1472,7 +1526,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn handles_peer_new_block() {
|
fn handles_peer_new_block() {
|
||||||
let mut client = TestBlockChainClient::new();
|
let mut client = TestBlockChainClient::new();
|
||||||
client.add_blocks(10, false);
|
client.add_blocks(10, EachBlockWith::Uncle);
|
||||||
|
|
||||||
let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash);
|
let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash);
|
||||||
|
|
||||||
@ -1490,7 +1544,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn handles_peer_new_block_empty() {
|
fn handles_peer_new_block_empty() {
|
||||||
let mut client = TestBlockChainClient::new();
|
let mut client = TestBlockChainClient::new();
|
||||||
client.add_blocks(10, false);
|
client.add_blocks(10, EachBlockWith::Uncle);
|
||||||
let mut queue = VecDeque::new();
|
let mut queue = VecDeque::new();
|
||||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
||||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||||
@ -1506,7 +1560,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn handles_peer_new_hashes() {
|
fn handles_peer_new_hashes() {
|
||||||
let mut client = TestBlockChainClient::new();
|
let mut client = TestBlockChainClient::new();
|
||||||
client.add_blocks(10, false);
|
client.add_blocks(10, EachBlockWith::Uncle);
|
||||||
let mut queue = VecDeque::new();
|
let mut queue = VecDeque::new();
|
||||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
||||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||||
@ -1522,7 +1576,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn handles_peer_new_hashes_empty() {
|
fn handles_peer_new_hashes_empty() {
|
||||||
let mut client = TestBlockChainClient::new();
|
let mut client = TestBlockChainClient::new();
|
||||||
client.add_blocks(10, false);
|
client.add_blocks(10, EachBlockWith::Uncle);
|
||||||
let mut queue = VecDeque::new();
|
let mut queue = VecDeque::new();
|
||||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
||||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||||
@ -1540,7 +1594,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn hashes_rlp_mutually_acceptable() {
|
fn hashes_rlp_mutually_acceptable() {
|
||||||
let mut client = TestBlockChainClient::new();
|
let mut client = TestBlockChainClient::new();
|
||||||
client.add_blocks(100, false);
|
client.add_blocks(100, EachBlockWith::Uncle);
|
||||||
let mut queue = VecDeque::new();
|
let mut queue = VecDeque::new();
|
||||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
||||||
let chain_info = client.chain_info();
|
let chain_info = client.chain_info();
|
||||||
@ -1558,7 +1612,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn block_rlp_mutually_acceptable() {
|
fn block_rlp_mutually_acceptable() {
|
||||||
let mut client = TestBlockChainClient::new();
|
let mut client = TestBlockChainClient::new();
|
||||||
client.add_blocks(100, false);
|
client.add_blocks(100, EachBlockWith::Uncle);
|
||||||
let mut queue = VecDeque::new();
|
let mut queue = VecDeque::new();
|
||||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
||||||
let chain_info = client.chain_info();
|
let chain_info = client.chain_info();
|
||||||
@ -1571,10 +1625,37 @@ mod tests {
|
|||||||
assert!(result.is_ok());
|
assert!(result.is_ok());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_add_transactions_to_queue() {
|
||||||
|
// given
|
||||||
|
let mut client = TestBlockChainClient::new();
|
||||||
|
client.add_blocks(98, EachBlockWith::Uncle);
|
||||||
|
client.add_blocks(1, EachBlockWith::UncleAndTransaction);
|
||||||
|
client.add_blocks(1, EachBlockWith::Transaction);
|
||||||
|
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
||||||
|
|
||||||
|
let good_blocks = vec![client.block_hash_delta_minus(2)];
|
||||||
|
let retracted_blocks = vec![client.block_hash_delta_minus(1)];
|
||||||
|
|
||||||
|
let mut queue = VecDeque::new();
|
||||||
|
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||||
|
|
||||||
|
// when
|
||||||
|
sync.chain_new_blocks(&mut io, &[], &good_blocks, &[]);
|
||||||
|
assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0);
|
||||||
|
assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1);
|
||||||
|
sync.chain_new_blocks(&mut io, &good_blocks, &retracted_blocks, &[]);
|
||||||
|
|
||||||
|
// then
|
||||||
|
let status = sync.transaction_queue.lock().unwrap().status();
|
||||||
|
assert_eq!(status.pending, 1);
|
||||||
|
assert_eq!(status.future, 0);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn returns_requested_block_headers() {
|
fn returns_requested_block_headers() {
|
||||||
let mut client = TestBlockChainClient::new();
|
let mut client = TestBlockChainClient::new();
|
||||||
client.add_blocks(100, false);
|
client.add_blocks(100, EachBlockWith::Uncle);
|
||||||
let mut queue = VecDeque::new();
|
let mut queue = VecDeque::new();
|
||||||
let io = TestIo::new(&mut client, &mut queue, None);
|
let io = TestIo::new(&mut client, &mut queue, None);
|
||||||
|
|
||||||
@ -1598,7 +1679,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn returns_requested_block_headers_reverse() {
|
fn returns_requested_block_headers_reverse() {
|
||||||
let mut client = TestBlockChainClient::new();
|
let mut client = TestBlockChainClient::new();
|
||||||
client.add_blocks(100, false);
|
client.add_blocks(100, EachBlockWith::Uncle);
|
||||||
let mut queue = VecDeque::new();
|
let mut queue = VecDeque::new();
|
||||||
let io = TestIo::new(&mut client, &mut queue, None);
|
let io = TestIo::new(&mut client, &mut queue, None);
|
||||||
|
|
||||||
|
@ -54,6 +54,7 @@ extern crate ethcore;
|
|||||||
extern crate env_logger;
|
extern crate env_logger;
|
||||||
extern crate time;
|
extern crate time;
|
||||||
extern crate rand;
|
extern crate rand;
|
||||||
|
extern crate rayon;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate heapsize;
|
extern crate heapsize;
|
||||||
|
|
||||||
@ -70,8 +71,7 @@ use io::NetSyncIo;
|
|||||||
mod chain;
|
mod chain;
|
||||||
mod io;
|
mod io;
|
||||||
mod range_collection;
|
mod range_collection;
|
||||||
// TODO [todr] Made public to suppress dead code warnings
|
mod transaction_queue;
|
||||||
pub mod transaction_queue;
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
@ -153,8 +153,12 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn message(&self, io: &NetworkContext<SyncMessage>, message: &SyncMessage) {
|
fn message(&self, io: &NetworkContext<SyncMessage>, message: &SyncMessage) {
|
||||||
if let SyncMessage::BlockVerified = *message {
|
match *message {
|
||||||
self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref()));
|
SyncMessage::NewChainBlocks { ref good, ref bad, ref retracted } => {
|
||||||
|
let mut sync_io = NetSyncIo::new(io, self.chain.deref());
|
||||||
|
self.sync.write().unwrap().chain_new_blocks(&mut sync_io, good, bad, retracted);
|
||||||
|
},
|
||||||
|
_ => {/* Ignore other messages */},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,8 +24,8 @@ use super::helpers::*;
|
|||||||
fn two_peers() {
|
fn two_peers() {
|
||||||
::env_logger::init().ok();
|
::env_logger::init().ok();
|
||||||
let mut net = TestNet::new(3);
|
let mut net = TestNet::new(3);
|
||||||
net.peer_mut(1).chain.add_blocks(1000, false);
|
net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
|
||||||
net.peer_mut(2).chain.add_blocks(1000, false);
|
net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
|
||||||
net.sync();
|
net.sync();
|
||||||
assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some());
|
assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some());
|
||||||
assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref());
|
assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref());
|
||||||
@ -35,8 +35,8 @@ fn two_peers() {
|
|||||||
fn status_after_sync() {
|
fn status_after_sync() {
|
||||||
::env_logger::init().ok();
|
::env_logger::init().ok();
|
||||||
let mut net = TestNet::new(3);
|
let mut net = TestNet::new(3);
|
||||||
net.peer_mut(1).chain.add_blocks(1000, false);
|
net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
|
||||||
net.peer_mut(2).chain.add_blocks(1000, false);
|
net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
|
||||||
net.sync();
|
net.sync();
|
||||||
let status = net.peer(0).sync.status();
|
let status = net.peer(0).sync.status();
|
||||||
assert_eq!(status.state, SyncState::Idle);
|
assert_eq!(status.state, SyncState::Idle);
|
||||||
@ -45,8 +45,8 @@ fn status_after_sync() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn takes_few_steps() {
|
fn takes_few_steps() {
|
||||||
let mut net = TestNet::new(3);
|
let mut net = TestNet::new(3);
|
||||||
net.peer_mut(1).chain.add_blocks(100, false);
|
net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle);
|
||||||
net.peer_mut(2).chain.add_blocks(100, false);
|
net.peer_mut(2).chain.add_blocks(100, EachBlockWith::Uncle);
|
||||||
let total_steps = net.sync();
|
let total_steps = net.sync();
|
||||||
assert!(total_steps < 7);
|
assert!(total_steps < 7);
|
||||||
}
|
}
|
||||||
@ -56,8 +56,9 @@ fn empty_blocks() {
|
|||||||
::env_logger::init().ok();
|
::env_logger::init().ok();
|
||||||
let mut net = TestNet::new(3);
|
let mut net = TestNet::new(3);
|
||||||
for n in 0..200 {
|
for n in 0..200 {
|
||||||
net.peer_mut(1).chain.add_blocks(5, n % 2 == 0);
|
let with = if n % 2 == 0 { EachBlockWith::Nothing } else { EachBlockWith::Uncle };
|
||||||
net.peer_mut(2).chain.add_blocks(5, n % 2 == 0);
|
net.peer_mut(1).chain.add_blocks(5, with.clone());
|
||||||
|
net.peer_mut(2).chain.add_blocks(5, with);
|
||||||
}
|
}
|
||||||
net.sync();
|
net.sync();
|
||||||
assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some());
|
assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some());
|
||||||
@ -68,14 +69,14 @@ fn empty_blocks() {
|
|||||||
fn forked() {
|
fn forked() {
|
||||||
::env_logger::init().ok();
|
::env_logger::init().ok();
|
||||||
let mut net = TestNet::new(3);
|
let mut net = TestNet::new(3);
|
||||||
net.peer_mut(0).chain.add_blocks(300, false);
|
net.peer_mut(0).chain.add_blocks(300, EachBlockWith::Uncle);
|
||||||
net.peer_mut(1).chain.add_blocks(300, false);
|
net.peer_mut(1).chain.add_blocks(300, EachBlockWith::Uncle);
|
||||||
net.peer_mut(2).chain.add_blocks(300, false);
|
net.peer_mut(2).chain.add_blocks(300, EachBlockWith::Uncle);
|
||||||
net.peer_mut(0).chain.add_blocks(100, true); //fork
|
net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Nothing); //fork
|
||||||
net.peer_mut(1).chain.add_blocks(200, false);
|
net.peer_mut(1).chain.add_blocks(200, EachBlockWith::Uncle);
|
||||||
net.peer_mut(2).chain.add_blocks(200, false);
|
net.peer_mut(2).chain.add_blocks(200, EachBlockWith::Uncle);
|
||||||
net.peer_mut(1).chain.add_blocks(100, false); //fork between 1 and 2
|
net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle); //fork between 1 and 2
|
||||||
net.peer_mut(2).chain.add_blocks(10, true);
|
net.peer_mut(2).chain.add_blocks(10, EachBlockWith::Nothing);
|
||||||
// peer 1 has the best chain of 601 blocks
|
// peer 1 has the best chain of 601 blocks
|
||||||
let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone();
|
let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone();
|
||||||
net.sync();
|
net.sync();
|
||||||
@ -87,8 +88,8 @@ fn forked() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn restart() {
|
fn restart() {
|
||||||
let mut net = TestNet::new(3);
|
let mut net = TestNet::new(3);
|
||||||
net.peer_mut(1).chain.add_blocks(1000, false);
|
net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
|
||||||
net.peer_mut(2).chain.add_blocks(1000, false);
|
net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
|
||||||
|
|
||||||
net.sync_steps(8);
|
net.sync_steps(8);
|
||||||
|
|
||||||
@ -109,8 +110,8 @@ fn status_empty() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn status_packet() {
|
fn status_packet() {
|
||||||
let mut net = TestNet::new(2);
|
let mut net = TestNet::new(2);
|
||||||
net.peer_mut(0).chain.add_blocks(100, false);
|
net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Uncle);
|
||||||
net.peer_mut(1).chain.add_blocks(1, false);
|
net.peer_mut(1).chain.add_blocks(1, EachBlockWith::Uncle);
|
||||||
|
|
||||||
net.start();
|
net.start();
|
||||||
|
|
||||||
@ -123,13 +124,13 @@ fn status_packet() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn propagate_hashes() {
|
fn propagate_hashes() {
|
||||||
let mut net = TestNet::new(6);
|
let mut net = TestNet::new(6);
|
||||||
net.peer_mut(1).chain.add_blocks(10, false);
|
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
|
||||||
net.sync();
|
net.sync();
|
||||||
|
|
||||||
net.peer_mut(0).chain.add_blocks(10, false);
|
net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle);
|
||||||
net.sync();
|
net.sync();
|
||||||
net.trigger_block_verified(0); //first event just sets the marker
|
net.trigger_chain_new_blocks(0); //first event just sets the marker
|
||||||
net.trigger_block_verified(0);
|
net.trigger_chain_new_blocks(0);
|
||||||
|
|
||||||
// 5 peers to sync
|
// 5 peers to sync
|
||||||
assert_eq!(5, net.peer(0).queue.len());
|
assert_eq!(5, net.peer(0).queue.len());
|
||||||
@ -149,12 +150,12 @@ fn propagate_hashes() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn propagate_blocks() {
|
fn propagate_blocks() {
|
||||||
let mut net = TestNet::new(2);
|
let mut net = TestNet::new(2);
|
||||||
net.peer_mut(1).chain.add_blocks(10, false);
|
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
|
||||||
net.sync();
|
net.sync();
|
||||||
|
|
||||||
net.peer_mut(0).chain.add_blocks(10, false);
|
net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle);
|
||||||
net.trigger_block_verified(0); //first event just sets the marker
|
net.trigger_chain_new_blocks(0); //first event just sets the marker
|
||||||
net.trigger_block_verified(0);
|
net.trigger_chain_new_blocks(0);
|
||||||
|
|
||||||
assert!(!net.peer(0).queue.is_empty());
|
assert!(!net.peer(0).queue.is_empty());
|
||||||
// NEW_BLOCK_PACKET
|
// NEW_BLOCK_PACKET
|
||||||
@ -164,7 +165,7 @@ fn propagate_blocks() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn restart_on_malformed_block() {
|
fn restart_on_malformed_block() {
|
||||||
let mut net = TestNet::new(2);
|
let mut net = TestNet::new(2);
|
||||||
net.peer_mut(1).chain.add_blocks(10, false);
|
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
|
||||||
net.peer_mut(1).chain.corrupt_block(6);
|
net.peer_mut(1).chain.corrupt_block(6);
|
||||||
net.sync_steps(10);
|
net.sync_steps(10);
|
||||||
|
|
||||||
|
@ -22,7 +22,7 @@ use io::SyncIo;
|
|||||||
use chain::ChainSync;
|
use chain::ChainSync;
|
||||||
use ::SyncConfig;
|
use ::SyncConfig;
|
||||||
use ethcore::receipt::Receipt;
|
use ethcore::receipt::Receipt;
|
||||||
use ethcore::transaction::LocalizedTransaction;
|
use ethcore::transaction::{LocalizedTransaction, Transaction, Action};
|
||||||
use ethcore::filter::Filter;
|
use ethcore::filter::Filter;
|
||||||
use ethcore::log_entry::LocalizedLogEntry;
|
use ethcore::log_entry::LocalizedLogEntry;
|
||||||
|
|
||||||
@ -34,6 +34,14 @@ pub struct TestBlockChainClient {
|
|||||||
pub difficulty: RwLock<U256>,
|
pub difficulty: RwLock<U256>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub enum EachBlockWith {
|
||||||
|
Nothing,
|
||||||
|
Uncle,
|
||||||
|
Transaction,
|
||||||
|
UncleAndTransaction
|
||||||
|
}
|
||||||
|
|
||||||
impl TestBlockChainClient {
|
impl TestBlockChainClient {
|
||||||
pub fn new() -> TestBlockChainClient {
|
pub fn new() -> TestBlockChainClient {
|
||||||
|
|
||||||
@ -44,30 +52,53 @@ impl TestBlockChainClient {
|
|||||||
last_hash: RwLock::new(H256::new()),
|
last_hash: RwLock::new(H256::new()),
|
||||||
difficulty: RwLock::new(From::from(0)),
|
difficulty: RwLock::new(From::from(0)),
|
||||||
};
|
};
|
||||||
client.add_blocks(1, true); // add genesis block
|
client.add_blocks(1, EachBlockWith::Nothing); // add genesis block
|
||||||
client.genesis_hash = client.last_hash.read().unwrap().clone();
|
client.genesis_hash = client.last_hash.read().unwrap().clone();
|
||||||
client
|
client
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_blocks(&mut self, count: usize, empty: bool) {
|
pub fn add_blocks(&mut self, count: usize, with: EachBlockWith) {
|
||||||
let len = self.numbers.read().unwrap().len();
|
let len = self.numbers.read().unwrap().len();
|
||||||
for n in len..(len + count) {
|
for n in len..(len + count) {
|
||||||
let mut header = BlockHeader::new();
|
let mut header = BlockHeader::new();
|
||||||
header.difficulty = From::from(n);
|
header.difficulty = From::from(n);
|
||||||
header.parent_hash = self.last_hash.read().unwrap().clone();
|
header.parent_hash = self.last_hash.read().unwrap().clone();
|
||||||
header.number = n as BlockNumber;
|
header.number = n as BlockNumber;
|
||||||
let mut uncles = RlpStream::new_list(if empty {0} else {1});
|
let uncles = match with {
|
||||||
if !empty {
|
EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => {
|
||||||
let mut uncle_header = BlockHeader::new();
|
let mut uncles = RlpStream::new_list(1);
|
||||||
uncle_header.difficulty = From::from(n);
|
let mut uncle_header = BlockHeader::new();
|
||||||
uncle_header.parent_hash = self.last_hash.read().unwrap().clone();
|
uncle_header.difficulty = From::from(n);
|
||||||
uncle_header.number = n as BlockNumber;
|
uncle_header.parent_hash = self.last_hash.read().unwrap().clone();
|
||||||
uncles.append(&uncle_header);
|
uncle_header.number = n as BlockNumber;
|
||||||
header.uncles_hash = uncles.as_raw().sha3();
|
uncles.append(&uncle_header);
|
||||||
}
|
header.uncles_hash = uncles.as_raw().sha3();
|
||||||
|
uncles
|
||||||
|
},
|
||||||
|
_ => RlpStream::new_list(0)
|
||||||
|
};
|
||||||
|
let txs = match with {
|
||||||
|
EachBlockWith::Transaction | EachBlockWith::UncleAndTransaction => {
|
||||||
|
let mut txs = RlpStream::new_list(1);
|
||||||
|
let keypair = KeyPair::create().unwrap();
|
||||||
|
let tx = Transaction {
|
||||||
|
action: Action::Create,
|
||||||
|
value: U256::from(100),
|
||||||
|
data: "3331600055".from_hex().unwrap(),
|
||||||
|
gas: U256::from(100_000),
|
||||||
|
gas_price: U256::one(),
|
||||||
|
nonce: U256::zero()
|
||||||
|
};
|
||||||
|
let signed_tx = tx.sign(&keypair.secret());
|
||||||
|
txs.append(&signed_tx);
|
||||||
|
txs.out()
|
||||||
|
},
|
||||||
|
_ => rlp::NULL_RLP.to_vec()
|
||||||
|
};
|
||||||
|
|
||||||
let mut rlp = RlpStream::new_list(3);
|
let mut rlp = RlpStream::new_list(3);
|
||||||
rlp.append(&header);
|
rlp.append(&header);
|
||||||
rlp.append_raw(&rlp::NULL_RLP, 1);
|
rlp.append_raw(&txs, 1);
|
||||||
rlp.append_raw(uncles.as_raw(), 1);
|
rlp.append_raw(uncles.as_raw(), 1);
|
||||||
self.import_block(rlp.as_raw().to_vec()).unwrap();
|
self.import_block(rlp.as_raw().to_vec()).unwrap();
|
||||||
}
|
}
|
||||||
@ -109,6 +140,10 @@ impl BlockChainClient for TestBlockChainClient {
|
|||||||
unimplemented!();
|
unimplemented!();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn nonce(&self, _address: &Address) -> U256 {
|
||||||
|
U256::zero()
|
||||||
|
}
|
||||||
|
|
||||||
fn code(&self, _address: &Address) -> Option<Bytes> {
|
fn code(&self, _address: &Address) -> Option<Bytes> {
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
}
|
}
|
||||||
@ -420,8 +455,8 @@ impl TestNet {
|
|||||||
self.peers.iter().all(|p| p.queue.is_empty())
|
self.peers.iter().all(|p| p.queue.is_empty())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn trigger_block_verified(&mut self, peer_id: usize) {
|
pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) {
|
||||||
let mut peer = self.peer_mut(peer_id);
|
let mut peer = self.peer_mut(peer_id);
|
||||||
peer.sync.chain_blocks_verified(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None));
|
peer.sync.chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -108,27 +108,29 @@ struct TransactionSet {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl TransactionSet {
|
impl TransactionSet {
|
||||||
fn insert(&mut self, sender: Address, nonce: U256, order: TransactionOrder) {
|
fn insert(&mut self, sender: Address, nonce: U256, order: TransactionOrder) -> Option<TransactionOrder> {
|
||||||
self.by_priority.insert(order.clone());
|
self.by_priority.insert(order.clone());
|
||||||
self.by_address.insert(sender, nonce, order);
|
self.by_address.insert(sender, nonce, order)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn enforce_limit(&mut self, by_hash: &HashMap<H256, VerifiedTransaction>) {
|
fn enforce_limit(&mut self, by_hash: &mut HashMap<H256, VerifiedTransaction>) {
|
||||||
let len = self.by_priority.len();
|
let len = self.by_priority.len();
|
||||||
if len <= self.limit {
|
if len <= self.limit {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let to_drop : Vec<&VerifiedTransaction> = {
|
let to_drop : Vec<(Address, U256)> = {
|
||||||
self.by_priority
|
self.by_priority
|
||||||
.iter()
|
.iter()
|
||||||
.skip(self.limit)
|
.skip(self.limit)
|
||||||
.map(|order| by_hash.get(&order.hash).expect("Inconsistency in queue detected."))
|
.map(|order| by_hash.get(&order.hash).expect("Inconsistency in queue detected."))
|
||||||
|
.map(|tx| (tx.sender(), tx.nonce()))
|
||||||
.collect()
|
.collect()
|
||||||
};
|
};
|
||||||
|
|
||||||
for tx in to_drop {
|
for (sender, nonce) in to_drop {
|
||||||
self.drop(&tx.sender(), &tx.nonce());
|
let order = self.drop(&sender, &nonce).expect("Dropping transaction found in priority queue failed.");
|
||||||
|
by_hash.remove(&order.hash).expect("Inconsistency in queue.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -219,44 +221,67 @@ impl TransactionQueue {
|
|||||||
/// Removes all transactions identified by hashes given in slice
|
/// Removes all transactions identified by hashes given in slice
|
||||||
///
|
///
|
||||||
/// If gap is introduced marks subsequent transactions as future
|
/// If gap is introduced marks subsequent transactions as future
|
||||||
pub fn remove_all<T>(&mut self, txs: &[H256], fetch_nonce: T)
|
pub fn remove_all<T>(&mut self, transaction_hashes: &[H256], fetch_nonce: T)
|
||||||
where T: Fn(&Address) -> U256 {
|
where T: Fn(&Address) -> U256 {
|
||||||
for tx in txs {
|
for hash in transaction_hashes {
|
||||||
self.remove(&tx, &fetch_nonce);
|
self.remove(&hash, &fetch_nonce);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Removes transaction identified by hashes from queue.
|
/// Removes transaction identified by hashes from queue.
|
||||||
///
|
///
|
||||||
/// If gap is introduced marks subsequent transactions as future
|
/// If gap is introduced marks subsequent transactions as future
|
||||||
pub fn remove<T>(&mut self, hash: &H256, fetch_nonce: &T)
|
pub fn remove<T>(&mut self, transaction_hash: &H256, fetch_nonce: &T)
|
||||||
where T: Fn(&Address) -> U256 {
|
where T: Fn(&Address) -> U256 {
|
||||||
let transaction = self.by_hash.remove(hash);
|
let transaction = self.by_hash.remove(transaction_hash);
|
||||||
if transaction.is_none() {
|
if transaction.is_none() {
|
||||||
// We don't know this transaction
|
// We don't know this transaction
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let transaction = transaction.unwrap();
|
let transaction = transaction.unwrap();
|
||||||
let sender = transaction.sender();
|
let sender = transaction.sender();
|
||||||
let nonce = transaction.nonce();
|
let nonce = transaction.nonce();
|
||||||
|
let current_nonce = fetch_nonce(&sender);
|
||||||
|
|
||||||
println!("Removing tx: {:?}", transaction.transaction);
|
|
||||||
// Remove from future
|
// Remove from future
|
||||||
self.future.drop(&sender, &nonce);
|
let order = self.future.drop(&sender, &nonce);
|
||||||
|
if order.is_some() {
|
||||||
// Remove from current
|
self.update_future(&sender, current_nonce);
|
||||||
let order = self.current.drop(&sender, &nonce);
|
// And now lets check if there is some chain of transactions in future
|
||||||
if order.is_none() {
|
// that should be placed in current
|
||||||
|
self.move_matching_future_to_current(sender.clone(), current_nonce, current_nonce);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Let's remove transactions where tx.nonce < current_nonce
|
// Remove from current
|
||||||
// and if there are any future transactions matching current_nonce+1 - move to current
|
let order = self.current.drop(&sender, &nonce);
|
||||||
let current_nonce = fetch_nonce(&sender);
|
if order.is_some() {
|
||||||
// We will either move transaction to future or remove it completely
|
// We will either move transaction to future or remove it completely
|
||||||
// so there will be no transactions from this sender in current
|
// so there will be no transactions from this sender in current
|
||||||
self.last_nonces.remove(&sender);
|
self.last_nonces.remove(&sender);
|
||||||
|
// This should move all current transactions to future and remove old transactions
|
||||||
|
self.move_all_to_future(&sender, current_nonce);
|
||||||
|
// And now lets check if there is some chain of transactions in future
|
||||||
|
// that should be placed in current. It should also update last_nonces.
|
||||||
|
self.move_matching_future_to_current(sender.clone(), current_nonce, current_nonce);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_future(&mut self, sender: &Address, current_nonce: U256) {
|
||||||
|
// We need to drain all transactions for current sender from future and reinsert them with updated height
|
||||||
|
let all_nonces_from_sender = match self.future.by_address.row(&sender) {
|
||||||
|
Some(row_map) => row_map.keys().cloned().collect::<Vec<U256>>(),
|
||||||
|
None => vec![],
|
||||||
|
};
|
||||||
|
for k in all_nonces_from_sender {
|
||||||
|
let order = self.future.drop(&sender, &k).unwrap();
|
||||||
|
self.future.insert(sender.clone(), k, order.update_height(k, current_nonce));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn move_all_to_future(&mut self, sender: &Address, current_nonce: U256) {
|
||||||
let all_nonces_from_sender = match self.current.by_address.row(&sender) {
|
let all_nonces_from_sender = match self.current.by_address.row(&sender) {
|
||||||
Some(row_map) => row_map.keys().cloned().collect::<Vec<U256>>(),
|
Some(row_map) => row_map.keys().cloned().collect::<Vec<U256>>(),
|
||||||
None => vec![],
|
None => vec![],
|
||||||
@ -266,21 +291,15 @@ impl TransactionQueue {
|
|||||||
// Goes to future or is removed
|
// Goes to future or is removed
|
||||||
let order = self.current.drop(&sender, &k).unwrap();
|
let order = self.current.drop(&sender, &k).unwrap();
|
||||||
if k >= current_nonce {
|
if k >= current_nonce {
|
||||||
println!("Moving to future: {:?}", order);
|
|
||||||
self.future.insert(sender.clone(), k, order.update_height(k, current_nonce));
|
self.future.insert(sender.clone(), k, order.update_height(k, current_nonce));
|
||||||
} else {
|
} else {
|
||||||
self.by_hash.remove(&order.hash);
|
self.by_hash.remove(&order.hash);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.future.enforce_limit(&self.by_hash);
|
self.future.enforce_limit(&mut self.by_hash);
|
||||||
|
|
||||||
// And now lets check if there is some chain of transactions in future
|
|
||||||
// that should be placed in current
|
|
||||||
if let Some(new_current_top) = self.move_future_txs(sender.clone(), current_nonce - U256::one(), current_nonce) {
|
|
||||||
self.last_nonces.insert(sender, new_current_top);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Returns top transactions from the queue
|
/// Returns top transactions from the queue
|
||||||
pub fn top_transactions(&self, size: usize) -> Vec<SignedTransaction> {
|
pub fn top_transactions(&self, size: usize) -> Vec<SignedTransaction> {
|
||||||
self.current.by_priority
|
self.current.by_priority
|
||||||
@ -299,67 +318,88 @@ impl TransactionQueue {
|
|||||||
self.last_nonces.clear();
|
self.last_nonces.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn move_future_txs(&mut self, address: Address, current_nonce: U256, first_nonce: U256) -> Option<U256> {
|
fn move_matching_future_to_current(&mut self, address: Address, mut current_nonce: U256, first_nonce: U256) {
|
||||||
println!("Moving from future for: {:?} base: {:?}", current_nonce, first_nonce);
|
|
||||||
let mut current_nonce = current_nonce + U256::one();
|
|
||||||
{
|
{
|
||||||
let by_nonce = self.future.by_address.row_mut(&address);
|
let by_nonce = self.future.by_address.row_mut(&address);
|
||||||
if let None = by_nonce {
|
if let None = by_nonce {
|
||||||
return None;
|
return;
|
||||||
}
|
}
|
||||||
let mut by_nonce = by_nonce.unwrap();
|
let mut by_nonce = by_nonce.unwrap();
|
||||||
while let Some(order) = by_nonce.remove(¤t_nonce) {
|
while let Some(order) = by_nonce.remove(¤t_nonce) {
|
||||||
// remove also from priority and hash
|
// remove also from priority and hash
|
||||||
self.future.by_priority.remove(&order);
|
self.future.by_priority.remove(&order);
|
||||||
// Put to current
|
// Put to current
|
||||||
println!("Moved: {:?}", order);
|
|
||||||
let order = order.update_height(current_nonce.clone(), first_nonce);
|
let order = order.update_height(current_nonce.clone(), first_nonce);
|
||||||
self.current.insert(address.clone(), current_nonce, order);
|
self.current.insert(address.clone(), current_nonce, order);
|
||||||
current_nonce = current_nonce + U256::one();
|
current_nonce = current_nonce + U256::one();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.future.by_address.clear_if_empty(&address);
|
self.future.by_address.clear_if_empty(&address);
|
||||||
// Returns last inserted nonce
|
// Update last inserted nonce
|
||||||
Some(current_nonce - U256::one())
|
self.last_nonces.insert(address, current_nonce - U256::one());
|
||||||
}
|
}
|
||||||
|
|
||||||
fn import_tx<T>(&mut self, tx: VerifiedTransaction, fetch_nonce: &T)
|
fn import_tx<T>(&mut self, tx: VerifiedTransaction, fetch_nonce: &T)
|
||||||
where T: Fn(&Address) -> U256 {
|
where T: Fn(&Address) -> U256 {
|
||||||
let nonce = tx.nonce();
|
|
||||||
let address = tx.sender();
|
|
||||||
|
|
||||||
|
if self.by_hash.get(&tx.hash()).is_some() {
|
||||||
|
// Transaction is already imported.
|
||||||
|
trace!(target: "sync", "Dropping already imported transaction with hash: {:?}", tx.hash());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let address = tx.sender();
|
||||||
|
let nonce = tx.nonce();
|
||||||
|
|
||||||
|
let state_nonce = fetch_nonce(&address);
|
||||||
let next_nonce = self.last_nonces
|
let next_nonce = self.last_nonces
|
||||||
.get(&address)
|
.get(&address)
|
||||||
.cloned()
|
.cloned()
|
||||||
.map_or_else(|| fetch_nonce(&address), |n| n + U256::one());
|
.map_or(state_nonce, |n| n + U256::one());
|
||||||
|
|
||||||
println!("Expected next: {:?}, got: {:?}", next_nonce, nonce);
|
|
||||||
// Check height
|
// Check height
|
||||||
if nonce > next_nonce {
|
if nonce > next_nonce {
|
||||||
let order = TransactionOrder::for_transaction(&tx, next_nonce);
|
|
||||||
// Insert to by_hash
|
|
||||||
self.by_hash.insert(tx.hash(), tx);
|
|
||||||
// We have a gap - put to future
|
// We have a gap - put to future
|
||||||
self.future.insert(address, nonce, order);
|
Self::replace_transaction(tx, next_nonce, &mut self.future, &mut self.by_hash);
|
||||||
self.future.enforce_limit(&self.by_hash);
|
self.future.enforce_limit(&mut self.by_hash);
|
||||||
return;
|
return;
|
||||||
} else if next_nonce > nonce {
|
} else if nonce < state_nonce {
|
||||||
// Droping transaction
|
// Droping transaction
|
||||||
|
trace!(target: "sync", "Dropping transaction with nonce: {} - expecting: {}", nonce, next_nonce);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let base_nonce = fetch_nonce(&address);
|
let base_nonce = fetch_nonce(&address);
|
||||||
let order = TransactionOrder::for_transaction(&tx, base_nonce);
|
Self::replace_transaction(tx, base_nonce.clone(), &mut self.current, &mut self.by_hash);
|
||||||
// Insert to by_hash
|
self.last_nonces.insert(address.clone(), nonce);
|
||||||
self.by_hash.insert(tx.hash(), tx);
|
|
||||||
|
|
||||||
// Insert to current
|
|
||||||
self.current.insert(address.clone(), nonce, order);
|
|
||||||
// But maybe there are some more items waiting in future?
|
// But maybe there are some more items waiting in future?
|
||||||
let new_last_nonce = self.move_future_txs(address.clone(), nonce, base_nonce);
|
self.move_matching_future_to_current(address.clone(), nonce + U256::one(), base_nonce);
|
||||||
self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce));
|
self.current.enforce_limit(&mut self.by_hash);
|
||||||
// Enforce limit
|
}
|
||||||
self.current.enforce_limit(&self.by_hash);
|
|
||||||
|
fn replace_transaction(tx: VerifiedTransaction, base_nonce: U256, set: &mut TransactionSet, by_hash: &mut HashMap<H256, VerifiedTransaction>) {
|
||||||
|
let order = TransactionOrder::for_transaction(&tx, base_nonce);
|
||||||
|
let hash = tx.hash();
|
||||||
|
let address = tx.sender();
|
||||||
|
let nonce = tx.nonce();
|
||||||
|
|
||||||
|
by_hash.insert(hash.clone(), tx);
|
||||||
|
if let Some(old) = set.insert(address, nonce, order.clone()) {
|
||||||
|
// There was already transaction in queue. Let's check which one should stay
|
||||||
|
let old_fee = old.gas_price;
|
||||||
|
let new_fee = order.gas_price;
|
||||||
|
if old_fee.cmp(&new_fee) == Ordering::Greater {
|
||||||
|
// Put back old transaction since it has greater priority (higher gas_price)
|
||||||
|
set.by_address.insert(address, nonce, old);
|
||||||
|
// and remove new one
|
||||||
|
set.by_priority.remove(&order);
|
||||||
|
by_hash.remove(&hash);
|
||||||
|
} else {
|
||||||
|
// Make sure we remove old transaction entirely
|
||||||
|
set.by_priority.remove(&old);
|
||||||
|
by_hash.remove(&old.hash);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -368,6 +408,7 @@ impl TransactionQueue {
|
|||||||
mod test {
|
mod test {
|
||||||
extern crate rustc_serialize;
|
extern crate rustc_serialize;
|
||||||
use self::rustc_serialize::hex::FromHex;
|
use self::rustc_serialize::hex::FromHex;
|
||||||
|
use std::ops::Deref;
|
||||||
use std::collections::{HashMap, BTreeSet};
|
use std::collections::{HashMap, BTreeSet};
|
||||||
use util::crypto::KeyPair;
|
use util::crypto::KeyPair;
|
||||||
use util::numbers::{U256, Uint};
|
use util::numbers::{U256, Uint};
|
||||||
@ -418,7 +459,7 @@ mod test {
|
|||||||
let (tx1, tx2) = new_txs(U256::from(1));
|
let (tx1, tx2) = new_txs(U256::from(1));
|
||||||
let tx1 = VerifiedTransaction::new(tx1);
|
let tx1 = VerifiedTransaction::new(tx1);
|
||||||
let tx2 = VerifiedTransaction::new(tx2);
|
let tx2 = VerifiedTransaction::new(tx2);
|
||||||
let by_hash = {
|
let mut by_hash = {
|
||||||
let mut x = HashMap::new();
|
let mut x = HashMap::new();
|
||||||
let tx1 = VerifiedTransaction::new(tx1.transaction.clone());
|
let tx1 = VerifiedTransaction::new(tx1.transaction.clone());
|
||||||
let tx2 = VerifiedTransaction::new(tx2.transaction.clone());
|
let tx2 = VerifiedTransaction::new(tx2.transaction.clone());
|
||||||
@ -435,9 +476,10 @@ mod test {
|
|||||||
assert_eq!(set.by_address.len(), 2);
|
assert_eq!(set.by_address.len(), 2);
|
||||||
|
|
||||||
// when
|
// when
|
||||||
set.enforce_limit(&by_hash);
|
set.enforce_limit(&mut by_hash);
|
||||||
|
|
||||||
// then
|
// then
|
||||||
|
assert_eq!(by_hash.len(), 1);
|
||||||
assert_eq!(set.by_priority.len(), 1);
|
assert_eq!(set.by_priority.len(), 1);
|
||||||
assert_eq!(set.by_address.len(), 1);
|
assert_eq!(set.by_address.len(), 1);
|
||||||
assert_eq!(set.by_priority.iter().next().unwrap().clone(), order1);
|
assert_eq!(set.by_priority.iter().next().unwrap().clone(), order1);
|
||||||
@ -638,7 +680,26 @@ mod test {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_accept_same_transaction_twice() {
|
fn should_not_insert_same_transaction_twice() {
|
||||||
|
// given
|
||||||
|
let nonce = |a: &Address| default_nonce(a) + U256::one();
|
||||||
|
let mut txq = TransactionQueue::new();
|
||||||
|
let (_tx1, tx2) = new_txs(U256::from(1));
|
||||||
|
txq.add(tx2.clone(), &default_nonce);
|
||||||
|
assert_eq!(txq.status().future, 1);
|
||||||
|
assert_eq!(txq.status().pending, 0);
|
||||||
|
|
||||||
|
// when
|
||||||
|
txq.add(tx2.clone(), &nonce);
|
||||||
|
|
||||||
|
// then
|
||||||
|
let stats = txq.status();
|
||||||
|
assert_eq!(stats.future, 1);
|
||||||
|
assert_eq!(stats.pending, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_accept_same_transaction_twice_if_removed() {
|
||||||
// given
|
// given
|
||||||
let mut txq = TransactionQueue::new();
|
let mut txq = TransactionQueue::new();
|
||||||
let (tx1, tx2) = new_txs(U256::from(1));
|
let (tx1, tx2) = new_txs(U256::from(1));
|
||||||
@ -680,4 +741,76 @@ mod test {
|
|||||||
assert_eq!(stats.pending, 2);
|
assert_eq!(stats.pending, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_replace_same_transaction_when_has_higher_fee() {
|
||||||
|
// given
|
||||||
|
let mut txq = TransactionQueue::new();
|
||||||
|
let keypair = KeyPair::create().unwrap();
|
||||||
|
let tx = new_unsigned_tx(U256::from(123)).sign(&keypair.secret());
|
||||||
|
let tx2 = {
|
||||||
|
let mut tx2 = tx.deref().clone();
|
||||||
|
tx2.gas_price = U256::from(200);
|
||||||
|
tx2.sign(&keypair.secret())
|
||||||
|
};
|
||||||
|
|
||||||
|
// when
|
||||||
|
txq.add(tx, &default_nonce);
|
||||||
|
txq.add(tx2, &default_nonce);
|
||||||
|
|
||||||
|
// then
|
||||||
|
let stats = txq.status();
|
||||||
|
assert_eq!(stats.pending, 1);
|
||||||
|
assert_eq!(stats.future, 0);
|
||||||
|
assert_eq!(txq.top_transactions(1)[0].gas_price, U256::from(200));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_replace_same_transaction_when_importing_to_futures() {
|
||||||
|
// given
|
||||||
|
let mut txq = TransactionQueue::new();
|
||||||
|
let keypair = KeyPair::create().unwrap();
|
||||||
|
let tx0 = new_unsigned_tx(U256::from(123)).sign(&keypair.secret());
|
||||||
|
let tx1 = {
|
||||||
|
let mut tx1 = tx0.deref().clone();
|
||||||
|
tx1.nonce = U256::from(124);
|
||||||
|
tx1.sign(&keypair.secret())
|
||||||
|
};
|
||||||
|
let tx2 = {
|
||||||
|
let mut tx2 = tx1.deref().clone();
|
||||||
|
tx2.gas_price = U256::from(200);
|
||||||
|
tx2.sign(&keypair.secret())
|
||||||
|
};
|
||||||
|
|
||||||
|
// when
|
||||||
|
txq.add(tx1, &default_nonce);
|
||||||
|
txq.add(tx2, &default_nonce);
|
||||||
|
assert_eq!(txq.status().future, 1);
|
||||||
|
txq.add(tx0, &default_nonce);
|
||||||
|
|
||||||
|
// then
|
||||||
|
let stats = txq.status();
|
||||||
|
assert_eq!(stats.future, 0);
|
||||||
|
assert_eq!(stats.pending, 2);
|
||||||
|
assert_eq!(txq.top_transactions(2)[1].gas_price, U256::from(200));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_recalculate_height_when_removing_from_future() {
|
||||||
|
// given
|
||||||
|
let previous_nonce = |a: &Address| default_nonce(a) - U256::one();
|
||||||
|
let next_nonce = |a: &Address| default_nonce(a) + U256::one();
|
||||||
|
let mut txq = TransactionQueue::new();
|
||||||
|
let (tx1, tx2) = new_txs(U256::one());
|
||||||
|
txq.add(tx1.clone(), &previous_nonce);
|
||||||
|
txq.add(tx2, &previous_nonce);
|
||||||
|
assert_eq!(txq.status().future, 2);
|
||||||
|
|
||||||
|
// when
|
||||||
|
txq.remove(&tx1.hash(), &next_nonce);
|
||||||
|
|
||||||
|
// then
|
||||||
|
let stats = txq.status();
|
||||||
|
assert_eq!(stats.future, 0);
|
||||||
|
assert_eq!(stats.pending, 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user