|
|
|
|
@@ -209,7 +209,7 @@ pub struct ChainSync {
|
|
|
|
|
/// True if common block for our and remote chain has been found
|
|
|
|
|
have_common_block: bool,
|
|
|
|
|
/// Last propagated block number
|
|
|
|
|
last_send_block_number: BlockNumber,
|
|
|
|
|
last_sent_block_number: BlockNumber,
|
|
|
|
|
/// Max blocks to download ahead
|
|
|
|
|
max_download_ahead_blocks: usize,
|
|
|
|
|
/// Network ID
|
|
|
|
|
@@ -238,7 +238,7 @@ impl ChainSync {
|
|
|
|
|
last_imported_hash: None,
|
|
|
|
|
syncing_difficulty: U256::from(0u64),
|
|
|
|
|
have_common_block: false,
|
|
|
|
|
last_send_block_number: 0,
|
|
|
|
|
last_sent_block_number: 0,
|
|
|
|
|
max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
|
|
|
|
|
network_id: config.network_id,
|
|
|
|
|
transaction_queue: Mutex::new(TransactionQueue::new()),
|
|
|
|
|
@@ -910,9 +910,8 @@ impl ChainSync {
|
|
|
|
|
}
|
|
|
|
|
match sync.send(peer_id, packet_id, packet) {
|
|
|
|
|
Err(e) => {
|
|
|
|
|
warn!(target:"sync", "Error sending request: {:?}", e);
|
|
|
|
|
debug!(target:"sync", "Error sending request: {:?}", e);
|
|
|
|
|
sync.disable_peer(peer_id);
|
|
|
|
|
self.on_peer_aborting(sync, peer_id);
|
|
|
|
|
}
|
|
|
|
|
Ok(_) => {
|
|
|
|
|
let mut peer = self.peers.get_mut(&peer_id).unwrap();
|
|
|
|
|
@@ -925,9 +924,8 @@ impl ChainSync {
|
|
|
|
|
/// Generic packet sender
|
|
|
|
|
fn send_packet(&mut self, sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) {
|
|
|
|
|
if let Err(e) = sync.send(peer_id, packet_id, packet) {
|
|
|
|
|
warn!(target:"sync", "Error sending packet: {:?}", e);
|
|
|
|
|
debug!(target:"sync", "Error sending packet: {:?}", e);
|
|
|
|
|
sync.disable_peer(peer_id);
|
|
|
|
|
self.on_peer_aborting(sync, peer_id);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
/// Called when peer sends us new transactions
|
|
|
|
|
@@ -1251,26 +1249,25 @@ impl ChainSync {
|
|
|
|
|
sent
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn propagate_latest_blocks(&mut self, io: &mut SyncIo) {
|
|
|
|
|
let chain_info = io.chain().chain_info();
|
|
|
|
|
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
|
|
|
|
|
let blocks = self.propagate_blocks(&chain_info, io);
|
|
|
|
|
let hashes = self.propagate_new_hashes(&chain_info, io);
|
|
|
|
|
if blocks != 0 || hashes != 0 {
|
|
|
|
|
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
self.last_sent_block_number = chain_info.best_block_number;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Maintain other peers. Send out any new blocks and transactions
|
|
|
|
|
pub fn maintain_sync(&mut self, io: &mut SyncIo) {
|
|
|
|
|
self.check_resume(io);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// should be called once chain has new block, triggers the latest block propagation
|
|
|
|
|
pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) {
|
|
|
|
|
let chain = io.chain().chain_info();
|
|
|
|
|
if (((chain.best_block_number as i64) - (self.last_send_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
|
|
|
|
|
let blocks = self.propagate_blocks(&chain, io);
|
|
|
|
|
let hashes = self.propagate_new_hashes(&chain, io);
|
|
|
|
|
if blocks != 0 || hashes != 0 {
|
|
|
|
|
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
self.last_send_block_number = chain.best_block_number;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// called when block is imported to chain, updates transactions queue
|
|
|
|
|
pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], bad: &[H256], _retracted: &[H256]) {
|
|
|
|
|
/// called when block is imported to chain, updates transactions queue and propagates the blocks
|
|
|
|
|
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, good: &[H256], bad: &[H256], _retracted: &[H256]) {
|
|
|
|
|
fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec<SignedTransaction> {
|
|
|
|
|
let block = chain
|
|
|
|
|
.block(BlockId::Hash(hash.clone()))
|
|
|
|
|
@@ -1281,24 +1278,31 @@ impl ChainSync {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let chain = io.chain();
|
|
|
|
|
let good = good.par_iter().map(|h| fetch_transactions(chain, h));
|
|
|
|
|
let bad = bad.par_iter().map(|h| fetch_transactions(chain, h));
|
|
|
|
|
{
|
|
|
|
|
let chain = io.chain();
|
|
|
|
|
let good = good.par_iter().map(|h| fetch_transactions(chain, h));
|
|
|
|
|
let bad = bad.par_iter().map(|h| fetch_transactions(chain, h));
|
|
|
|
|
|
|
|
|
|
good.for_each(|txs| {
|
|
|
|
|
let mut transaction_queue = self.transaction_queue.lock().unwrap();
|
|
|
|
|
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
|
|
|
|
|
transaction_queue.remove_all(&hashes, |a| chain.nonce(a));
|
|
|
|
|
});
|
|
|
|
|
bad.for_each(|txs| {
|
|
|
|
|
// populate sender
|
|
|
|
|
for tx in &txs {
|
|
|
|
|
let _sender = tx.sender();
|
|
|
|
|
}
|
|
|
|
|
let mut transaction_queue = self.transaction_queue.lock().unwrap();
|
|
|
|
|
transaction_queue.add_all(txs, |a| chain.nonce(a));
|
|
|
|
|
});
|
|
|
|
|
good.for_each(|txs| {
|
|
|
|
|
let mut transaction_queue = self.transaction_queue.lock().unwrap();
|
|
|
|
|
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
|
|
|
|
|
transaction_queue.remove_all(&hashes, |a| chain.nonce(a));
|
|
|
|
|
});
|
|
|
|
|
bad.for_each(|txs| {
|
|
|
|
|
// populate sender
|
|
|
|
|
for tx in &txs {
|
|
|
|
|
let _sender = tx.sender();
|
|
|
|
|
}
|
|
|
|
|
let mut transaction_queue = self.transaction_queue.lock().unwrap();
|
|
|
|
|
transaction_queue.add_all(txs, |a| chain.nonce(a));
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Propagate latests blocks
|
|
|
|
|
self.propagate_latest_blocks(io);
|
|
|
|
|
// TODO [todr] propagate transactions?
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
@@ -1637,13 +1641,13 @@ mod tests {
|
|
|
|
|
let retracted_blocks = vec![client.block_hash_delta_minus(1)];
|
|
|
|
|
|
|
|
|
|
let mut queue = VecDeque::new();
|
|
|
|
|
let io = TestIo::new(&mut client, &mut queue, None);
|
|
|
|
|
let mut io = TestIo::new(&mut client, &mut queue, None);
|
|
|
|
|
|
|
|
|
|
// when
|
|
|
|
|
sync.chain_new_blocks(&io, &[], &good_blocks, &[]);
|
|
|
|
|
sync.chain_new_blocks(&mut io, &[], &good_blocks, &[]);
|
|
|
|
|
assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0);
|
|
|
|
|
assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1);
|
|
|
|
|
sync.chain_new_blocks(&io, &good_blocks, &retracted_blocks, &[]);
|
|
|
|
|
sync.chain_new_blocks(&mut io, &good_blocks, &retracted_blocks, &[]);
|
|
|
|
|
|
|
|
|
|
// then
|
|
|
|
|
let status = sync.transaction_queue.lock().unwrap().status();
|
|
|
|
|
|