Merge branch 'master' into tx-rpc-expose
Conflicts:
	sync/src/chain.rs

Commit: ddc3440712

@@ -418,7 +418,9 @@ impl<V> Client<V> where V: Verifier {
 			if !good_blocks.is_empty() && block_queue.queue_info().is_empty() {
 				io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
 					good: good_blocks,
-					retracted: bad_blocks,
+					bad: bad_blocks,
+					// TODO [todr] were to take those from?
+					retracted: vec![],
 				})).unwrap();
 			}
 		}
@@ -30,6 +30,8 @@ pub enum SyncMessage {
 		/// Hashes of blocks imported to blockchain
 		good: Vec<H256>,
 		/// Hashes of blocks not imported to blockchain
+		bad: Vec<H256>,
+		/// Hashes of blocks that were removed from canonical chain
 		retracted: Vec<H256>,
 	},
 	/// A block is ready
@@ -207,7 +207,7 @@ pub struct ChainSync {
 	/// True if common block for our and remote chain has been found
 	have_common_block: bool,
 	/// Last propagated block number
-	last_send_block_number: BlockNumber,
+	last_sent_block_number: BlockNumber,
 	/// Max blocks to download ahead
 	max_download_ahead_blocks: usize,
 	/// Network ID
@@ -236,7 +236,7 @@ impl ChainSync {
 			last_imported_hash: None,
 			syncing_difficulty: U256::from(0u64),
 			have_common_block: false,
-			last_send_block_number: 0,
+			last_sent_block_number: 0,
 			max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
 			network_id: config.network_id,
 			transaction_queue: Mutex::new(TransactionQueue::new()),
@@ -850,8 +850,8 @@ impl ChainSync {
 				self.downloading_bodies.remove(&n);
 				self.downloading_headers.remove(&n);
 			}
-			self.headers.remove_tail(&start);
-			self.bodies.remove_tail(&start);
+			self.headers.remove_from(&start);
+			self.bodies.remove_from(&start);
 		}
 	}

 	/// Request headers from a peer by block hash
@@ -931,9 +931,11 @@ impl ChainSync {
 		let item_count = r.item_count();
 		trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
 		let fetch_latest_nonce = |a : &Address| chain.nonce(a);
+
+		let mut transaction_queue = self.transaction_queue.lock().unwrap();
 		for i in 0..item_count {
 			let tx: SignedTransaction = try!(r.val_at(i));
-			self.transaction_queue.lock().unwrap().add(tx, &fetch_latest_nonce);
+			transaction_queue.add(tx, &fetch_latest_nonce);
 		}
 		Ok(())
 	}
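
The hunk above hoists the `transaction_queue` lock out of the per-transaction loop, so the mutex is taken once per `Transactions` packet instead of once per entry. A minimal sketch of the same pattern with `std::sync::Mutex` (the `Queue` type and the values below are illustrative stand-ins, not code from this repository):

use std::sync::Mutex;

// Illustrative stand-in for the real transaction queue.
struct Queue { items: Vec<u64> }

fn main() {
	let queue = Mutex::new(Queue { items: Vec::new() });
	let incoming = [1u64, 2, 3];

	// Lock once for the whole batch instead of re-locking on every iteration.
	let mut guard = queue.lock().unwrap();
	for tx in &incoming {
		guard.items.push(*tx);
	}
	drop(guard); // release the lock before doing unrelated work

	println!("queued {} items", queue.lock().unwrap().items.len());
}
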
@@ -1244,26 +1246,25 @@ impl ChainSync {
 		sent
 	}

+	fn propagate_latest_blocks(&mut self, io: &mut SyncIo) {
+		let chain_info = io.chain().chain_info();
+		if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
+			let blocks = self.propagate_blocks(&chain_info, io);
+			let hashes = self.propagate_new_hashes(&chain_info, io);
+			if blocks != 0 || hashes != 0 {
+				trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
+			}
+		}
+		self.last_sent_block_number = chain_info.best_block_number;
+	}
+
 	/// Maintain other peers. Send out any new blocks and transactions
 	pub fn maintain_sync(&mut self, io: &mut SyncIo) {
 		self.check_resume(io);
 	}

-	/// should be called once chain has new block, triggers the latest block propagation
-	pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) {
-		let chain = io.chain().chain_info();
-		if (((chain.best_block_number as i64) - (self.last_send_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
-			let blocks = self.propagate_blocks(&chain, io);
-			let hashes = self.propagate_new_hashes(&chain, io);
-			if blocks != 0 || hashes != 0 {
-				trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
-			}
-		}
-		self.last_send_block_number = chain.best_block_number;
-	}
-
-	/// called when block is imported to chain, updates transactions queue
-	pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], retracted: &[H256]) {
+	/// called when block is imported to chain, updates transactions queue and propagates the blocks
+	pub fn chain_new_blocks(&mut self, io: &mut SyncIo, good: &[H256], bad: &[H256], _retracted: &[H256]) {
 		fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec<SignedTransaction> {
 			let block = chain
 				.block(BlockId::Hash(hash.clone()))
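
The extracted `propagate_latest_blocks` keeps the guard the old `chain_blocks_verified` had: blocks and hashes are only re-broadcast while the local best block is within `MAX_PEER_LAG_PROPAGATION` of the last propagated number, so a node that is far behind stays quiet. A standalone sketch of that guard follows; the constant's value here is an assumption for illustration, not the one defined in the crate:

type BlockNumber = u64;

// Assumed value for illustration only.
const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20;

fn should_propagate(best_block_number: BlockNumber, last_sent_block_number: BlockNumber) -> bool {
	// Same shape as the check in the diff: propagate only while the gap is small.
	((best_block_number as i64) - (last_sent_block_number as i64)).abs() as BlockNumber
		< MAX_PEER_LAG_PROPAGATION
}

fn main() {
	assert!(should_propagate(105, 100));    // near the tip: propagate
	assert!(!should_propagate(5_000, 100)); // still catching up: stay quiet
	println!("lag check ok");
}
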
@@ -1274,16 +1275,17 @@ impl ChainSync {
 		}


+		{
 		let chain = io.chain();
 		let good = good.par_iter().map(|h| fetch_transactions(chain, h));
-		let retracted = retracted.par_iter().map(|h| fetch_transactions(chain, h));
+		let bad = bad.par_iter().map(|h| fetch_transactions(chain, h));

 		good.for_each(|txs| {
 			let mut transaction_queue = self.transaction_queue.lock().unwrap();
 			let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
 			transaction_queue.remove_all(&hashes, |a| chain.nonce(a));
 		});
-		retracted.for_each(|txs| {
+		bad.for_each(|txs| {
 			// populate sender
 			for tx in &txs {
 				let _sender = tx.sender();
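
The `{ ... }` block introduced above scopes the shared borrow taken through `io.chain()`, so it ends before `propagate_latest_blocks(io)` borrows `io` again in the next hunk. A minimal sketch of that scoping pattern, with stand-in types:

struct Io { chain: String }

impl Io {
	fn chain(&self) -> &str { &self.chain }
	fn send(&mut self, msg: &str) { println!("send {} via {}", msg, self.chain); }
}

fn main() {
	let mut io = Io { chain: "main".to_string() };

	{
		// Shared borrow of `io` lives only inside this block.
		let chain = io.chain();
		println!("updating queue against chain {}", chain);
	} // borrow ends here

	// A unique borrow of `io` is fine again.
	io.send("NewBlock");
}
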
@@ -1293,6 +1295,11 @@ impl ChainSync {
 			});
 		}

+		// Propagate latests blocks
+		self.propagate_latest_blocks(io);
+		// TODO [todr] propagate transactions?
+	}
+
 	pub fn transaction_queue(&self) -> &Mutex<TransactionQueue> {
 		return &self.transaction_queue;
 	}
@@ -1634,13 +1641,13 @@ mod tests {
 		let retracted_blocks = vec![client.block_hash_delta_minus(1)];

 		let mut queue = VecDeque::new();
-		let io = TestIo::new(&mut client, &mut queue, None);
+		let mut io = TestIo::new(&mut client, &mut queue, None);

 		// when
-		sync.chain_new_blocks(&io, &[], &good_blocks);
+		sync.chain_new_blocks(&mut io, &[], &good_blocks, &[]);
 		assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0);
 		assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1);
-		sync.chain_new_blocks(&io, &good_blocks, &retracted_blocks);
+		sync.chain_new_blocks(&mut io, &good_blocks, &retracted_blocks, &[]);

 		// then
 		let status = sync.transaction_queue.lock().unwrap().status();
@@ -164,13 +164,11 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {

 	fn message(&self, io: &NetworkContext<SyncMessage>, message: &SyncMessage) {
 		match *message {
-			SyncMessage::BlockVerified => {
-				self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref()));
+			SyncMessage::NewChainBlocks { ref good, ref bad, ref retracted } => {
+				let mut sync_io = NetSyncIo::new(io, self.chain.deref());
+				self.sync.write().unwrap().chain_new_blocks(&mut sync_io, good, bad, retracted);
 			},
-			SyncMessage::NewChainBlocks { ref good, ref retracted } => {
-				let sync_io = NetSyncIo::new(io, self.chain.deref());
-				self.sync.write().unwrap().chain_new_blocks(&sync_io, good, retracted);
-			}
+			_ => {/* Ignore other messages */},
 		}
 	}
 }
@@ -42,6 +42,8 @@ pub trait RangeCollection<K, V> {
 	fn remove_head(&mut self, start: &K);
 	/// Remove all elements >= `start` in the range that contains `start`
 	fn remove_tail(&mut self, start: &K);
+	/// Remove all elements >= `start`
+	fn remove_from(&mut self, start: &K);
 	/// Remove all elements >= `tail`
 	fn insert_item(&mut self, key: K, value: V);
 	/// Get an iterator over ranges
@@ -137,6 +139,28 @@ impl<K, V> RangeCollection<K, V> for Vec<(K, Vec<V>)> where K: Ord + PartialEq +
 		}
 	}

+	/// Remove the element and all following it.
+	fn remove_from(&mut self, key: &K) {
+		match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) {
+			Ok(index) => { self.drain(.. index + 1); },
+			Err(index) =>{
+				let mut empty = false;
+				match self.get_mut(index) {
+					Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => {
+						v.truncate((*key - *k).to_usize());
+						empty = v.is_empty();
+					}
+					_ => {}
+				}
+				if empty {
+					self.drain(.. index + 1);
+				} else {
+					self.drain(.. index);
+				}
+			},
+		}
+	}
+
 	/// Remove range elements up to key
 	fn remove_head(&mut self, key: &K) {
 		if *key == FromUsize::from_usize(0) {
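
`remove_from` relies on the ranges being stored in a `Vec` sorted by descending start key, which is why the comparator passed to `binary_search_by` is `k.cmp(key).reverse()`. A small self-contained example of searching such a descending vector (the numbers are made up):

fn main() {
	// Range start keys kept in descending order, as the collection stores them.
	let starts: Vec<u64> = vec![40, 25, 10, 2];

	// Reversing each comparison makes binary_search_by see the vec as ascending.
	match starts.binary_search_by(|k| k.cmp(&25u64).reverse()) {
		Ok(index) => println!("exact match at index {}", index),    // prints 1
		Err(index) => println!("would insert at index {}", index),
	}
	match starts.binary_search_by(|k| k.cmp(&30u64).reverse()) {
		Ok(index) => println!("exact match at index {}", index),
		Err(index) => println!("would insert at index {}", index),  // prints 1
	}
}
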
@@ -272,5 +296,17 @@ fn test_range() {
 	assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal);
 	r.remove_tail(&2);
 	assert_eq!(r.range_iter().next(), None);
+
+	let mut r = ranges.clone();
+	r.remove_from(&20);
+	assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
+	r.remove_from(&17);
+	assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p'][..])]), Ordering::Equal);
+	r.remove_from(&15);
+	assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal);
+	r.remove_from(&3);
+	assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal);
+	r.remove_from(&2);
+	assert_eq!(r.range_iter().next(), None);
 }

@@ -129,8 +129,8 @@ fn propagate_hashes() {

 	net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle);
 	net.sync();
-	net.trigger_block_verified(0); //first event just sets the marker
-	net.trigger_block_verified(0);
+	net.trigger_chain_new_blocks(0); //first event just sets the marker
+	net.trigger_chain_new_blocks(0);

 	// 5 peers to sync
 	assert_eq!(5, net.peer(0).queue.len());
@@ -154,8 +154,8 @@ fn propagate_blocks() {
 	net.sync();

 	net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle);
-	net.trigger_block_verified(0); //first event just sets the marker
-	net.trigger_block_verified(0);
+	net.trigger_chain_new_blocks(0); //first event just sets the marker
+	net.trigger_chain_new_blocks(0);

 	assert!(!net.peer(0).queue.is_empty());
 	// NEW_BLOCK_PACKET
@@ -455,8 +455,8 @@ impl TestNet {
 		self.peers.iter().all(|p| p.queue.is_empty())
 	}

-	pub fn trigger_block_verified(&mut self, peer_id: usize) {
+	pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) {
 		let mut peer = self.peer_mut(peer_id);
-		peer.sync.chain_blocks_verified(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None));
+		peer.sync.chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[]);
 	}
 }