diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index caa92db97..d8c48e696 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -334,6 +334,12 @@ impl Client where V: Verifier { } } + { + if self.queue_info().is_empty() { + io.send(NetworkIoMessage::User(SyncMessage::BlockQueueEmpty)).expect("error sending message to sync module"); + } + } + imported } diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index bcfe7724f..1c2385934 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -40,6 +40,8 @@ pub enum SyncMessage { NewChainHead, /// A block is ready BlockVerified, + /// blocks queue is empty + BlockQueueEmpty, } /// IO Message type used for Network service diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 520b37045..b0e31ba97 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -151,7 +151,7 @@ impl Eth for EthClient Params::None => { let status = take_weak!(self.sync).status(); let res = match status.state { - SyncState::NotSynced | SyncState::Idle => SyncStatus::None, + SyncState::NotSynced | SyncState::Idle | SyncState::FullySynced => SyncStatus::None, SyncState::Waiting | SyncState::Blocks | SyncState::NewBlocks => SyncStatus::Info(SyncInfo { starting_block: U256::from(status.start_block_number), current_block: U256::from(take_weak!(self.client).chain_info().best_block_number), diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 8f0194289..316718571 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -116,6 +116,8 @@ pub enum SyncState { Blocks, /// Downloading blocks learned from NewHashes packet NewBlocks, + /// Once has an event that client finished processing new blocks + FullySynced, } /// Syncing status and statistics @@ -931,6 +933,11 @@ impl ChainSync { } /// Called when peer sends us new transactions fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + // accepting transactions once only fully synced + if self.state != SyncState::FullySynced { + return Ok(()); + } + let item_count = r.item_count(); trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count); @@ -1273,15 +1280,31 @@ impl ChainSync { /// called when block is imported to chain, updates transactions queue and propagates the blocks pub fn chain_new_blocks(&mut self, io: &mut SyncIo, imported: &[H256], invalid: &[H256], enacted: &[H256], retracted: &[H256]) { - // Notify miner - self.miner.chain_new_blocks(io.chain(), imported, invalid, enacted, retracted); + if self.state == SyncState::FullySynced { + // Notify miner + self.miner.chain_new_blocks(io.chain(), imported, invalid, enacted, retracted); + } // Propagate latests blocks self.propagate_latest_blocks(io); // TODO [todr] propagate transactions? } pub fn chain_new_head(&mut self, io: &mut SyncIo) { - self.miner.prepare_sealing(io.chain()); + if self.state == SyncState::FullySynced { + self.miner.prepare_sealing(io.chain()); + } + } + + // called once has nothing to download and client reports that all that downloaded is imported + pub fn set_fully_synced(&mut self) { + self.state = SyncState::FullySynced; + } + + // handles event from client about empty blow_queue + pub fn client_block_queue_empty(&mut self) { + if self.state == SyncState::Idle { + self.set_fully_synced(); + } } } @@ -1618,6 +1641,7 @@ mod tests { client.add_blocks(1, EachBlockWith::UncleAndTransaction); client.add_blocks(1, EachBlockWith::Transaction); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + sync.state = SyncState::FullySynced; let good_blocks = vec![client.block_hash_delta_minus(2)]; let retracted_blocks = vec![client.block_hash_delta_minus(1)]; @@ -1637,6 +1661,34 @@ mod tests { assert_eq!(status.transaction_queue_future, 0); } + #[test] + fn should_not_add_transactions_to_queue_if_not_synced() { + // given + let mut client = TestBlockChainClient::new(); + client.add_blocks(98, EachBlockWith::Uncle); + client.add_blocks(1, EachBlockWith::UncleAndTransaction); + client.add_blocks(1, EachBlockWith::Transaction); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + sync.state = SyncState::Idle; + + let good_blocks = vec![client.block_hash_delta_minus(2)]; + let retracted_blocks = vec![client.block_hash_delta_minus(1)]; + + let mut queue = VecDeque::new(); + let mut io = TestIo::new(&mut client, &mut queue, None); + + // when + sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks); + assert_eq!(sync.miner.status().transaction_queue_future, 0); + assert_eq!(sync.miner.status().transaction_queue_pending, 0); + sync.chain_new_blocks(&mut io, &good_blocks, &[], &[], &retracted_blocks); + + // then + let status = sync.miner.status(); + assert_eq!(status.transaction_queue_pending, 0); + assert_eq!(status.transaction_queue_future, 0); + } + #[test] fn returns_requested_block_headers() { let mut client = TestBlockChainClient::new(); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 1c87da2de..d78ac3b89 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -173,6 +173,9 @@ impl NetworkProtocolHandler for EthSync { SyncMessage::NewChainHead => { let mut sync_io = NetSyncIo::new(io, self.chain.deref()); self.sync.write().unwrap().chain_new_head(&mut sync_io); + }, + SyncMessage::BlockQueueEmpty => { + self.sync.write().unwrap().client_block_queue_empty(); } _ => {/* Ignore other messages */}, }