Keep all enacted blocks notify in order (#8524)

* Keep all enacted blocks notify in order

* Collect is unnecessary

* Update ChainNotify to use ChainRouteType

* Fix all ethcore fn defs

* Wrap the type within ChainRoute

* Fix private-tx and sync api

* Fix secret_store API

* Fix updater API

* Fix rpc api

* Fix informant api

* Eagerly cache enacted/retracted and remove contain_enacted/retracted

* Fix indent

* tests: should use full expr form for struct constructor

* Use into_enacted_retracted to further avoid copy

* typo: not a function

* rpc/tests: ChainRoute -> ChainRoute::new
This commit is contained in:
Wei Tang 2018-05-07 18:58:25 +08:00 committed by Afri Schoedon
parent 32c32ecfda
commit 528497b86a
15 changed files with 158 additions and 98 deletions

View File

@ -79,7 +79,7 @@ use ethcore::executed::{Executed};
use transaction::{SignedTransaction, Transaction, Action, UnverifiedTransaction}; use transaction::{SignedTransaction, Transaction, Action, UnverifiedTransaction};
use ethcore::{contract_address as ethcore_contract_address}; use ethcore::{contract_address as ethcore_contract_address};
use ethcore::client::{ use ethcore::client::{
Client, ChainNotify, ChainMessageType, ClientIoMessage, BlockId, CallContract Client, ChainNotify, ChainRoute, ChainMessageType, ClientIoMessage, BlockId, CallContract
}; };
use ethcore::account_provider::AccountProvider; use ethcore::account_provider::AccountProvider;
use ethcore::miner::{self, Miner, MinerService}; use ethcore::miner::{self, Miner, MinerService};
@ -668,7 +668,7 @@ fn find_account_password(passwords: &Vec<String>, account_provider: &AccountProv
} }
impl ChainNotify for Provider { impl ChainNotify for Provider {
fn new_blocks(&self, imported: Vec<H256>, _invalid: Vec<H256>, _enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: Duration) { fn new_blocks(&self, imported: Vec<H256>, _invalid: Vec<H256>, _route: ChainRoute, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: Duration) {
if !imported.is_empty() { if !imported.is_empty() {
trace!("New blocks imported, try to prune the queue"); trace!("New blocks imported, try to prune the queue");
if let Err(err) = self.process_queue() { if let Err(err) = self.process_queue() {

View File

@ -20,7 +20,7 @@ use ethereum_types::H256;
use blockchain::block_info::{BlockInfo, BlockLocation}; use blockchain::block_info::{BlockInfo, BlockLocation};
/// Import route for newly inserted block. /// Import route for newly inserted block.
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq, Clone)]
pub struct ImportRoute { pub struct ImportRoute {
/// Blocks that were invalidated by new block. /// Blocks that were invalidated by new block.
pub retracted: Vec<H256>, pub retracted: Vec<H256>,

View File

@ -17,7 +17,9 @@
use bytes::Bytes; use bytes::Bytes;
use ethereum_types::H256; use ethereum_types::H256;
use transaction::UnverifiedTransaction; use transaction::UnverifiedTransaction;
use blockchain::ImportRoute;
use std::time::Duration; use std::time::Duration;
use std::collections::HashMap;
/// Messages to broadcast via chain /// Messages to broadcast via chain
pub enum ChainMessageType { pub enum ChainMessageType {
@ -29,6 +31,89 @@ pub enum ChainMessageType {
SignedPrivateTransaction(Vec<u8>), SignedPrivateTransaction(Vec<u8>),
} }
/// Route type to indicate whether it is enacted or retracted.
#[derive(Clone)]
pub enum ChainRouteType {
/// Enacted block
Enacted,
/// Retracted block
Retracted
}
/// A complete chain enacted retracted route.
#[derive(Default, Clone)]
pub struct ChainRoute {
route: Vec<(H256, ChainRouteType)>,
enacted: Vec<H256>,
retracted: Vec<H256>,
}
impl<'a> From<&'a [ImportRoute]> for ChainRoute {
fn from(import_results: &'a [ImportRoute]) -> ChainRoute {
ChainRoute::new(import_results.iter().flat_map(|route| {
route.retracted.iter().map(|h| (*h, ChainRouteType::Retracted))
.chain(route.enacted.iter().map(|h| (*h, ChainRouteType::Enacted)))
}).collect())
}
}
impl ChainRoute {
/// Create a new ChainRoute based on block hash and route type pairs.
pub fn new(route: Vec<(H256, ChainRouteType)>) -> Self {
let (enacted, retracted) = Self::to_enacted_retracted(&route);
Self { route, enacted, retracted }
}
/// Gather all non-duplicate enacted and retracted blocks.
fn to_enacted_retracted(route: &[(H256, ChainRouteType)]) -> (Vec<H256>, Vec<H256>) {
fn map_to_vec(map: Vec<(H256, bool)>) -> Vec<H256> {
map.into_iter().map(|(k, _v)| k).collect()
}
// Because we are doing multiple inserts some of the blocks that were enacted in import `k`
// could be retracted in import `k+1`. This is why to understand if after all inserts
// the block is enacted or retracted we iterate over all routes and at the end final state
// will be in the hashmap
let map = route.iter().fold(HashMap::new(), |mut map, route| {
match &route.1 {
&ChainRouteType::Enacted => {
map.insert(route.0, true);
},
&ChainRouteType::Retracted => {
map.insert(route.0, false);
},
}
map
});
// Split to enacted retracted (using hashmap value)
let (enacted, retracted) = map.into_iter().partition(|&(_k, v)| v);
// And convert tuples to keys
(map_to_vec(enacted), map_to_vec(retracted))
}
/// Consume route and return the enacted retracted form.
pub fn into_enacted_retracted(self) -> (Vec<H256>, Vec<H256>) {
(self.enacted, self.retracted)
}
/// All non-duplicate enacted blocks.
pub fn enacted(&self) -> &[H256] {
&self.enacted
}
/// All non-duplicate retracted blocks.
pub fn retracted(&self) -> &[H256] {
&self.retracted
}
/// All blocks in the route.
pub fn route(&self) -> &[(H256, ChainRouteType)] {
&self.route
}
}
/// Represents what has to be handled by actor listening to chain events /// Represents what has to be handled by actor listening to chain events
pub trait ChainNotify : Send + Sync { pub trait ChainNotify : Send + Sync {
/// fires when chain has new blocks. /// fires when chain has new blocks.
@ -36,8 +121,7 @@ pub trait ChainNotify : Send + Sync {
&self, &self,
_imported: Vec<H256>, _imported: Vec<H256>,
_invalid: Vec<H256>, _invalid: Vec<H256>,
_enacted: Vec<H256>, _route: ChainRoute,
_retracted: Vec<H256>,
_sealed: Vec<H256>, _sealed: Vec<H256>,
// Block bytes. // Block bytes.
_proposed: Vec<Bytes>, _proposed: Vec<Bytes>,

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{HashSet, HashMap, BTreeMap, BTreeSet, VecDeque}; use std::collections::{HashSet, BTreeMap, BTreeSet, VecDeque};
use std::str::FromStr; use std::str::FromStr;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
@ -45,7 +45,7 @@ use client::{
use client::{ use client::{
BlockId, TransactionId, UncleId, TraceId, ClientConfig, BlockChainClient, BlockId, TransactionId, UncleId, TraceId, ClientConfig, BlockChainClient,
TraceFilter, CallAnalytics, BlockImportError, Mode, TraceFilter, CallAnalytics, BlockImportError, Mode,
ChainNotify, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType ChainNotify, ChainRoute, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType
}; };
use encoded; use encoded;
use engines::{EthEngine, EpochTransition}; use engines::{EthEngine, EpochTransition};
@ -245,32 +245,6 @@ impl Importer {
}) })
} }
fn calculate_enacted_retracted(&self, import_results: &[ImportRoute]) -> (Vec<H256>, Vec<H256>) {
fn map_to_vec(map: Vec<(H256, bool)>) -> Vec<H256> {
map.into_iter().map(|(k, _v)| k).collect()
}
// In ImportRoute we get all the blocks that have been enacted and retracted by single insert.
// Because we are doing multiple inserts some of the blocks that were enacted in import `k`
// could be retracted in import `k+1`. This is why to understand if after all inserts
// the block is enacted or retracted we iterate over all routes and at the end final state
// will be in the hashmap
let map = import_results.iter().fold(HashMap::new(), |mut map, route| {
for hash in &route.enacted {
map.insert(hash.clone(), true);
}
for hash in &route.retracted {
map.insert(hash.clone(), false);
}
map
});
// Split to enacted retracted (using hashmap value)
let (enacted, retracted) = map.into_iter().partition(|&(_k, v)| v);
// And convert tuples to keys
(map_to_vec(enacted), map_to_vec(retracted))
}
/// This is triggered by a message coming from a block queue when the block is ready for insertion /// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self, client: &Client) -> usize { pub fn import_verified_blocks(&self, client: &Client) -> usize {
@ -336,18 +310,17 @@ impl Importer {
{ {
if !imported_blocks.is_empty() && is_empty { if !imported_blocks.is_empty() && is_empty {
let (enacted, retracted) = self.calculate_enacted_retracted(&import_results); let route = ChainRoute::from(import_results.as_ref());
if is_empty { if is_empty {
self.miner.chain_new_blocks(client, &imported_blocks, &invalid_blocks, &enacted, &retracted, false); self.miner.chain_new_blocks(client, &imported_blocks, &invalid_blocks, route.enacted(), route.retracted(), false);
} }
client.notify(|notify| { client.notify(|notify| {
notify.new_blocks( notify.new_blocks(
imported_blocks.clone(), imported_blocks.clone(),
invalid_blocks.clone(), invalid_blocks.clone(),
enacted.clone(), route.clone(),
retracted.clone(),
Vec::new(), Vec::new(),
proposed_blocks.clone(), proposed_blocks.clone(),
duration, duration,
@ -2155,14 +2128,13 @@ impl ImportSealedBlock for Client {
self.state_db.write().sync_cache(&route.enacted, &route.retracted, false); self.state_db.write().sync_cache(&route.enacted, &route.retracted, false);
route route
}; };
let (enacted, retracted) = self.importer.calculate_enacted_retracted(&[route]); let route = ChainRoute::from([route].as_ref());
self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted, true); self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], route.enacted(), route.retracted(), true);
self.notify(|notify| { self.notify(|notify| {
notify.new_blocks( notify.new_blocks(
vec![h.clone()], vec![h.clone()],
vec![], vec![],
enacted.clone(), route.clone(),
retracted.clone(),
vec![h.clone()], vec![h.clone()],
vec![], vec![],
start.elapsed(), start.elapsed(),
@ -2180,8 +2152,7 @@ impl BroadcastProposalBlock for Client {
notify.new_blocks( notify.new_blocks(
vec![], vec![],
vec![], vec![],
vec![], ChainRoute::default(),
vec![],
vec![], vec![],
vec![block.rlp_bytes()], vec![block.rlp_bytes()],
DURATION_ZERO, DURATION_ZERO,

View File

@ -31,7 +31,7 @@ pub use self::error::Error;
pub use self::evm_test_client::{EvmTestClient, EvmTestError, TransactResult}; pub use self::evm_test_client::{EvmTestClient, EvmTestError, TransactResult};
pub use self::io_message::ClientIoMessage; pub use self::io_message::ClientIoMessage;
pub use self::test_client::{TestBlockChainClient, EachBlockWith}; pub use self::test_client::{TestBlockChainClient, EachBlockWith};
pub use self::chain_notify::{ChainNotify, ChainMessageType}; pub use self::chain_notify::{ChainNotify, ChainRoute, ChainRouteType, ChainMessageType};
pub use self::traits::{ pub use self::traits::{
Nonce, Balance, ChainInfo, BlockInfo, ReopenBlock, PrepareOpenBlock, CallContract, TransactionInfo, RegistryInfo, ScheduleInfo, ImportSealedBlock, BroadcastProposalBlock, ImportBlock, Nonce, Balance, ChainInfo, BlockInfo, ReopenBlock, PrepareOpenBlock, CallContract, TransactionInfo, RegistryInfo, ScheduleInfo, ImportSealedBlock, BroadcastProposalBlock, ImportBlock,
StateOrBlock, StateClient, Call, EngineInfo, AccountData, BlockChain, BlockProducer, SealedBlockImporter StateOrBlock, StateClient, Call, EngineInfo, AccountData, BlockChain, BlockProducer, SealedBlockImporter

View File

@ -17,7 +17,7 @@
//! Watcher for snapshot-related chain events. //! Watcher for snapshot-related chain events.
use parking_lot::Mutex; use parking_lot::Mutex;
use client::{BlockInfo, Client, ChainNotify, ClientIoMessage}; use client::{BlockInfo, Client, ChainNotify, ChainRoute, ClientIoMessage};
use ids::BlockId; use ids::BlockId;
use io::IoChannel; use io::IoChannel;
@ -103,8 +103,7 @@ impl ChainNotify for Watcher {
&self, &self,
imported: Vec<H256>, imported: Vec<H256>,
_: Vec<H256>, _: Vec<H256>,
_: Vec<H256>, _: ChainRoute,
_: Vec<H256>,
_: Vec<H256>, _: Vec<H256>,
_: Vec<Bytes>, _: Vec<Bytes>,
_duration: Duration) _duration: Duration)
@ -131,7 +130,7 @@ impl ChainNotify for Watcher {
mod tests { mod tests {
use super::{Broadcast, Oracle, Watcher}; use super::{Broadcast, Oracle, Watcher};
use client::ChainNotify; use client::{ChainNotify, ChainRoute};
use ethereum_types::{H256, U256}; use ethereum_types::{H256, U256};
@ -174,8 +173,7 @@ mod tests {
watcher.new_blocks( watcher.new_blocks(
hashes, hashes,
vec![], vec![],
vec![], ChainRoute::default(),
vec![],
vec![], vec![],
vec![], vec![],
DURATION_ZERO, DURATION_ZERO,

View File

@ -25,7 +25,7 @@ use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, Protocol
use ethereum_types::{H256, H512, U256}; use ethereum_types::{H256, H512, U256};
use io::{TimerToken}; use io::{TimerToken};
use ethcore::ethstore::ethkey::Secret; use ethcore::ethstore::ethkey::Secret;
use ethcore::client::{BlockChainClient, ChainNotify, ChainMessageType}; use ethcore::client::{BlockChainClient, ChainNotify, ChainRoute, ChainMessageType};
use ethcore::snapshot::SnapshotService; use ethcore::snapshot::SnapshotService;
use ethcore::header::BlockNumber; use ethcore::header::BlockNumber;
use sync_io::NetSyncIo; use sync_io::NetSyncIo;
@ -409,8 +409,7 @@ impl ChainNotify for EthSync {
fn new_blocks(&self, fn new_blocks(&self,
imported: Vec<H256>, imported: Vec<H256>,
invalid: Vec<H256>, invalid: Vec<H256>,
enacted: Vec<H256>, route: ChainRoute,
retracted: Vec<H256>,
sealed: Vec<H256>, sealed: Vec<H256>,
proposed: Vec<Bytes>, proposed: Vec<Bytes>,
_duration: Duration) _duration: Duration)
@ -424,8 +423,8 @@ impl ChainNotify for EthSync {
&mut sync_io, &mut sync_io,
&imported, &imported,
&invalid, &invalid,
&enacted, route.enacted(),
&retracted, route.retracted(),
&sealed, &sealed,
&proposed); &proposed);
}); });

View File

@ -23,7 +23,7 @@ use bytes::Bytes;
use network::{self, PeerId, ProtocolId, PacketId, SessionInfo}; use network::{self, PeerId, ProtocolId, PacketId, SessionInfo};
use tests::snapshot::*; use tests::snapshot::*;
use ethcore::client::{TestBlockChainClient, BlockChainClient, Client as EthcoreClient, use ethcore::client::{TestBlockChainClient, BlockChainClient, Client as EthcoreClient,
ClientConfig, ChainNotify, ChainMessageType, ClientIoMessage}; ClientConfig, ChainNotify, ChainRoute, ChainMessageType, ClientIoMessage};
use ethcore::header::BlockNumber; use ethcore::header::BlockNumber;
use ethcore::snapshot::SnapshotService; use ethcore::snapshot::SnapshotService;
use ethcore::spec::Spec; use ethcore::spec::Spec;
@ -535,12 +535,13 @@ impl ChainNotify for EthPeer<EthcoreClient> {
fn new_blocks(&self, fn new_blocks(&self,
imported: Vec<H256>, imported: Vec<H256>,
invalid: Vec<H256>, invalid: Vec<H256>,
enacted: Vec<H256>, route: ChainRoute,
retracted: Vec<H256>,
sealed: Vec<H256>, sealed: Vec<H256>,
proposed: Vec<Bytes>, proposed: Vec<Bytes>,
_duration: Duration) _duration: Duration)
{ {
let (enacted, retracted) = route.into_enacted_retracted();
self.new_blocks_queue.write().push_back(NewBlockMessage { self.new_blocks_queue.write().push_back(NewBlockMessage {
imported, imported,
invalid, invalid,

View File

@ -25,7 +25,7 @@ use std::time::{Instant, Duration};
use atty; use atty;
use ethcore::client::{ use ethcore::client::{
BlockId, BlockChainClient, ChainInfo, BlockInfo, BlockChainInfo, BlockId, BlockChainClient, ChainInfo, BlockInfo, BlockChainInfo,
BlockQueueInfo, ChainNotify, ClientReport, Client, ClientIoMessage BlockQueueInfo, ChainNotify, ChainRoute, ClientReport, Client, ClientIoMessage
}; };
use ethcore::header::BlockNumber; use ethcore::header::BlockNumber;
use ethcore::snapshot::{RestorationStatus, SnapshotService as SS}; use ethcore::snapshot::{RestorationStatus, SnapshotService as SS};
@ -351,7 +351,7 @@ impl<T: InformantData> Informant<T> {
} }
impl ChainNotify for Informant<FullNodeInformantData> { impl ChainNotify for Informant<FullNodeInformantData> {
fn new_blocks(&self, imported: Vec<H256>, _invalid: Vec<H256>, _enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, duration: Duration) { fn new_blocks(&self, imported: Vec<H256>, _invalid: Vec<H256>, _route: ChainRoute, _sealed: Vec<H256>, _proposed: Vec<Bytes>, duration: Duration) {
let mut last_import = self.last_import.lock(); let mut last_import = self.last_import.lock();
let client = &self.target.client; let client = &self.target.client;

View File

@ -34,7 +34,7 @@ use v1::types::{pubsub, RichHeader, Log};
use ethcore::encoded; use ethcore::encoded;
use ethcore::filter::Filter as EthFilter; use ethcore::filter::Filter as EthFilter;
use ethcore::client::{BlockChainClient, ChainNotify, BlockId}; use ethcore::client::{BlockChainClient, ChainNotify, ChainRoute, ChainRouteType, BlockId};
use sync::LightSync; use sync::LightSync;
use light::cache::Cache; use light::cache::Cache;
use light::on_demand::OnDemand; use light::on_demand::OnDemand;
@ -141,19 +141,20 @@ impl<C> ChainNotificationHandler<C> {
} }
} }
fn notify_logs<F, T>(&self, enacted: &[H256], logs: F) where fn notify_logs<F, T, Ex>(&self, enacted: &[(H256, Ex)], logs: F) where
F: Fn(EthFilter) -> T, F: Fn(EthFilter, &Ex) -> T,
Ex: Send,
T: IntoFuture<Item = Vec<Log>, Error = Error>, T: IntoFuture<Item = Vec<Log>, Error = Error>,
T::Future: Send + 'static, T::Future: Send + 'static,
{ {
for &(ref subscriber, ref filter) in self.logs_subscribers.read().values() { for &(ref subscriber, ref filter) in self.logs_subscribers.read().values() {
let logs = futures::future::join_all(enacted let logs = futures::future::join_all(enacted
.iter() .iter()
.map(|hash| { .map(|&(hash, ref ex)| {
let mut filter = filter.clone(); let mut filter = filter.clone();
filter.from_block = BlockId::Hash(*hash); filter.from_block = BlockId::Hash(hash);
filter.to_block = filter.from_block.clone(); filter.to_block = filter.from_block.clone();
logs(filter).into_future() logs(filter, ex).into_future()
}) })
.collect::<Vec<_>>() .collect::<Vec<_>>()
); );
@ -214,7 +215,7 @@ impl<C: LightClient> LightChainNotify for ChainNotificationHandler<C> {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
self.notify_heads(&headers); self.notify_heads(&headers);
self.notify_logs(&enacted, |filter| self.client.logs(filter)) self.notify_logs(&enacted.iter().map(|h| (*h, ())).collect::<Vec<_>>(), |filter, _| self.client.logs(filter))
} }
} }
@ -223,17 +224,21 @@ impl<C: BlockChainClient> ChainNotify for ChainNotificationHandler<C> {
&self, &self,
_imported: Vec<H256>, _imported: Vec<H256>,
_invalid: Vec<H256>, _invalid: Vec<H256>,
enacted: Vec<H256>, route: ChainRoute,
retracted: Vec<H256>,
_sealed: Vec<H256>, _sealed: Vec<H256>,
// Block bytes. // Block bytes.
_proposed: Vec<Bytes>, _proposed: Vec<Bytes>,
_duration: Duration, _duration: Duration,
) { ) {
const EXTRA_INFO_PROOF: &'static str = "Object exists in in blockchain (fetched earlier), extra_info is always available if object exists; qed"; const EXTRA_INFO_PROOF: &'static str = "Object exists in in blockchain (fetched earlier), extra_info is always available if object exists; qed";
let headers = enacted let headers = route.route()
.iter() .iter()
.filter_map(|hash| self.client.block_header(BlockId::Hash(*hash))) .filter_map(|&(hash, ref typ)| {
match typ {
&ChainRouteType::Retracted => None,
&ChainRouteType::Enacted => self.client.block_header(BlockId::Hash(hash))
}
})
.map(|header| { .map(|header| {
let hash = header.hash(); let hash = header.hash();
(header, self.client.block_extra_info(BlockId::Hash(hash)).expect(EXTRA_INFO_PROOF)) (header, self.client.block_extra_info(BlockId::Hash(hash)).expect(EXTRA_INFO_PROOF))
@ -243,17 +248,17 @@ impl<C: BlockChainClient> ChainNotify for ChainNotificationHandler<C> {
// Headers // Headers
self.notify_heads(&headers); self.notify_heads(&headers);
// Enacted logs // We notify logs enacting and retracting as the order in route.
self.notify_logs(&enacted, |filter| { self.notify_logs(route.route(), |filter, ex| {
Ok(self.client.logs(filter).into_iter().map(Into::into).collect()) match ex {
}); &ChainRouteType::Enacted =>
Ok(self.client.logs(filter).into_iter().map(Into::into).collect()),
// Retracted logs &ChainRouteType::Retracted =>
self.notify_logs(&retracted, |filter| { Ok(self.client.logs(filter).into_iter().map(Into::into).map(|mut log: Log| {
Ok(self.client.logs(filter).into_iter().map(Into::into).map(|mut log: Log| { log.log_type = "removed".into();
log.log_type = "removed".into(); log
log }).collect()),
}).collect()) }
}); });
} }
} }

View File

@ -24,7 +24,7 @@ use std::time::Duration;
use v1::{EthPubSub, EthPubSubClient, Metadata}; use v1::{EthPubSub, EthPubSubClient, Metadata};
use ethcore::client::{TestBlockChainClient, EachBlockWith, ChainNotify}; use ethcore::client::{TestBlockChainClient, EachBlockWith, ChainNotify, ChainRoute, ChainRouteType};
use parity_reactor::EventLoop; use parity_reactor::EventLoop;
const DURATION_ZERO: Duration = Duration::from_millis(0); const DURATION_ZERO: Duration = Duration::from_millis(0);
@ -57,13 +57,13 @@ fn should_subscribe_to_new_heads() {
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
// Check notifications // Check notifications
handler.new_blocks(vec![], vec![], vec![h1], vec![], vec![], vec![], DURATION_ZERO); handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO);
let (res, receiver) = receiver.into_future().wait().unwrap(); let (res, receiver) = receiver.into_future().wait().unwrap();
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x1","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x1","parentHash":"0x0cd786a2425d16f152c658316c423e6ce1181e15c3295826d7c9904cba9ce303","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":"0x416d77337e24399d"}}"#; let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x1","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x1","parentHash":"0x0cd786a2425d16f152c658316c423e6ce1181e15c3295826d7c9904cba9ce303","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":"0x416d77337e24399d"}}"#;
assert_eq!(res, Some(response.into())); assert_eq!(res, Some(response.into()));
// Notify about two blocks // Notify about two blocks
handler.new_blocks(vec![], vec![], vec![h2, h3], vec![], vec![], vec![], DURATION_ZERO); handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h2, ChainRouteType::Enacted), (h3, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO);
// Receive both // Receive both
let (res, receiver) = receiver.into_future().wait().unwrap(); let (res, receiver) = receiver.into_future().wait().unwrap();
@ -129,7 +129,7 @@ fn should_subscribe_to_logs() {
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
// Check notifications (enacted) // Check notifications (enacted)
handler.new_blocks(vec![], vec![], vec![h1], vec![], vec![], vec![], DURATION_ZERO); handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO);
let (res, receiver) = receiver.into_future().wait().unwrap(); let (res, receiver) = receiver.into_future().wait().unwrap();
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned() let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned()
+ &format!("0x{:x}", tx_hash) + &format!("0x{:x}", tx_hash)
@ -137,7 +137,7 @@ fn should_subscribe_to_logs() {
assert_eq!(res, Some(response.into())); assert_eq!(res, Some(response.into()));
// Check notifications (retracted) // Check notifications (retracted)
handler.new_blocks(vec![], vec![], vec![], vec![h1], vec![], vec![], DURATION_ZERO); handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Retracted)]), vec![], vec![], DURATION_ZERO);
let (res, receiver) = receiver.into_future().wait().unwrap(); let (res, receiver) = receiver.into_future().wait().unwrap();
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned() let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned()
+ &format!("0x{:x}", tx_hash) + &format!("0x{:x}", tx_hash)

View File

@ -18,7 +18,7 @@ use std::sync::Arc;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::time::Duration; use std::time::Duration;
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use ethcore::client::{BlockId, ChainNotify, CallContract, RegistryInfo}; use ethcore::client::{BlockId, ChainNotify, ChainRoute, CallContract, RegistryInfo};
use ethereum_types::{H256, Address}; use ethereum_types::{H256, Address};
use bytes::Bytes; use bytes::Bytes;
use trusted_client::TrustedClient; use trusted_client::TrustedClient;
@ -76,8 +76,8 @@ impl AclStorage for OnChainAclStorage {
} }
impl ChainNotify for OnChainAclStorage { impl ChainNotify for OnChainAclStorage {
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, enacted: Vec<H256>, retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: Duration) { fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, route: ChainRoute, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: Duration) {
if !enacted.is_empty() || !retracted.is_empty() { if !route.enacted().is_empty() || !route.retracted().is_empty() {
self.contract.lock().update() self.contract.lock().update()
} }
} }

View File

@ -19,7 +19,7 @@ use std::net::SocketAddr;
use std::collections::{BTreeMap, HashSet}; use std::collections::{BTreeMap, HashSet};
use std::time::Duration; use std::time::Duration;
use parking_lot::Mutex; use parking_lot::Mutex;
use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify, CallContract, RegistryInfo}; use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify, ChainRoute, CallContract, RegistryInfo};
use ethcore::filter::Filter; use ethcore::filter::Filter;
use ethkey::public_to_address; use ethkey::public_to_address;
use hash::keccak; use hash::keccak;
@ -163,7 +163,9 @@ impl KeyServerSet for OnChainKeyServerSet {
} }
impl ChainNotify for OnChainKeyServerSet { impl ChainNotify for OnChainKeyServerSet {
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, enacted: Vec<H256>, retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: Duration) { fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, route: ChainRoute, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: Duration) {
let (enacted, retracted) = route.into_enacted_retracted();
if !enacted.is_empty() || !retracted.is_empty() { if !enacted.is_empty() || !retracted.is_empty() {
self.contract.lock().update(enacted, retracted) self.contract.lock().update(enacted, retracted)
} }

View File

@ -20,7 +20,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration; use std::time::Duration;
use std::thread; use std::thread;
use parking_lot::Mutex; use parking_lot::Mutex;
use ethcore::client::ChainNotify; use ethcore::client::{ChainNotify, ChainRoute};
use ethkey::{Public, public_to_address}; use ethkey::{Public, public_to_address};
use bytes::Bytes; use bytes::Bytes;
use ethereum_types::{H256, U256, Address}; use ethereum_types::{H256, U256, Address};
@ -428,8 +428,8 @@ impl Drop for ServiceContractListener {
} }
impl ChainNotify for ServiceContractListener { impl ChainNotify for ServiceContractListener {
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: Duration) { fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, route: ChainRoute, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: Duration) {
let enacted_len = enacted.len(); let enacted_len = route.enacted().len();
if enacted_len == 0 { if enacted_len == 0 {
return; return;
} }

View File

@ -28,7 +28,7 @@ use target_info::Target;
use bytes::Bytes; use bytes::Bytes;
use ethcore::BlockNumber; use ethcore::BlockNumber;
use ethcore::filter::Filter; use ethcore::filter::Filter;
use ethcore::client::{BlockId, BlockChainClient, ChainNotify}; use ethcore::client::{BlockId, BlockChainClient, ChainNotify, ChainRoute};
use ethereum_types::H256; use ethereum_types::H256;
use sync::{SyncProvider}; use sync::{SyncProvider};
use hash_fetch::{self as fetch, HashFetch}; use hash_fetch::{self as fetch, HashFetch};
@ -660,7 +660,7 @@ impl<O: OperationsClient, F: HashFetch, T: TimeProvider, R: GenRange> Updater<O,
} }
impl ChainNotify for Updater { impl ChainNotify for Updater {
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, _enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: Duration) { fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, _route: ChainRoute, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: Duration) {
match (self.client.upgrade(), self.sync.as_ref().and_then(Weak::upgrade)) { match (self.client.upgrade(), self.sync.as_ref().and_then(Weak::upgrade)) {
(Some(ref c), Some(ref s)) if !s.status().is_syncing(c.queue_info()) => self.poll(), (Some(ref c), Some(ref s)) if !s.status().is_syncing(c.queue_info()) => self.poll(),
_ => {}, _ => {},