From 215602de088ca108410aa60543abdc8f23685d01 Mon Sep 17 00:00:00 2001 From: mattrutherford <44339188+mattrutherford@users.noreply.github.com> Date: Wed, 19 Dec 2018 09:24:14 +0000 Subject: [PATCH] Fix pubsub new_blocks notifications to include all blocks (#9987) Fix: new blocks notifications sometimes missing in pubsub RPC Implement new struct to pass to `new_blocks()` with extra parameter - `has_more_blocks_to_import`, which was previously used to determine whether the notification should be sent. Now it's up to each implementation to decide what to do. Updated all implementations to behave as before, except `eth_pubsub`, which will send notification even when the queue is not empty. Update tests. --- ethcore/private-tx/src/lib.rs | 14 ++--- ethcore/src/client/chain_notify.rs | 52 +++++++++++++---- ethcore/src/client/client.rs | 57 +++++++++++-------- ethcore/src/client/mod.rs | 2 +- ethcore/src/snapshot/watcher.rs | 28 ++++----- ethcore/sync/src/api.rs | 23 +++----- ethcore/sync/src/tests/helpers.rs | 22 +++---- parity/informant.rs | 18 +++--- rpc/src/v1/impls/eth_pubsub.rs | 20 ++----- rpc/src/v1/tests/mocked/eth_pubsub.rs | 10 ++-- secret_store/src/acl_storage.rs | 11 ++-- secret_store/src/key_server_set.rs | 8 +-- .../src/listener/service_contract_listener.rs | 10 ++-- updater/src/updater.rs | 6 +- 14 files changed, 145 insertions(+), 136 deletions(-) diff --git a/ethcore/private-tx/src/lib.rs b/ethcore/private-tx/src/lib.rs index 5f7b5cde3..4b9f6d33e 100644 --- a/ethcore/private-tx/src/lib.rs +++ b/ethcore/private-tx/src/lib.rs @@ -69,7 +69,6 @@ pub use error::{Error, ErrorKind}; use std::sync::{Arc, Weak}; use std::collections::{HashMap, HashSet, BTreeMap}; -use std::time::Duration; use ethereum_types::{H128, H256, U256, Address}; use hash::keccak; use rlp::*; @@ -82,7 +81,7 @@ use ethcore::executed::{Executed}; use transaction::{SignedTransaction, Transaction, Action, UnverifiedTransaction}; use ethcore::{contract_address as ethcore_contract_address}; use ethcore::client::{ - Client, ChainNotify, ChainRoute, ChainMessageType, ClientIoMessage, BlockId, + Client, ChainNotify, NewBlocks, ChainMessageType, ClientIoMessage, BlockId, CallContract, Call, BlockInfo }; use ethcore::account_provider::AccountProvider; @@ -733,12 +732,11 @@ fn find_account_password(passwords: &Vec, account_provider: &AccountPr } impl ChainNotify for Provider { - fn new_blocks(&self, imported: Vec, _invalid: Vec, _route: ChainRoute, _sealed: Vec, _proposed: Vec, _duration: Duration) { - if !imported.is_empty() { - trace!(target: "privatetx", "New blocks imported, try to prune the queue"); - if let Err(err) = self.process_verification_queue() { - warn!(target: "privatetx", "Cannot prune private transactions queue. error: {:?}", err); - } + fn new_blocks(&self, new_blocks: NewBlocks) { + if new_blocks.imported.is_empty() || new_blocks.has_more_blocks_to_import { return } + trace!(target: "privatetx", "New blocks imported, try to prune the queue"); + if let Err(err) = self.process_verification_queue() { + warn!(target: "privatetx", "Cannot prune private transactions queue. error: {:?}", err); } } } diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index 3d576ae12..4b599ad17 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -114,19 +114,51 @@ impl ChainRoute { } } +/// Used by `ChainNotify` `new_blocks()` +pub struct NewBlocks { + /// Imported blocks + pub imported: Vec, + /// Invalid blocks + pub invalid: Vec, + /// Route + pub route: ChainRoute, + /// Sealed + pub sealed: Vec, + /// Block bytes. + pub proposed: Vec, + /// Duration + pub duration: Duration, + /// Has more blocks to import + pub has_more_blocks_to_import: bool, +} + +impl NewBlocks { + /// Constructor + pub fn new ( + imported: Vec, + invalid: Vec, + route: ChainRoute, + sealed: Vec, + proposed: Vec, + duration: Duration, + has_more_blocks_to_import: bool, + ) -> NewBlocks { + NewBlocks { + imported, + invalid, + route, + sealed, + proposed, + duration, + has_more_blocks_to_import, + } + } +} + /// Represents what has to be handled by actor listening to chain events pub trait ChainNotify : Send + Sync { /// fires when chain has new blocks. - fn new_blocks( - &self, - _imported: Vec, - _invalid: Vec, - _route: ChainRoute, - _sealed: Vec, - // Block bytes. - _proposed: Vec, - _duration: Duration, - ) { + fn new_blocks( &self, _new_blocks: NewBlocks) { // does nothing by default } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 00dc9cab5..1b49d0092 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -44,7 +44,7 @@ use client::{ use client::{ BlockId, TransactionId, UncleId, TraceId, ClientConfig, BlockChainClient, TraceFilter, CallAnalytics, Mode, - ChainNotify, ChainRoute, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType, + ChainNotify, NewBlocks, ChainRoute, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType, IoClient, BadBlocks, }; use client::bad_blocks; @@ -268,7 +268,7 @@ impl Importer { } let max_blocks_to_import = client.config.max_round_blocks_to_import; - let (imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, duration, is_empty) = { + let (imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, duration, has_more_blocks_to_import) = { let mut imported_blocks = Vec::with_capacity(max_blocks_to_import); let mut invalid_blocks = HashSet::new(); let mut proposed_blocks = Vec::with_capacity(max_blocks_to_import); @@ -322,26 +322,29 @@ impl Importer { if !invalid_blocks.is_empty() { self.block_queue.mark_as_bad(&invalid_blocks); } - let is_empty = self.block_queue.mark_as_good(&imported_blocks); - (imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, start.elapsed(), is_empty) + let has_more_blocks_to_import = !self.block_queue.mark_as_good(&imported_blocks); + (imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, start.elapsed(), has_more_blocks_to_import) }; { - if !imported_blocks.is_empty() && is_empty { + if !imported_blocks.is_empty() { let route = ChainRoute::from(import_results.as_ref()); - if is_empty { + if !has_more_blocks_to_import { self.miner.chain_new_blocks(client, &imported_blocks, &invalid_blocks, route.enacted(), route.retracted(), false); } client.notify(|notify| { notify.new_blocks( - imported_blocks.clone(), - invalid_blocks.clone(), - route.clone(), - Vec::new(), - proposed_blocks.clone(), - duration, + NewBlocks::new( + imported_blocks.clone(), + invalid_blocks.clone(), + route.clone(), + Vec::new(), + proposed_blocks.clone(), + duration, + has_more_blocks_to_import, + ) ); }); } @@ -2342,12 +2345,15 @@ impl ImportSealedBlock for Client { ); self.notify(|notify| { notify.new_blocks( - vec![hash], - vec![], - route.clone(), - vec![hash], - vec![], - start.elapsed(), + NewBlocks::new( + vec![hash], + vec![], + route.clone(), + vec![hash], + vec![], + start.elapsed(), + false + ) ); }); self.db.read().key_value().flush().expect("DB flush failed."); @@ -2360,12 +2366,15 @@ impl BroadcastProposalBlock for Client { const DURATION_ZERO: Duration = Duration::from_millis(0); self.notify(|notify| { notify.new_blocks( - vec![], - vec![], - ChainRoute::default(), - vec![], - vec![block.rlp_bytes()], - DURATION_ZERO, + NewBlocks::new( + vec![], + vec![], + ChainRoute::default(), + vec![], + vec![block.rlp_bytes()], + DURATION_ZERO, + false + ) ); }); } diff --git a/ethcore/src/client/mod.rs b/ethcore/src/client/mod.rs index 4148bb586..bbbbbea8b 100644 --- a/ethcore/src/client/mod.rs +++ b/ethcore/src/client/mod.rs @@ -34,7 +34,7 @@ pub use self::evm_test_client::{EvmTestClient, EvmTestError, TransactResult}; pub use self::io_message::ClientIoMessage; #[cfg(any(test, feature = "test-helpers"))] pub use self::test_client::{TestBlockChainClient, EachBlockWith}; -pub use self::chain_notify::{ChainNotify, ChainRoute, ChainRouteType, ChainMessageType}; +pub use self::chain_notify::{ChainNotify, NewBlocks, ChainRoute, ChainRouteType, ChainMessageType}; pub use self::traits::{ Nonce, Balance, ChainInfo, BlockInfo, ReopenBlock, PrepareOpenBlock, CallContract, TransactionInfo, RegistryInfo, ScheduleInfo, ImportSealedBlock, BroadcastProposalBlock, ImportBlock, StateOrBlock, StateClient, Call, EngineInfo, AccountData, BlockChain, BlockProducer, SealedBlockImporter, BadBlocks, diff --git a/ethcore/src/snapshot/watcher.rs b/ethcore/src/snapshot/watcher.rs index 680567962..0eee7133b 100644 --- a/ethcore/src/snapshot/watcher.rs +++ b/ethcore/src/snapshot/watcher.rs @@ -17,14 +17,13 @@ //! Watcher for snapshot-related chain events. use parking_lot::Mutex; -use client::{BlockInfo, Client, ChainNotify, ChainRoute, ClientIoMessage}; +use client::{BlockInfo, Client, ChainNotify, NewBlocks, ClientIoMessage}; use ids::BlockId; use io::IoChannel; use ethereum_types::H256; -use bytes::Bytes; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; // helper trait for transforming hashes to numbers and checking if syncing. trait Oracle: Send + Sync { @@ -99,20 +98,12 @@ impl Watcher { } impl ChainNotify for Watcher { - fn new_blocks( - &self, - imported: Vec, - _: Vec, - _: ChainRoute, - _: Vec, - _: Vec, - _duration: Duration) - { - if self.oracle.is_major_importing() { return } + fn new_blocks(&self, new_blocks: NewBlocks) { + if self.oracle.is_major_importing() || new_blocks.has_more_blocks_to_import { return } - trace!(target: "snapshot_watcher", "{} imported", imported.len()); + trace!(target: "snapshot_watcher", "{} imported", new_blocks.imported.len()); - let highest = imported.into_iter() + let highest = new_blocks.imported.into_iter() .filter_map(|h| self.oracle.to_number(h)) .filter(|&num| num >= self.period + self.history) .map(|num| num - self.history) @@ -130,7 +121,7 @@ impl ChainNotify for Watcher { mod tests { use super::{Broadcast, Oracle, Watcher}; - use client::{ChainNotify, ChainRoute}; + use client::{ChainNotify, NewBlocks, ChainRoute}; use ethereum_types::{H256, U256}; @@ -170,14 +161,15 @@ mod tests { history: history, }; - watcher.new_blocks( + watcher.new_blocks(NewBlocks::new( hashes, vec![], ChainRoute::default(), vec![], vec![], DURATION_ZERO, - ); + false + )); } // helper diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 7474d79b3..7d7a1afc7 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -29,7 +29,7 @@ use types::pruning_info::PruningInfo; use ethereum_types::{H256, H512, U256}; use io::{TimerToken}; use ethcore::ethstore::ethkey::Secret; -use ethcore::client::{BlockChainClient, ChainNotify, ChainRoute, ChainMessageType}; +use ethcore::client::{BlockChainClient, ChainNotify, NewBlocks, ChainMessageType}; use ethcore::snapshot::SnapshotService; use ethcore::header::BlockNumber; use sync_io::NetSyncIo; @@ -498,14 +498,9 @@ impl ChainNotify for EthSync { } } - fn new_blocks(&self, - imported: Vec, - invalid: Vec, - route: ChainRoute, - sealed: Vec, - proposed: Vec, - _duration: Duration) + fn new_blocks(&self, new_blocks: NewBlocks) { + if new_blocks.has_more_blocks_to_import { return } use light::net::Announcement; self.network.with_context(self.subprotocol_name, |context| { @@ -513,12 +508,12 @@ impl ChainNotify for EthSync { &self.eth_handler.overlay); self.eth_handler.sync.write().chain_new_blocks( &mut sync_io, - &imported, - &invalid, - route.enacted(), - route.retracted(), - &sealed, - &proposed); + &new_blocks.imported, + &new_blocks.invalid, + new_blocks.route.enacted(), + new_blocks.route.retracted(), + &new_blocks.sealed, + &new_blocks.proposed); }); self.network.with_context(self.light_subprotocol_name, |context| { diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index 3eac91a0d..cc4a8ba08 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -16,14 +16,13 @@ use std::collections::{VecDeque, HashSet, HashMap}; use std::sync::Arc; -use std::time::Duration; use ethereum_types::H256; use parking_lot::{RwLock, Mutex}; use bytes::Bytes; use network::{self, PeerId, ProtocolId, PacketId, SessionInfo}; use tests::snapshot::*; use ethcore::client::{TestBlockChainClient, BlockChainClient, Client as EthcoreClient, - ClientConfig, ChainNotify, ChainRoute, ChainMessageType, ClientIoMessage}; + ClientConfig, ChainNotify, NewBlocks, ChainMessageType, ClientIoMessage}; use ethcore::header::BlockNumber; use ethcore::snapshot::SnapshotService; use ethcore::spec::Spec; @@ -535,23 +534,18 @@ impl IoHandler for TestIoHandler { } impl ChainNotify for EthPeer { - fn new_blocks(&self, - imported: Vec, - invalid: Vec, - route: ChainRoute, - sealed: Vec, - proposed: Vec, - _duration: Duration) + fn new_blocks(&self, new_blocks: NewBlocks) { - let (enacted, retracted) = route.into_enacted_retracted(); + if new_blocks.has_more_blocks_to_import { return } + let (enacted, retracted) = new_blocks.route.into_enacted_retracted(); self.new_blocks_queue.write().push_back(NewBlockMessage { - imported, - invalid, + imported: new_blocks.imported, + invalid: new_blocks.invalid, enacted, retracted, - sealed, - proposed, + sealed: new_blocks.sealed, + proposed: new_blocks.proposed, }); } diff --git a/parity/informant.rs b/parity/informant.rs index 8cc37813c..5209a8551 100644 --- a/parity/informant.rs +++ b/parity/informant.rs @@ -25,7 +25,7 @@ use std::time::{Instant, Duration}; use atty; use ethcore::client::{ BlockId, BlockChainClient, ChainInfo, BlockInfo, BlockChainInfo, - BlockQueueInfo, ChainNotify, ChainRoute, ClientReport, Client, ClientIoMessage + BlockQueueInfo, ChainNotify, NewBlocks, ClientReport, Client, ClientIoMessage }; use ethcore::header::BlockNumber; use ethcore::snapshot::{RestorationStatus, SnapshotService as SS}; @@ -38,7 +38,6 @@ use number_prefix::{binary_prefix, Standalone, Prefixed}; use parity_rpc::is_major_importing_or_waiting; use parity_rpc::informant::RpcStats; use ethereum_types::H256; -use bytes::Bytes; use parking_lot::{RwLock, Mutex}; /// Format byte counts to standard denominations. @@ -365,29 +364,30 @@ impl Informant { } impl ChainNotify for Informant { - fn new_blocks(&self, imported: Vec, _invalid: Vec, _route: ChainRoute, _sealed: Vec, _proposed: Vec, duration: Duration) { + fn new_blocks(&self, new_blocks: NewBlocks) { + if new_blocks.has_more_blocks_to_import { return } let mut last_import = self.last_import.lock(); let client = &self.target.client; let importing = self.target.is_major_importing(); let ripe = Instant::now() > *last_import + Duration::from_secs(1) && !importing; - let txs_imported = imported.iter() - .take(imported.len().saturating_sub(if ripe { 1 } else { 0 })) + let txs_imported = new_blocks.imported.iter() + .take(new_blocks.imported.len().saturating_sub(if ripe { 1 } else { 0 })) .filter_map(|h| client.block(BlockId::Hash(*h))) .map(|b| b.transactions_count()) .sum(); if ripe { - if let Some(block) = imported.last().and_then(|h| client.block(BlockId::Hash(*h))) { + if let Some(block) = new_blocks.imported.last().and_then(|h| client.block(BlockId::Hash(*h))) { let header_view = block.header_view(); let size = block.rlp().as_raw().len(); - let (skipped, skipped_txs) = (self.skipped.load(AtomicOrdering::Relaxed) + imported.len() - 1, self.skipped_txs.load(AtomicOrdering::Relaxed) + txs_imported); + let (skipped, skipped_txs) = (self.skipped.load(AtomicOrdering::Relaxed) + new_blocks.imported.len() - 1, self.skipped_txs.load(AtomicOrdering::Relaxed) + txs_imported); info!(target: "import", "Imported {} {} ({} txs, {} Mgas, {} ms, {} KiB){}", Colour::White.bold().paint(format!("#{}", header_view.number())), Colour::White.bold().paint(format!("{}", header_view.hash())), Colour::Yellow.bold().paint(format!("{}", block.transactions_count())), Colour::Yellow.bold().paint(format!("{:.2}", header_view.gas_used().low_u64() as f32 / 1000000f32)), - Colour::Purple.bold().paint(format!("{}", duration.as_milliseconds())), + Colour::Purple.bold().paint(format!("{}", new_blocks.duration.as_milliseconds())), Colour::Blue.bold().paint(format!("{:.2}", size as f32 / 1024f32)), if skipped > 0 { format!(" + another {} block(s) containing {} tx(s)", @@ -403,7 +403,7 @@ impl ChainNotify for Informant { *last_import = Instant::now(); } } else { - self.skipped.fetch_add(imported.len(), AtomicOrdering::Relaxed); + self.skipped.fetch_add(new_blocks.imported.len(), AtomicOrdering::Relaxed); self.skipped_txs.fetch_add(txs_imported, AtomicOrdering::Relaxed); } } diff --git a/rpc/src/v1/impls/eth_pubsub.rs b/rpc/src/v1/impls/eth_pubsub.rs index 989a43fa5..0e39f4e31 100644 --- a/rpc/src/v1/impls/eth_pubsub.rs +++ b/rpc/src/v1/impls/eth_pubsub.rs @@ -18,7 +18,6 @@ use std::sync::{Arc, Weak}; use std::collections::BTreeMap; -use std::time::Duration; use jsonrpc_core::{BoxFuture, Result, Error}; use jsonrpc_core::futures::{self, Future, IntoFuture}; @@ -34,14 +33,13 @@ use v1::types::{pubsub, RichHeader, Log}; use ethcore::encoded; use ethcore::filter::Filter as EthFilter; -use ethcore::client::{BlockChainClient, ChainNotify, ChainRoute, ChainRouteType, BlockId}; +use ethcore::client::{BlockChainClient, ChainNotify, NewBlocks, ChainRouteType, BlockId}; use sync::LightSync; use light::cache::Cache; use light::on_demand::OnDemand; use light::client::{LightChainClient, LightChainNotify}; use parity_runtime::Executor; use ethereum_types::H256; -use bytes::Bytes; use parking_lot::{RwLock, Mutex}; type Client = Sink; @@ -220,18 +218,10 @@ impl LightChainNotify for ChainNotificationHandler { } impl ChainNotify for ChainNotificationHandler { - fn new_blocks( - &self, - _imported: Vec, - _invalid: Vec, - route: ChainRoute, - _sealed: Vec, - // Block bytes. - _proposed: Vec, - _duration: Duration, - ) { + fn new_blocks(&self, new_blocks: NewBlocks) { + if self.heads_subscribers.read().is_empty() && self.logs_subscribers.read().is_empty() { return } const EXTRA_INFO_PROOF: &'static str = "Object exists in in blockchain (fetched earlier), extra_info is always available if object exists; qed"; - let headers = route.route() + let headers = new_blocks.route.route() .iter() .filter_map(|&(hash, ref typ)| { match typ { @@ -249,7 +239,7 @@ impl ChainNotify for ChainNotificationHandler { self.notify_heads(&headers); // We notify logs enacting and retracting as the order in route. - self.notify_logs(route.route(), |filter, ex| { + self.notify_logs(new_blocks.route.route(), |filter, ex| { match ex { &ChainRouteType::Enacted => Ok(self.client.logs(filter).unwrap_or_default().into_iter().map(Into::into).collect()), diff --git a/rpc/src/v1/tests/mocked/eth_pubsub.rs b/rpc/src/v1/tests/mocked/eth_pubsub.rs index e363b9135..2eb1295d8 100644 --- a/rpc/src/v1/tests/mocked/eth_pubsub.rs +++ b/rpc/src/v1/tests/mocked/eth_pubsub.rs @@ -24,7 +24,7 @@ use std::time::Duration; use v1::{EthPubSub, EthPubSubClient, Metadata}; -use ethcore::client::{TestBlockChainClient, EachBlockWith, ChainNotify, ChainRoute, ChainRouteType}; +use ethcore::client::{TestBlockChainClient, EachBlockWith, ChainNotify, NewBlocks, ChainRoute, ChainRouteType}; use parity_runtime::Runtime; const DURATION_ZERO: Duration = Duration::from_millis(0); @@ -57,13 +57,13 @@ fn should_subscribe_to_new_heads() { assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); // Check notifications - handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO); + handler.new_blocks(NewBlocks::new(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO, true)); let (res, receiver) = receiver.into_future().wait().unwrap(); let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x1","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x1","parentHash":"0x0cd786a2425d16f152c658316c423e6ce1181e15c3295826d7c9904cba9ce303","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":"0x416d77337e24399d"}}"#; assert_eq!(res, Some(response.into())); // Notify about two blocks - handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h2, ChainRouteType::Enacted), (h3, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO); + handler.new_blocks(NewBlocks::new(vec![], vec![], ChainRoute::new(vec![(h2, ChainRouteType::Enacted), (h3, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO, true)); // Receive both let (res, receiver) = receiver.into_future().wait().unwrap(); @@ -129,7 +129,7 @@ fn should_subscribe_to_logs() { assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); // Check notifications (enacted) - handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO); + handler.new_blocks(NewBlocks::new(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO, false)); let (res, receiver) = receiver.into_future().wait().unwrap(); let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","removed":false,"topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned() + &format!("0x{:x}", tx_hash) @@ -137,7 +137,7 @@ fn should_subscribe_to_logs() { assert_eq!(res, Some(response.into())); // Check notifications (retracted) - handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Retracted)]), vec![], vec![], DURATION_ZERO); + handler.new_blocks(NewBlocks::new(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Retracted)]), vec![], vec![], DURATION_ZERO, false)); let (res, receiver) = receiver.into_future().wait().unwrap(); let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","removed":true,"topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned() + &format!("0x{:x}", tx_hash) diff --git a/secret_store/src/acl_storage.rs b/secret_store/src/acl_storage.rs index 6bc9aa0e6..5704d42b3 100644 --- a/secret_store/src/acl_storage.rs +++ b/secret_store/src/acl_storage.rs @@ -16,12 +16,10 @@ use std::sync::Arc; use std::collections::{HashMap, HashSet}; -use std::time::Duration; use parking_lot::{Mutex, RwLock}; -use ethcore::client::{BlockId, ChainNotify, ChainRoute, CallContract}; -use ethereum_types::{H256, Address}; +use ethcore::client::{BlockId, ChainNotify, NewBlocks, CallContract}; +use ethereum_types::Address; use ethabi::FunctionOutputDecoder; -use bytes::Bytes; use trusted_client::TrustedClient; use types::{Error, ServerKeyId, ContractAddress}; @@ -77,8 +75,9 @@ impl AclStorage for OnChainAclStorage { } impl ChainNotify for OnChainAclStorage { - fn new_blocks(&self, _imported: Vec, _invalid: Vec, route: ChainRoute, _sealed: Vec, _proposed: Vec, _duration: Duration) { - if !route.enacted().is_empty() || !route.retracted().is_empty() { + fn new_blocks(&self, new_blocks: NewBlocks) { + if new_blocks.has_more_blocks_to_import { return } + if !new_blocks.route.enacted().is_empty() || !new_blocks.route.retracted().is_empty() { self.contract.lock().update_contract_address() } } diff --git a/secret_store/src/key_server_set.rs b/secret_store/src/key_server_set.rs index 128d865b8..2e5e6816d 100644 --- a/secret_store/src/key_server_set.rs +++ b/secret_store/src/key_server_set.rs @@ -17,10 +17,9 @@ use std::sync::Arc; use std::net::SocketAddr; use std::collections::{BTreeMap, HashSet}; -use std::time::Duration; use parking_lot::Mutex; use ethabi::FunctionOutputDecoder; -use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify, ChainRoute, CallContract}; +use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify, NewBlocks, CallContract}; use ethereum_types::{H256, Address}; use ethkey::public_to_address; use bytes::Bytes; @@ -151,8 +150,9 @@ impl KeyServerSet for OnChainKeyServerSet { } impl ChainNotify for OnChainKeyServerSet { - fn new_blocks(&self, _imported: Vec, _invalid: Vec, route: ChainRoute, _sealed: Vec, _proposed: Vec, _duration: Duration) { - let (enacted, retracted) = route.into_enacted_retracted(); + fn new_blocks(&self, new_blocks: NewBlocks) { + if new_blocks.has_more_blocks_to_import { return } + let (enacted, retracted) = new_blocks.route.into_enacted_retracted(); if !enacted.is_empty() || !retracted.is_empty() { self.contract.lock().update(enacted, retracted) diff --git a/secret_store/src/listener/service_contract_listener.rs b/secret_store/src/listener/service_contract_listener.rs index dc2c8a38e..e26d3d925 100644 --- a/secret_store/src/listener/service_contract_listener.rs +++ b/secret_store/src/listener/service_contract_listener.rs @@ -17,10 +17,9 @@ use std::collections::HashSet; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::Duration; use std::thread; use parking_lot::Mutex; -use ethcore::client::{ChainNotify, ChainRoute}; +use ethcore::client::{ChainNotify, NewBlocks}; use ethkey::{Public, public_to_address}; use bytes::Bytes; use ethereum_types::{H256, U256, Address}; @@ -435,9 +434,10 @@ impl Drop for ServiceContractListener { } impl ChainNotify for ServiceContractListener { - fn new_blocks(&self, _imported: Vec, _invalid: Vec, route: ChainRoute, _sealed: Vec, _proposed: Vec, _duration: Duration) { - let enacted_len = route.enacted().len(); - if enacted_len == 0 && route.retracted().is_empty() { + fn new_blocks(&self, new_blocks: NewBlocks) { + if new_blocks.has_more_blocks_to_import { return } + let enacted_len = new_blocks.route.enacted().len(); + if enacted_len == 0 && new_blocks.route.retracted().is_empty() { return; } diff --git a/updater/src/updater.rs b/updater/src/updater.rs index 1e939b97e..4b968625a 100644 --- a/updater/src/updater.rs +++ b/updater/src/updater.rs @@ -25,9 +25,8 @@ use parking_lot::{Mutex, MutexGuard}; use rand::{self, Rng}; use target_info::Target; -use bytes::Bytes; use ethcore::BlockNumber; -use ethcore::client::{BlockId, BlockChainClient, ChainNotify, ChainRoute}; +use ethcore::client::{BlockId, BlockChainClient, ChainNotify, NewBlocks}; use ethcore::filter::Filter; use ethereum_types::H256; use hash_fetch::{self as fetch, HashFetch}; @@ -669,7 +668,8 @@ impl Updater, _invalid: Vec, _route: ChainRoute, _sealed: Vec, _proposed: Vec, _duration: Duration) { + fn new_blocks(&self, new_blocks: NewBlocks) { + if new_blocks.has_more_blocks_to_import { return } match (self.client.upgrade(), self.sync.as_ref().and_then(Weak::upgrade)) { (Some(ref c), Some(ref s)) if !s.status().is_syncing(c.queue_info()) => self.poll(), _ => {},