adding message from client to sync and disabling sending transactions to the queue while syncing

This commit is contained in:
Nikolay Volf 2016-03-17 12:17:53 +01:00 committed by arkpar
parent 81f8f939b9
commit 31f4a214a9
5 changed files with 67 additions and 4 deletions

View File

@ -334,6 +334,12 @@ impl<V> Client<V> where V: Verifier {
}
}
{
if self.queue_info().is_empty() {
io.send(NetworkIoMessage::User(SyncMessage::BlockQueueEmpty)).expect("error sending message to sync module");
}
}
imported
}

View File

@ -40,6 +40,8 @@ pub enum SyncMessage {
NewChainHead,
/// A block is ready
BlockVerified,
/// blocks queue is empty
BlockQueueEmpty,
}
/// IO Message type used for Network service

View File

@ -151,7 +151,7 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM>
Params::None => {
let status = take_weak!(self.sync).status();
let res = match status.state {
SyncState::NotSynced | SyncState::Idle => SyncStatus::None,
SyncState::NotSynced | SyncState::Idle | SyncState::FullySynced => SyncStatus::None,
SyncState::Waiting | SyncState::Blocks | SyncState::NewBlocks => SyncStatus::Info(SyncInfo {
starting_block: U256::from(status.start_block_number),
current_block: U256::from(take_weak!(self.client).chain_info().best_block_number),

View File

@ -116,6 +116,8 @@ pub enum SyncState {
Blocks,
/// Downloading blocks learned from NewHashes packet
NewBlocks,
/// Once has an event that client finished processing new blocks
FullySynced,
}
/// Syncing status and statistics
@ -931,6 +933,11 @@ impl ChainSync {
}
/// Called when peer sends us new transactions
fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
// accepting transactions once only fully synced
if self.state != SyncState::FullySynced {
return Ok(());
}
let item_count = r.item_count();
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
@ -1273,15 +1280,31 @@ impl ChainSync {
/// called when block is imported to chain, updates transactions queue and propagates the blocks
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, imported: &[H256], invalid: &[H256], enacted: &[H256], retracted: &[H256]) {
// Notify miner
self.miner.chain_new_blocks(io.chain(), imported, invalid, enacted, retracted);
if self.state == SyncState::FullySynced {
// Notify miner
self.miner.chain_new_blocks(io.chain(), imported, invalid, enacted, retracted);
}
// Propagate latests blocks
self.propagate_latest_blocks(io);
// TODO [todr] propagate transactions?
}
pub fn chain_new_head(&mut self, io: &mut SyncIo) {
self.miner.prepare_sealing(io.chain());
if self.state == SyncState::FullySynced {
self.miner.prepare_sealing(io.chain());
}
}
// called once has nothing to download and client reports that all that downloaded is imported
pub fn set_fully_synced(&mut self) {
self.state = SyncState::FullySynced;
}
// handles event from client about empty blow_queue
pub fn client_block_queue_empty(&mut self) {
if self.state == SyncState::Idle {
self.set_fully_synced();
}
}
}
@ -1618,6 +1641,7 @@ mod tests {
client.add_blocks(1, EachBlockWith::UncleAndTransaction);
client.add_blocks(1, EachBlockWith::Transaction);
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
sync.state = SyncState::FullySynced;
let good_blocks = vec![client.block_hash_delta_minus(2)];
let retracted_blocks = vec![client.block_hash_delta_minus(1)];
@ -1637,6 +1661,34 @@ mod tests {
assert_eq!(status.transaction_queue_future, 0);
}
#[test]
fn should_not_add_transactions_to_queue_if_not_synced() {
// given
let mut client = TestBlockChainClient::new();
client.add_blocks(98, EachBlockWith::Uncle);
client.add_blocks(1, EachBlockWith::UncleAndTransaction);
client.add_blocks(1, EachBlockWith::Transaction);
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
sync.state = SyncState::Idle;
let good_blocks = vec![client.block_hash_delta_minus(2)];
let retracted_blocks = vec![client.block_hash_delta_minus(1)];
let mut queue = VecDeque::new();
let mut io = TestIo::new(&mut client, &mut queue, None);
// when
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks);
assert_eq!(sync.miner.status().transaction_queue_future, 0);
assert_eq!(sync.miner.status().transaction_queue_pending, 0);
sync.chain_new_blocks(&mut io, &good_blocks, &[], &[], &retracted_blocks);
// then
let status = sync.miner.status();
assert_eq!(status.transaction_queue_pending, 0);
assert_eq!(status.transaction_queue_future, 0);
}
#[test]
fn returns_requested_block_headers() {
let mut client = TestBlockChainClient::new();

View File

@ -173,6 +173,9 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
SyncMessage::NewChainHead => {
let mut sync_io = NetSyncIo::new(io, self.chain.deref());
self.sync.write().unwrap().chain_new_head(&mut sync_io);
},
SyncMessage::BlockQueueEmpty => {
self.sync.write().unwrap().client_block_queue_empty();
}
_ => {/* Ignore other messages */},
}