Merge remote-tracking branch 'origin/tx_queue_integration' into tx_queue_rpc
This commit is contained in:
@@ -9,7 +9,7 @@ authors = ["Ethcore <admin@ethcore.io"]
|
||||
|
||||
[dependencies]
|
||||
ethcore-util = { path = "../util" }
|
||||
ethcore = { path = ".." }
|
||||
ethcore = { path = "../ethcore" }
|
||||
clippy = { version = "0.0.44", optional = true }
|
||||
log = "0.3"
|
||||
env_logger = "0.3"
|
||||
|
||||
@@ -34,7 +34,7 @@ use rayon::prelude::*;
|
||||
use std::mem::{replace};
|
||||
use ethcore::views::{HeaderView, BlockView};
|
||||
use ethcore::header::{BlockNumber, Header as BlockHeader};
|
||||
use ethcore::client::{BlockChainClient, BlockStatus, BlockId};
|
||||
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo};
|
||||
use range_collection::{RangeCollection, ToUsize, FromUsize};
|
||||
use ethcore::error::*;
|
||||
use ethcore::block::Block;
|
||||
@@ -485,7 +485,7 @@ impl ChainSync {
|
||||
peer.latest_number = Some(header.number());
|
||||
}
|
||||
// TODO: Decompose block and add to self.headers and self.bodies instead
|
||||
if header.number == From::from(self.current_base_block() + 1) {
|
||||
if header.number <= From::from(self.current_base_block() + 1) {
|
||||
match io.chain().import_block(block_rlp.as_raw().to_vec()) {
|
||||
Err(Error::Import(ImportError::AlreadyInChain)) => {
|
||||
trace!(target: "sync", "New block already in chain {:?}", h);
|
||||
@@ -494,7 +494,10 @@ impl ChainSync {
|
||||
trace!(target: "sync", "New block already queued {:?}", h);
|
||||
},
|
||||
Ok(_) => {
|
||||
self.last_imported_block = Some(header.number);
|
||||
if self.current_base_block() < header.number {
|
||||
self.last_imported_block = Some(header.number);
|
||||
self.remove_downloaded_blocks(header.number);
|
||||
}
|
||||
trace!(target: "sync", "New block queued {:?}", h);
|
||||
},
|
||||
Err(Error::Block(BlockError::UnknownParent(p))) => {
|
||||
@@ -1174,9 +1177,7 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// returns peer ids that have less blocks than our chain
|
||||
fn get_lagging_peers(&mut self, io: &SyncIo) -> Vec<(PeerId, BlockNumber)> {
|
||||
let chain = io.chain();
|
||||
let chain_info = chain.chain_info();
|
||||
fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &SyncIo) -> Vec<(PeerId, BlockNumber)> {
|
||||
let latest_hash = chain_info.best_block_hash;
|
||||
let latest_number = chain_info.best_block_number;
|
||||
self.peers.iter_mut().filter_map(|(&id, ref mut peer_info)|
|
||||
@@ -1195,9 +1196,9 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// propagates latest block to lagging peers
|
||||
fn propagate_blocks(&mut self, local_best: &H256, best_number: BlockNumber, io: &mut SyncIo) -> usize {
|
||||
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
|
||||
let updated_peers = {
|
||||
let lagging_peers = self.get_lagging_peers(io);
|
||||
let lagging_peers = self.get_lagging_peers(chain_info, io);
|
||||
|
||||
// sqrt(x)/x scaled to max u32
|
||||
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
|
||||
@@ -1214,30 +1215,30 @@ impl ChainSync {
|
||||
for peer_id in updated_peers {
|
||||
let rlp = ChainSync::create_latest_block_rlp(io.chain());
|
||||
self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp);
|
||||
self.peers.get_mut(&peer_id).unwrap().latest_hash = local_best.clone();
|
||||
self.peers.get_mut(&peer_id).unwrap().latest_number = Some(best_number);
|
||||
self.peers.get_mut(&peer_id).unwrap().latest_hash = chain_info.best_block_hash.clone();
|
||||
self.peers.get_mut(&peer_id).unwrap().latest_number = Some(chain_info.best_block_number);
|
||||
sent = sent + 1;
|
||||
}
|
||||
sent
|
||||
}
|
||||
|
||||
/// propagates new known hashes to all peers
|
||||
fn propagate_new_hashes(&mut self, local_best: &H256, best_number: BlockNumber, io: &mut SyncIo) -> usize {
|
||||
let updated_peers = self.get_lagging_peers(io);
|
||||
fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
|
||||
let updated_peers = self.get_lagging_peers(chain_info, io);
|
||||
let mut sent = 0;
|
||||
let last_parent = HeaderView::new(&io.chain().block_header(BlockId::Hash(local_best.clone())).unwrap()).parent_hash();
|
||||
let last_parent = HeaderView::new(&io.chain().block_header(BlockId::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash();
|
||||
for (peer_id, peer_number) in updated_peers {
|
||||
let mut peer_best = self.peers.get(&peer_id).unwrap().latest_hash.clone();
|
||||
if best_number - peer_number > MAX_PEERS_PROPAGATION as BlockNumber {
|
||||
if chain_info.best_block_number - peer_number > MAX_PEERS_PROPAGATION as BlockNumber {
|
||||
// If we think peer is too far behind just send one latest hash
|
||||
peer_best = last_parent.clone();
|
||||
}
|
||||
sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &peer_best, &local_best) {
|
||||
sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &peer_best, &chain_info.best_block_hash) {
|
||||
Some(rlp) => {
|
||||
{
|
||||
let peer = self.peers.get_mut(&peer_id).unwrap();
|
||||
peer.latest_hash = local_best.clone();
|
||||
peer.latest_number = Some(best_number);
|
||||
peer.latest_hash = chain_info.best_block_hash.clone();
|
||||
peer.latest_number = Some(chain_info.best_block_number);
|
||||
}
|
||||
self.send_packet(io, peer_id, NEW_BLOCK_HASHES_PACKET, rlp);
|
||||
1
|
||||
@@ -1257,8 +1258,8 @@ impl ChainSync {
|
||||
pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) {
|
||||
let chain = io.chain().chain_info();
|
||||
if (((chain.best_block_number as i64) - (self.last_send_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
|
||||
let blocks = self.propagate_blocks(&chain.best_block_hash, chain.best_block_number, io);
|
||||
let hashes = self.propagate_new_hashes(&chain.best_block_hash, chain.best_block_number, io);
|
||||
let blocks = self.propagate_blocks(&chain, io);
|
||||
let hashes = self.propagate_new_hashes(&chain, io);
|
||||
if blocks != 0 || hashes != 0 {
|
||||
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
|
||||
}
|
||||
@@ -1267,10 +1268,11 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// called when block is imported to chain, updates transactions queue
|
||||
pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], bad: &[H256]) {
|
||||
pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], retracted: &[H256]) {
|
||||
fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec<SignedTransaction> {
|
||||
let block = chain
|
||||
.block(BlockId::Hash(hash.clone()))
|
||||
// Client should send message after commit to db and inserting to chain.
|
||||
.expect("Expected in-chain blocks.");
|
||||
let block = BlockView::new(&block);
|
||||
block.transactions()
|
||||
@@ -1279,14 +1281,14 @@ impl ChainSync {
|
||||
|
||||
let chain = io.chain();
|
||||
let good = good.par_iter().map(|h| fetch_transactions(chain, h));
|
||||
let bad = bad.par_iter().map(|h| fetch_transactions(chain, h));
|
||||
let retracted = retracted.par_iter().map(|h| fetch_transactions(chain, h));
|
||||
|
||||
good.for_each(|txs| {
|
||||
let mut transaction_queue = self.transaction_queue.lock().unwrap();
|
||||
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
|
||||
transaction_queue.remove_all(&hashes, |a| chain.nonce(a));
|
||||
});
|
||||
bad.for_each(|txs| {
|
||||
retracted.for_each(|txs| {
|
||||
// populate sender
|
||||
for tx in &txs {
|
||||
let _sender = tx.sender();
|
||||
@@ -1435,12 +1437,13 @@ mod tests {
|
||||
#[test]
|
||||
fn finds_lagging_peers() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, BlocksWith::Uncle);
|
||||
client.add_blocks(100, EachBlockWith::Uncle);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10));
|
||||
let chain_info = client.chain_info();
|
||||
let io = TestIo::new(&mut client, &mut queue, None);
|
||||
|
||||
let lagging_peers = sync.get_lagging_peers(&io);
|
||||
let lagging_peers = sync.get_lagging_peers(&chain_info, &io);
|
||||
|
||||
assert_eq!(1, lagging_peers.len())
|
||||
}
|
||||
@@ -1448,7 +1451,7 @@ mod tests {
|
||||
#[test]
|
||||
fn calculates_tree_for_lagging_peer() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(15, BlocksWith::Uncle);
|
||||
client.add_blocks(15, EachBlockWith::Uncle);
|
||||
|
||||
let start = client.block_hash_delta_minus(4);
|
||||
let end = client.block_hash_delta_minus(2);
|
||||
@@ -1465,14 +1468,13 @@ mod tests {
|
||||
#[test]
|
||||
fn sends_new_hashes_to_lagging_peer() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, BlocksWith::Uncle);
|
||||
client.add_blocks(100, EachBlockWith::Uncle);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
||||
let best_hash = client.chain_info().best_block_hash.clone();
|
||||
let best_number = client.chain_info().best_block_number;
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
|
||||
let peer_count = sync.propagate_new_hashes(&best_hash, best_number, &mut io);
|
||||
let peer_count = sync.propagate_new_hashes(&chain_info, &mut io);
|
||||
|
||||
// 1 message should be send
|
||||
assert_eq!(1, io.queue.len());
|
||||
@@ -1485,14 +1487,12 @@ mod tests {
|
||||
#[test]
|
||||
fn sends_latest_block_to_lagging_peer() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, BlocksWith::Uncle);
|
||||
client.add_blocks(100, EachBlockWith::Uncle);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
||||
let best_hash = client.chain_info().best_block_hash.clone();
|
||||
let best_number = client.chain_info().best_block_number;
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
|
||||
let peer_count = sync.propagate_blocks(&best_hash, best_number, &mut io);
|
||||
let peer_count = sync.propagate_blocks(&chain_info, &mut io);
|
||||
|
||||
// 1 message should be send
|
||||
assert_eq!(1, io.queue.len());
|
||||
@@ -1505,7 +1505,7 @@ mod tests {
|
||||
#[test]
|
||||
fn handles_peer_new_block_mallformed() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(10, BlocksWith::Uncle);
|
||||
client.add_blocks(10, EachBlockWith::Uncle);
|
||||
|
||||
let block_data = get_dummy_block(11, client.chain_info().best_block_hash);
|
||||
|
||||
@@ -1523,7 +1523,7 @@ mod tests {
|
||||
#[test]
|
||||
fn handles_peer_new_block() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(10, BlocksWith::Uncle);
|
||||
client.add_blocks(10, EachBlockWith::Uncle);
|
||||
|
||||
let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash);
|
||||
|
||||
@@ -1541,7 +1541,7 @@ mod tests {
|
||||
#[test]
|
||||
fn handles_peer_new_block_empty() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(10, BlocksWith::Uncle);
|
||||
client.add_blocks(10, EachBlockWith::Uncle);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
@@ -1557,7 +1557,7 @@ mod tests {
|
||||
#[test]
|
||||
fn handles_peer_new_hashes() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(10, BlocksWith::Uncle);
|
||||
client.add_blocks(10, EachBlockWith::Uncle);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
@@ -1573,7 +1573,7 @@ mod tests {
|
||||
#[test]
|
||||
fn handles_peer_new_hashes_empty() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(10, BlocksWith::Uncle);
|
||||
client.add_blocks(10, EachBlockWith::Uncle);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
@@ -1591,14 +1591,13 @@ mod tests {
|
||||
#[test]
|
||||
fn hashes_rlp_mutually_acceptable() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, BlocksWith::Uncle);
|
||||
client.add_blocks(100, EachBlockWith::Uncle);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
||||
let best_hash = client.chain_info().best_block_hash.clone();
|
||||
let best_number = client.chain_info().best_block_number;
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
|
||||
sync.propagate_new_hashes(&best_hash, best_number, &mut io);
|
||||
sync.propagate_new_hashes(&chain_info, &mut io);
|
||||
|
||||
let data = &io.queue[0].data.clone();
|
||||
let result = sync.on_peer_new_hashes(&mut io, 0, &UntrustedRlp::new(&data));
|
||||
@@ -1610,14 +1609,13 @@ mod tests {
|
||||
#[test]
|
||||
fn block_rlp_mutually_acceptable() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, BlocksWith::Uncle);
|
||||
client.add_blocks(100, EachBlockWith::Uncle);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
||||
let best_hash = client.chain_info().best_block_hash.clone();
|
||||
let best_number = client.chain_info().best_block_number;
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
|
||||
sync.propagate_blocks(&best_hash, best_number, &mut io);
|
||||
sync.propagate_blocks(&chain_info, &mut io);
|
||||
|
||||
let data = &io.queue[0].data.clone();
|
||||
let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(&data));
|
||||
@@ -1628,13 +1626,13 @@ mod tests {
|
||||
fn should_add_transactions_to_queue() {
|
||||
// given
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(98, BlocksWith::Uncle);
|
||||
client.add_blocks(1, BlocksWith::UncleAndTransaction);
|
||||
client.add_blocks(1, BlocksWith::Transaction);
|
||||
client.add_blocks(98, EachBlockWith::Uncle);
|
||||
client.add_blocks(1, EachBlockWith::UncleAndTransaction);
|
||||
client.add_blocks(1, EachBlockWith::Transaction);
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
|
||||
|
||||
let good_blocks = vec![client.block_hash_delta_minus(2)];
|
||||
let bad_blocks = vec![client.block_hash_delta_minus(1)];
|
||||
let retracted_blocks = vec![client.block_hash_delta_minus(1)];
|
||||
|
||||
let mut queue = VecDeque::new();
|
||||
let io = TestIo::new(&mut client, &mut queue, None);
|
||||
@@ -1643,7 +1641,7 @@ mod tests {
|
||||
sync.chain_new_blocks(&io, &[], &good_blocks);
|
||||
assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0);
|
||||
assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1);
|
||||
sync.chain_new_blocks(&io, &good_blocks, &bad_blocks);
|
||||
sync.chain_new_blocks(&io, &good_blocks, &retracted_blocks);
|
||||
|
||||
// then
|
||||
let status = sync.transaction_queue.lock().unwrap().status();
|
||||
@@ -1654,7 +1652,7 @@ mod tests {
|
||||
#[test]
|
||||
fn returns_requested_block_headers() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, BlocksWith::Uncle);
|
||||
client.add_blocks(100, EachBlockWith::Uncle);
|
||||
let mut queue = VecDeque::new();
|
||||
let io = TestIo::new(&mut client, &mut queue, None);
|
||||
|
||||
@@ -1678,7 +1676,7 @@ mod tests {
|
||||
#[test]
|
||||
fn returns_requested_block_headers_reverse() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, BlocksWith::Uncle);
|
||||
client.add_blocks(100, EachBlockWith::Uncle);
|
||||
let mut queue = VecDeque::new();
|
||||
let io = TestIo::new(&mut client, &mut queue, None);
|
||||
|
||||
|
||||
@@ -157,9 +157,9 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
|
||||
SyncMessage::BlockVerified => {
|
||||
self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref()));
|
||||
},
|
||||
SyncMessage::NewChainBlocks { ref good, ref bad } => {
|
||||
SyncMessage::NewChainBlocks { ref good, ref retracted } => {
|
||||
let sync_io = NetSyncIo::new(io, self.chain.deref());
|
||||
self.sync.write().unwrap().chain_new_blocks(&sync_io, good, bad);
|
||||
self.sync.write().unwrap().chain_new_blocks(&sync_io, good, retracted);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,8 +24,8 @@ use super::helpers::*;
|
||||
fn two_peers() {
|
||||
::env_logger::init().ok();
|
||||
let mut net = TestNet::new(3);
|
||||
net.peer_mut(1).chain.add_blocks(1000, BlocksWith::Uncle);
|
||||
net.peer_mut(2).chain.add_blocks(1000, BlocksWith::Uncle);
|
||||
net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
|
||||
net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
|
||||
net.sync();
|
||||
assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some());
|
||||
assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref());
|
||||
@@ -35,8 +35,8 @@ fn two_peers() {
|
||||
fn status_after_sync() {
|
||||
::env_logger::init().ok();
|
||||
let mut net = TestNet::new(3);
|
||||
net.peer_mut(1).chain.add_blocks(1000, BlocksWith::Uncle);
|
||||
net.peer_mut(2).chain.add_blocks(1000, BlocksWith::Uncle);
|
||||
net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
|
||||
net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
|
||||
net.sync();
|
||||
let status = net.peer(0).sync.status();
|
||||
assert_eq!(status.state, SyncState::Idle);
|
||||
@@ -45,8 +45,8 @@ fn status_after_sync() {
|
||||
#[test]
|
||||
fn takes_few_steps() {
|
||||
let mut net = TestNet::new(3);
|
||||
net.peer_mut(1).chain.add_blocks(100, BlocksWith::Uncle);
|
||||
net.peer_mut(2).chain.add_blocks(100, BlocksWith::Uncle);
|
||||
net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle);
|
||||
net.peer_mut(2).chain.add_blocks(100, EachBlockWith::Uncle);
|
||||
let total_steps = net.sync();
|
||||
assert!(total_steps < 7);
|
||||
}
|
||||
@@ -56,7 +56,7 @@ fn empty_blocks() {
|
||||
::env_logger::init().ok();
|
||||
let mut net = TestNet::new(3);
|
||||
for n in 0..200 {
|
||||
let with = if n % 2 == 0 { BlocksWith::Nothing } else { BlocksWith::Uncle };
|
||||
let with = if n % 2 == 0 { EachBlockWith::Nothing } else { EachBlockWith::Uncle };
|
||||
net.peer_mut(1).chain.add_blocks(5, with.clone());
|
||||
net.peer_mut(2).chain.add_blocks(5, with);
|
||||
}
|
||||
@@ -69,14 +69,14 @@ fn empty_blocks() {
|
||||
fn forked() {
|
||||
::env_logger::init().ok();
|
||||
let mut net = TestNet::new(3);
|
||||
net.peer_mut(0).chain.add_blocks(300, BlocksWith::Uncle);
|
||||
net.peer_mut(1).chain.add_blocks(300, BlocksWith::Uncle);
|
||||
net.peer_mut(2).chain.add_blocks(300, BlocksWith::Uncle);
|
||||
net.peer_mut(0).chain.add_blocks(100, BlocksWith::Nothing); //fork
|
||||
net.peer_mut(1).chain.add_blocks(200, BlocksWith::Uncle);
|
||||
net.peer_mut(2).chain.add_blocks(200, BlocksWith::Uncle);
|
||||
net.peer_mut(1).chain.add_blocks(100, BlocksWith::Uncle); //fork between 1 and 2
|
||||
net.peer_mut(2).chain.add_blocks(10, BlocksWith::Nothing);
|
||||
net.peer_mut(0).chain.add_blocks(300, EachBlockWith::Uncle);
|
||||
net.peer_mut(1).chain.add_blocks(300, EachBlockWith::Uncle);
|
||||
net.peer_mut(2).chain.add_blocks(300, EachBlockWith::Uncle);
|
||||
net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Nothing); //fork
|
||||
net.peer_mut(1).chain.add_blocks(200, EachBlockWith::Uncle);
|
||||
net.peer_mut(2).chain.add_blocks(200, EachBlockWith::Uncle);
|
||||
net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle); //fork between 1 and 2
|
||||
net.peer_mut(2).chain.add_blocks(10, EachBlockWith::Nothing);
|
||||
// peer 1 has the best chain of 601 blocks
|
||||
let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone();
|
||||
net.sync();
|
||||
@@ -88,8 +88,8 @@ fn forked() {
|
||||
#[test]
|
||||
fn restart() {
|
||||
let mut net = TestNet::new(3);
|
||||
net.peer_mut(1).chain.add_blocks(1000, BlocksWith::Uncle);
|
||||
net.peer_mut(2).chain.add_blocks(1000, BlocksWith::Uncle);
|
||||
net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
|
||||
net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
|
||||
|
||||
net.sync_steps(8);
|
||||
|
||||
@@ -110,8 +110,8 @@ fn status_empty() {
|
||||
#[test]
|
||||
fn status_packet() {
|
||||
let mut net = TestNet::new(2);
|
||||
net.peer_mut(0).chain.add_blocks(100, BlocksWith::Uncle);
|
||||
net.peer_mut(1).chain.add_blocks(1, BlocksWith::Uncle);
|
||||
net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Uncle);
|
||||
net.peer_mut(1).chain.add_blocks(1, EachBlockWith::Uncle);
|
||||
|
||||
net.start();
|
||||
|
||||
@@ -124,10 +124,10 @@ fn status_packet() {
|
||||
#[test]
|
||||
fn propagate_hashes() {
|
||||
let mut net = TestNet::new(6);
|
||||
net.peer_mut(1).chain.add_blocks(10, BlocksWith::Uncle);
|
||||
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
|
||||
net.sync();
|
||||
|
||||
net.peer_mut(0).chain.add_blocks(10, BlocksWith::Uncle);
|
||||
net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle);
|
||||
net.sync();
|
||||
net.trigger_block_verified(0); //first event just sets the marker
|
||||
net.trigger_block_verified(0);
|
||||
@@ -150,10 +150,10 @@ fn propagate_hashes() {
|
||||
#[test]
|
||||
fn propagate_blocks() {
|
||||
let mut net = TestNet::new(2);
|
||||
net.peer_mut(1).chain.add_blocks(10, BlocksWith::Uncle);
|
||||
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
|
||||
net.sync();
|
||||
|
||||
net.peer_mut(0).chain.add_blocks(10, BlocksWith::Uncle);
|
||||
net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle);
|
||||
net.trigger_block_verified(0); //first event just sets the marker
|
||||
net.trigger_block_verified(0);
|
||||
|
||||
@@ -165,7 +165,7 @@ fn propagate_blocks() {
|
||||
#[test]
|
||||
fn restart_on_malformed_block() {
|
||||
let mut net = TestNet::new(2);
|
||||
net.peer_mut(1).chain.add_blocks(10, BlocksWith::Uncle);
|
||||
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
|
||||
net.peer_mut(1).chain.corrupt_block(6);
|
||||
net.sync_steps(10);
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ pub struct TestBlockChainClient {
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum BlocksWith {
|
||||
pub enum EachBlockWith {
|
||||
Nothing,
|
||||
Uncle,
|
||||
Transaction,
|
||||
@@ -52,12 +52,12 @@ impl TestBlockChainClient {
|
||||
last_hash: RwLock::new(H256::new()),
|
||||
difficulty: RwLock::new(From::from(0)),
|
||||
};
|
||||
client.add_blocks(1, BlocksWith::Nothing); // add genesis block
|
||||
client.add_blocks(1, EachBlockWith::Nothing); // add genesis block
|
||||
client.genesis_hash = client.last_hash.read().unwrap().clone();
|
||||
client
|
||||
}
|
||||
|
||||
pub fn add_blocks(&mut self, count: usize, with: BlocksWith) {
|
||||
pub fn add_blocks(&mut self, count: usize, with: EachBlockWith) {
|
||||
let len = self.numbers.read().unwrap().len();
|
||||
for n in len..(len + count) {
|
||||
let mut header = BlockHeader::new();
|
||||
@@ -65,7 +65,7 @@ impl TestBlockChainClient {
|
||||
header.parent_hash = self.last_hash.read().unwrap().clone();
|
||||
header.number = n as BlockNumber;
|
||||
let uncles = match with {
|
||||
BlocksWith::Uncle | BlocksWith::UncleAndTransaction => {
|
||||
EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => {
|
||||
let mut uncles = RlpStream::new_list(1);
|
||||
let mut uncle_header = BlockHeader::new();
|
||||
uncle_header.difficulty = From::from(n);
|
||||
@@ -78,7 +78,7 @@ impl TestBlockChainClient {
|
||||
_ => RlpStream::new_list(0)
|
||||
};
|
||||
let txs = match with {
|
||||
BlocksWith::Transaction | BlocksWith::UncleAndTransaction => {
|
||||
EachBlockWith::Transaction | EachBlockWith::UncleAndTransaction => {
|
||||
let mut txs = RlpStream::new_list(1);
|
||||
let keypair = KeyPair::create().unwrap();
|
||||
let tx = Transaction {
|
||||
@@ -136,6 +136,10 @@ impl BlockChainClient for TestBlockChainClient {
|
||||
Some(U256::zero())
|
||||
}
|
||||
|
||||
fn block_hash(&self, _id: BlockId) -> Option<H256> {
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
fn nonce(&self, _address: &Address) -> U256 {
|
||||
U256::zero()
|
||||
}
|
||||
|
||||
@@ -108,27 +108,29 @@ struct TransactionSet {
|
||||
}
|
||||
|
||||
impl TransactionSet {
|
||||
fn insert(&mut self, sender: Address, nonce: U256, order: TransactionOrder) {
|
||||
fn insert(&mut self, sender: Address, nonce: U256, order: TransactionOrder) -> Option<TransactionOrder> {
|
||||
self.by_priority.insert(order.clone());
|
||||
self.by_address.insert(sender, nonce, order);
|
||||
self.by_address.insert(sender, nonce, order)
|
||||
}
|
||||
|
||||
fn enforce_limit(&mut self, by_hash: &HashMap<H256, VerifiedTransaction>) {
|
||||
fn enforce_limit(&mut self, by_hash: &mut HashMap<H256, VerifiedTransaction>) {
|
||||
let len = self.by_priority.len();
|
||||
if len <= self.limit {
|
||||
return;
|
||||
}
|
||||
|
||||
let to_drop : Vec<&VerifiedTransaction> = {
|
||||
let to_drop : Vec<(Address, U256)> = {
|
||||
self.by_priority
|
||||
.iter()
|
||||
.skip(self.limit)
|
||||
.map(|order| by_hash.get(&order.hash).expect("Inconsistency in queue detected."))
|
||||
.map(|tx| (tx.sender(), tx.nonce()))
|
||||
.collect()
|
||||
};
|
||||
|
||||
for tx in to_drop {
|
||||
self.drop(&tx.sender(), &tx.nonce());
|
||||
for (sender, nonce) in to_drop {
|
||||
let order = self.drop(&sender, &nonce).expect("Dropping transaction found in priority queue failed.");
|
||||
by_hash.remove(&order.hash).expect("Inconsistency in queue.");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -236,26 +238,50 @@ impl TransactionQueue {
|
||||
// We don't know this transaction
|
||||
return;
|
||||
}
|
||||
|
||||
let transaction = transaction.unwrap();
|
||||
let sender = transaction.sender();
|
||||
let nonce = transaction.nonce();
|
||||
let current_nonce = fetch_nonce(&sender);
|
||||
|
||||
// Remove from future
|
||||
self.future.drop(&sender, &nonce);
|
||||
|
||||
// Remove from current
|
||||
let order = self.current.drop(&sender, &nonce);
|
||||
if order.is_none() {
|
||||
let order = self.future.drop(&sender, &nonce);
|
||||
if order.is_some() {
|
||||
self.recalculate_future_for_sender(&sender, current_nonce);
|
||||
// And now lets check if there is some chain of transactions in future
|
||||
// that should be placed in current
|
||||
self.move_future_txs(sender.clone(), current_nonce, current_nonce);
|
||||
return;
|
||||
}
|
||||
|
||||
// Let's remove transactions where tx.nonce < current_nonce
|
||||
// and if there are any future transactions matching current_nonce+1 - move to current
|
||||
let current_nonce = fetch_nonce(&sender);
|
||||
// We will either move transaction to future or remove it completely
|
||||
// so there will be no transactions from this sender in current
|
||||
self.last_nonces.remove(&sender);
|
||||
// Remove from current
|
||||
let order = self.current.drop(&sender, &nonce);
|
||||
if order.is_some() {
|
||||
// We will either move transaction to future or remove it completely
|
||||
// so there will be no transactions from this sender in current
|
||||
self.last_nonces.remove(&sender);
|
||||
// This should move all current transactions to future and remove old transactions
|
||||
self.move_all_to_future(&sender, current_nonce);
|
||||
// And now lets check if there is some chain of transactions in future
|
||||
// that should be placed in current. It should also update last_nonces.
|
||||
self.move_future_txs(sender.clone(), current_nonce, current_nonce);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
fn recalculate_future_for_sender(&mut self, sender: &Address, current_nonce: U256) {
|
||||
// We need to drain all transactions for current sender from future and reinsert them with updated height
|
||||
let all_nonces_from_sender = match self.future.by_address.row(&sender) {
|
||||
Some(row_map) => row_map.keys().cloned().collect::<Vec<U256>>(),
|
||||
None => vec![],
|
||||
};
|
||||
for k in all_nonces_from_sender {
|
||||
let order = self.future.drop(&sender, &k).unwrap();
|
||||
self.future.insert(sender.clone(), k, order.update_height(k, current_nonce));
|
||||
}
|
||||
}
|
||||
|
||||
fn move_all_to_future(&mut self, sender: &Address, current_nonce: U256) {
|
||||
let all_nonces_from_sender = match self.current.by_address.row(&sender) {
|
||||
Some(row_map) => row_map.keys().cloned().collect::<Vec<U256>>(),
|
||||
None => vec![],
|
||||
@@ -270,7 +296,7 @@ impl TransactionQueue {
|
||||
self.by_hash.remove(&order.hash);
|
||||
}
|
||||
}
|
||||
self.future.enforce_limit(&self.by_hash);
|
||||
self.future.enforce_limit(&mut self.by_hash);
|
||||
|
||||
// And now lets check if there is some chain of transactions in future
|
||||
// that should be placed in current
|
||||
@@ -279,6 +305,7 @@ impl TransactionQueue {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Returns top transactions from the queue
|
||||
pub fn top_transactions(&self, size: usize) -> Vec<SignedTransaction> {
|
||||
self.current.by_priority
|
||||
@@ -297,11 +324,11 @@ impl TransactionQueue {
|
||||
self.last_nonces.clear();
|
||||
}
|
||||
|
||||
fn move_future_txs(&mut self, address: Address, mut current_nonce: U256, first_nonce: U256) -> Option<U256> {
|
||||
fn move_future_txs(&mut self, address: Address, mut current_nonce: U256, first_nonce: U256) {
|
||||
{
|
||||
let by_nonce = self.future.by_address.row_mut(&address);
|
||||
if let None = by_nonce {
|
||||
return None;
|
||||
return;
|
||||
}
|
||||
let mut by_nonce = by_nonce.unwrap();
|
||||
while let Some(order) = by_nonce.remove(¤t_nonce) {
|
||||
@@ -314,47 +341,69 @@ impl TransactionQueue {
|
||||
}
|
||||
}
|
||||
self.future.by_address.clear_if_empty(&address);
|
||||
// Returns last inserted nonce
|
||||
Some(current_nonce - U256::one())
|
||||
// Update last inserted nonce
|
||||
self.last_nonces.insert(address, current_nonce - U256::one());
|
||||
}
|
||||
|
||||
fn import_tx<T>(&mut self, tx: VerifiedTransaction, fetch_nonce: &T)
|
||||
where T: Fn(&Address) -> U256 {
|
||||
|
||||
if self.by_hash.get(&tx.hash()).is_some() {
|
||||
// Transaction is already imported.
|
||||
trace!(target: "sync", "Dropping already imported transaction with hash: {:?}", tx.hash());
|
||||
return;
|
||||
}
|
||||
|
||||
let nonce = tx.nonce();
|
||||
let address = tx.sender();
|
||||
|
||||
let state_nonce = fetch_nonce(&address);
|
||||
let next_nonce = self.last_nonces
|
||||
.get(&address)
|
||||
.cloned()
|
||||
.map_or_else(|| fetch_nonce(&address), |n| n + U256::one());
|
||||
.map_or(state_nonce, |n| n + U256::one());
|
||||
|
||||
// Check height
|
||||
if nonce > next_nonce {
|
||||
let order = TransactionOrder::for_transaction(&tx, next_nonce);
|
||||
// Insert to by_hash
|
||||
self.by_hash.insert(tx.hash(), tx);
|
||||
// We have a gap - put to future
|
||||
self.future.insert(address, nonce, order);
|
||||
self.future.enforce_limit(&self.by_hash);
|
||||
Self::replace_transaction(tx, next_nonce, &mut self.future, &mut self.by_hash);
|
||||
self.future.enforce_limit(&mut self.by_hash);
|
||||
return;
|
||||
} else if next_nonce > nonce {
|
||||
} else if nonce < state_nonce {
|
||||
// Droping transaction
|
||||
trace!(target: "sync", "Dropping transaction with nonce: {} - expecting: {}", nonce, next_nonce);
|
||||
return;
|
||||
}
|
||||
|
||||
let base_nonce = fetch_nonce(&address);
|
||||
let order = TransactionOrder::for_transaction(&tx, base_nonce);
|
||||
// Insert to by_hash
|
||||
self.by_hash.insert(tx.hash(), tx);
|
||||
|
||||
// Insert to current
|
||||
self.current.insert(address.clone(), nonce, order);
|
||||
Self::replace_transaction(tx, base_nonce.clone(), &mut self.current, &mut self.by_hash);
|
||||
self.last_nonces.insert(address.clone(), nonce);
|
||||
// But maybe there are some more items waiting in future?
|
||||
let new_last_nonce = self.move_future_txs(address.clone(), nonce + U256::one(), base_nonce);
|
||||
self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce));
|
||||
// Enforce limit
|
||||
self.current.enforce_limit(&self.by_hash);
|
||||
self.move_future_txs(address.clone(), nonce + U256::one(), base_nonce);
|
||||
self.current.enforce_limit(&mut self.by_hash);
|
||||
}
|
||||
|
||||
fn replace_transaction(tx: VerifiedTransaction, base_nonce: U256, set: &mut TransactionSet, by_hash: &mut HashMap<H256, VerifiedTransaction>) {
|
||||
let order = TransactionOrder::for_transaction(&tx, base_nonce);
|
||||
let hash = tx.hash();
|
||||
let address = tx.sender();
|
||||
let nonce = tx.nonce();
|
||||
|
||||
by_hash.insert(hash.clone(), tx);
|
||||
if let Some(old) = set.insert(address, nonce, order.clone()) {
|
||||
// There was already transaction in queue. Let's check which one should stay
|
||||
if old.cmp(&order) == Ordering::Greater {
|
||||
assert!(old.nonce_height == order.nonce_height, "Both transactions should have the same height.");
|
||||
// Put back old transaction since it has greater priority (higher gas_price)
|
||||
set.insert(address, nonce, old);
|
||||
by_hash.remove(&hash);
|
||||
} else {
|
||||
// Make sure we remove old transaction entirely
|
||||
set.by_priority.remove(&old);
|
||||
by_hash.remove(&old.hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -363,6 +412,7 @@ impl TransactionQueue {
|
||||
mod test {
|
||||
extern crate rustc_serialize;
|
||||
use self::rustc_serialize::hex::FromHex;
|
||||
use std::ops::Deref;
|
||||
use std::collections::{HashMap, BTreeSet};
|
||||
use util::crypto::KeyPair;
|
||||
use util::numbers::{U256, Uint};
|
||||
@@ -413,7 +463,7 @@ mod test {
|
||||
let (tx1, tx2) = new_txs(U256::from(1));
|
||||
let tx1 = VerifiedTransaction::new(tx1);
|
||||
let tx2 = VerifiedTransaction::new(tx2);
|
||||
let by_hash = {
|
||||
let mut by_hash = {
|
||||
let mut x = HashMap::new();
|
||||
let tx1 = VerifiedTransaction::new(tx1.transaction.clone());
|
||||
let tx2 = VerifiedTransaction::new(tx2.transaction.clone());
|
||||
@@ -430,9 +480,10 @@ mod test {
|
||||
assert_eq!(set.by_address.len(), 2);
|
||||
|
||||
// when
|
||||
set.enforce_limit(&by_hash);
|
||||
set.enforce_limit(&mut by_hash);
|
||||
|
||||
// then
|
||||
assert_eq!(by_hash.len(), 1);
|
||||
assert_eq!(set.by_priority.len(), 1);
|
||||
assert_eq!(set.by_address.len(), 1);
|
||||
assert_eq!(set.by_priority.iter().next().unwrap().clone(), order1);
|
||||
@@ -633,7 +684,26 @@ mod test {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_accept_same_transaction_twice() {
|
||||
fn should_not_insert_same_transaction_twice() {
|
||||
// given
|
||||
let nonce = |a: &Address| default_nonce(a) + U256::one();
|
||||
let mut txq = TransactionQueue::new();
|
||||
let (_tx1, tx2) = new_txs(U256::from(1));
|
||||
txq.add(tx2.clone(), &default_nonce);
|
||||
assert_eq!(txq.status().future, 1);
|
||||
assert_eq!(txq.status().pending, 0);
|
||||
|
||||
// when
|
||||
txq.add(tx2.clone(), &nonce);
|
||||
|
||||
// then
|
||||
let stats = txq.status();
|
||||
assert_eq!(stats.future, 1);
|
||||
assert_eq!(stats.pending, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_accept_same_transaction_twice_if_removed() {
|
||||
// given
|
||||
let mut txq = TransactionQueue::new();
|
||||
let (tx1, tx2) = new_txs(U256::from(1));
|
||||
@@ -675,4 +745,78 @@ mod test {
|
||||
assert_eq!(stats.pending, 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_replace_same_transaction_when_has_higher_fee() {
|
||||
// given
|
||||
let mut txq = TransactionQueue::new();
|
||||
let keypair = KeyPair::create().unwrap();
|
||||
let tx = new_unsigned_tx(U256::from(123)).sign(&keypair.secret());
|
||||
let tx2 = {
|
||||
let mut tx2 = tx.deref().clone();
|
||||
tx2.gas_price = U256::from(200);
|
||||
tx2.sign(&keypair.secret())
|
||||
};
|
||||
|
||||
// when
|
||||
txq.add(tx, &default_nonce);
|
||||
txq.add(tx2, &default_nonce);
|
||||
|
||||
// then
|
||||
let stats = txq.status();
|
||||
assert_eq!(stats.pending, 1);
|
||||
assert_eq!(stats.future, 0);
|
||||
assert_eq!(txq.top_transactions(1)[0].gas_price, U256::from(200));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_replace_same_transaction_when_importing_to_futures() {
|
||||
// given
|
||||
let mut txq = TransactionQueue::new();
|
||||
let keypair = KeyPair::create().unwrap();
|
||||
let tx0 = new_unsigned_tx(U256::from(123)).sign(&keypair.secret());
|
||||
let tx1 = {
|
||||
let mut tx1 = tx0.deref().clone();
|
||||
tx1.nonce = U256::from(124);
|
||||
tx1.sign(&keypair.secret())
|
||||
};
|
||||
let tx2 = {
|
||||
let mut tx2 = tx1.deref().clone();
|
||||
tx2.gas_price = U256::from(200);
|
||||
tx2.sign(&keypair.secret())
|
||||
};
|
||||
|
||||
// when
|
||||
txq.add(tx1, &default_nonce);
|
||||
txq.add(tx2, &default_nonce);
|
||||
assert_eq!(txq.status().future, 1);
|
||||
txq.add(tx0, &default_nonce);
|
||||
|
||||
// then
|
||||
let stats = txq.status();
|
||||
assert_eq!(stats.future, 0);
|
||||
assert_eq!(stats.pending, 2);
|
||||
assert_eq!(txq.top_transactions(2)[1].gas_price, U256::from(200));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_recalculate_height_when_removing_from_future() {
|
||||
// given
|
||||
let previous_nonce = |a: &Address| default_nonce(a) - U256::one();
|
||||
let next_nonce = |a: &Address| default_nonce(a) + U256::one();
|
||||
let mut txq = TransactionQueue::new();
|
||||
let (tx1, tx2) = new_txs(U256::one());
|
||||
txq.add(tx1.clone(), &previous_nonce);
|
||||
txq.add(tx2, &previous_nonce);
|
||||
assert_eq!(txq.status().future, 2);
|
||||
|
||||
// when
|
||||
txq.remove(&tx1.hash(), &next_nonce);
|
||||
|
||||
// then
|
||||
let stats = txq.status();
|
||||
assert_eq!(stats.future, 0);
|
||||
assert_eq!(stats.pending, 1);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user