Merge branch 'tx_queue_integration' into tx_queue_rpc

This commit is contained in:
Tomasz Drwięga 2016-03-06 11:15:16 +01:00
commit c49258e866
5 changed files with 15 additions and 11 deletions

View File

@ -413,7 +413,9 @@ impl<V> Client<V> where V: Verifier {
if !good_blocks.is_empty() && block_queue.queue_info().is_empty() { if !good_blocks.is_empty() && block_queue.queue_info().is_empty() {
io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
good: good_blocks, good: good_blocks,
retracted: bad_blocks, bad: bad_blocks,
// TODO [todr] were to take those from?
retracted: vec![],
})).unwrap(); })).unwrap();
} }
} }

View File

@ -30,6 +30,8 @@ pub enum SyncMessage {
/// Hashes of blocks imported to blockchain /// Hashes of blocks imported to blockchain
good: Vec<H256>, good: Vec<H256>,
/// Hashes of blocks not imported to blockchain /// Hashes of blocks not imported to blockchain
bad: Vec<H256>,
/// Hashes of blocks that were removed from canonical chain
retracted: Vec<H256>, retracted: Vec<H256>,
}, },
/// A block is ready /// A block is ready

View File

@ -936,9 +936,11 @@ impl ChainSync {
let item_count = r.item_count(); let item_count = r.item_count();
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count); trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
let fetch_latest_nonce = |a : &Address| chain.nonce(a); let fetch_latest_nonce = |a : &Address| chain.nonce(a);
let mut transaction_queue = self.transaction_queue.lock().unwrap();
for i in 0..item_count { for i in 0..item_count {
let tx: SignedTransaction = try!(r.val_at(i)); let tx: SignedTransaction = try!(r.val_at(i));
self.transaction_queue.lock().unwrap().add(tx, &fetch_latest_nonce); transaction_queue.add(tx, &fetch_latest_nonce);
} }
Ok(()) Ok(())
} }
@ -1268,7 +1270,7 @@ impl ChainSync {
} }
/// called when block is imported to chain, updates transactions queue /// called when block is imported to chain, updates transactions queue
pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], retracted: &[H256]) { pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], bad: &[H256], _retracted: &[H256]) {
fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec<SignedTransaction> { fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec<SignedTransaction> {
let block = chain let block = chain
.block(BlockId::Hash(hash.clone())) .block(BlockId::Hash(hash.clone()))
@ -1281,14 +1283,14 @@ impl ChainSync {
let chain = io.chain(); let chain = io.chain();
let good = good.par_iter().map(|h| fetch_transactions(chain, h)); let good = good.par_iter().map(|h| fetch_transactions(chain, h));
let retracted = retracted.par_iter().map(|h| fetch_transactions(chain, h)); let bad = bad.par_iter().map(|h| fetch_transactions(chain, h));
good.for_each(|txs| { good.for_each(|txs| {
let mut transaction_queue = self.transaction_queue.lock().unwrap(); let mut transaction_queue = self.transaction_queue.lock().unwrap();
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>(); let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); transaction_queue.remove_all(&hashes, |a| chain.nonce(a));
}); });
retracted.for_each(|txs| { bad.for_each(|txs| {
// populate sender // populate sender
for tx in &txs { for tx in &txs {
let _sender = tx.sender(); let _sender = tx.sender();
@ -1638,10 +1640,10 @@ mod tests {
let io = TestIo::new(&mut client, &mut queue, None); let io = TestIo::new(&mut client, &mut queue, None);
// when // when
sync.chain_new_blocks(&io, &[], &good_blocks); sync.chain_new_blocks(&io, &[], &good_blocks, &[]);
assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0); assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0);
assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1); assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1);
sync.chain_new_blocks(&io, &good_blocks, &retracted_blocks); sync.chain_new_blocks(&io, &good_blocks, &retracted_blocks, &[]);
// then // then
let status = sync.transaction_queue.lock().unwrap().status(); let status = sync.transaction_queue.lock().unwrap().status();

View File

@ -157,9 +157,9 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
SyncMessage::BlockVerified => { SyncMessage::BlockVerified => {
self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref())); self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref()));
}, },
SyncMessage::NewChainBlocks { ref good, ref retracted } => { SyncMessage::NewChainBlocks { ref good, ref bad, ref retracted } => {
let sync_io = NetSyncIo::new(io, self.chain.deref()); let sync_io = NetSyncIo::new(io, self.chain.deref());
self.sync.write().unwrap().chain_new_blocks(&sync_io, good, retracted); self.sync.write().unwrap().chain_new_blocks(&sync_io, good, bad, retracted);
} }
} }
} }

View File

@ -813,6 +813,4 @@ mod test {
assert_eq!(stats.future, 0); assert_eq!(stats.future, 0);
assert_eq!(stats.pending, 1); assert_eq!(stats.pending, 1);
} }
} }