Keep all enacted blocks notify in order (#8524)
* Keep all enacted blocks notify in order * Collect is unnecessary * Update ChainNotify to use ChainRouteType * Fix all ethcore fn defs * Wrap the type within ChainRoute * Fix private-tx and sync api * Fix secret_store API * Fix updater API * Fix rpc api * Fix informant api * Eagerly cache enacted/retracted and remove contain_enacted/retracted * Fix indent * tests: should use full expr form for struct constructor * Use into_enacted_retracted to further avoid copy * typo: not a function * rpc/tests: ChainRoute -> ChainRoute::new
This commit is contained in:
@@ -20,7 +20,7 @@ use ethereum_types::H256;
|
||||
use blockchain::block_info::{BlockInfo, BlockLocation};
|
||||
|
||||
/// Import route for newly inserted block.
|
||||
#[derive(Debug, PartialEq)]
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub struct ImportRoute {
|
||||
/// Blocks that were invalidated by new block.
|
||||
pub retracted: Vec<H256>,
|
||||
|
||||
@@ -17,7 +17,9 @@
|
||||
use bytes::Bytes;
|
||||
use ethereum_types::H256;
|
||||
use transaction::UnverifiedTransaction;
|
||||
use blockchain::ImportRoute;
|
||||
use std::time::Duration;
|
||||
use std::collections::HashMap;
|
||||
|
||||
/// Messages to broadcast via chain
|
||||
pub enum ChainMessageType {
|
||||
@@ -29,6 +31,89 @@ pub enum ChainMessageType {
|
||||
SignedPrivateTransaction(Vec<u8>),
|
||||
}
|
||||
|
||||
/// Route type to indicate whether it is enacted or retracted.
|
||||
#[derive(Clone)]
|
||||
pub enum ChainRouteType {
|
||||
/// Enacted block
|
||||
Enacted,
|
||||
/// Retracted block
|
||||
Retracted
|
||||
}
|
||||
|
||||
/// A complete chain enacted retracted route.
|
||||
#[derive(Default, Clone)]
|
||||
pub struct ChainRoute {
|
||||
route: Vec<(H256, ChainRouteType)>,
|
||||
enacted: Vec<H256>,
|
||||
retracted: Vec<H256>,
|
||||
}
|
||||
|
||||
impl<'a> From<&'a [ImportRoute]> for ChainRoute {
|
||||
fn from(import_results: &'a [ImportRoute]) -> ChainRoute {
|
||||
ChainRoute::new(import_results.iter().flat_map(|route| {
|
||||
route.retracted.iter().map(|h| (*h, ChainRouteType::Retracted))
|
||||
.chain(route.enacted.iter().map(|h| (*h, ChainRouteType::Enacted)))
|
||||
}).collect())
|
||||
}
|
||||
}
|
||||
|
||||
impl ChainRoute {
|
||||
/// Create a new ChainRoute based on block hash and route type pairs.
|
||||
pub fn new(route: Vec<(H256, ChainRouteType)>) -> Self {
|
||||
let (enacted, retracted) = Self::to_enacted_retracted(&route);
|
||||
|
||||
Self { route, enacted, retracted }
|
||||
}
|
||||
|
||||
/// Gather all non-duplicate enacted and retracted blocks.
|
||||
fn to_enacted_retracted(route: &[(H256, ChainRouteType)]) -> (Vec<H256>, Vec<H256>) {
|
||||
fn map_to_vec(map: Vec<(H256, bool)>) -> Vec<H256> {
|
||||
map.into_iter().map(|(k, _v)| k).collect()
|
||||
}
|
||||
|
||||
// Because we are doing multiple inserts some of the blocks that were enacted in import `k`
|
||||
// could be retracted in import `k+1`. This is why to understand if after all inserts
|
||||
// the block is enacted or retracted we iterate over all routes and at the end final state
|
||||
// will be in the hashmap
|
||||
let map = route.iter().fold(HashMap::new(), |mut map, route| {
|
||||
match &route.1 {
|
||||
&ChainRouteType::Enacted => {
|
||||
map.insert(route.0, true);
|
||||
},
|
||||
&ChainRouteType::Retracted => {
|
||||
map.insert(route.0, false);
|
||||
},
|
||||
}
|
||||
map
|
||||
});
|
||||
|
||||
// Split to enacted retracted (using hashmap value)
|
||||
let (enacted, retracted) = map.into_iter().partition(|&(_k, v)| v);
|
||||
// And convert tuples to keys
|
||||
(map_to_vec(enacted), map_to_vec(retracted))
|
||||
}
|
||||
|
||||
/// Consume route and return the enacted retracted form.
|
||||
pub fn into_enacted_retracted(self) -> (Vec<H256>, Vec<H256>) {
|
||||
(self.enacted, self.retracted)
|
||||
}
|
||||
|
||||
/// All non-duplicate enacted blocks.
|
||||
pub fn enacted(&self) -> &[H256] {
|
||||
&self.enacted
|
||||
}
|
||||
|
||||
/// All non-duplicate retracted blocks.
|
||||
pub fn retracted(&self) -> &[H256] {
|
||||
&self.retracted
|
||||
}
|
||||
|
||||
/// All blocks in the route.
|
||||
pub fn route(&self) -> &[(H256, ChainRouteType)] {
|
||||
&self.route
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents what has to be handled by actor listening to chain events
|
||||
pub trait ChainNotify : Send + Sync {
|
||||
/// fires when chain has new blocks.
|
||||
@@ -36,8 +121,7 @@ pub trait ChainNotify : Send + Sync {
|
||||
&self,
|
||||
_imported: Vec<H256>,
|
||||
_invalid: Vec<H256>,
|
||||
_enacted: Vec<H256>,
|
||||
_retracted: Vec<H256>,
|
||||
_route: ChainRoute,
|
||||
_sealed: Vec<H256>,
|
||||
// Block bytes.
|
||||
_proposed: Vec<Bytes>,
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::collections::{HashSet, HashMap, BTreeMap, BTreeSet, VecDeque};
|
||||
use std::collections::{HashSet, BTreeMap, BTreeSet, VecDeque};
|
||||
use std::fmt;
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Arc, Weak};
|
||||
@@ -46,8 +46,8 @@ use client::{
|
||||
use client::{
|
||||
BlockId, TransactionId, UncleId, TraceId, ClientConfig, BlockChainClient,
|
||||
TraceFilter, CallAnalytics, BlockImportError, Mode,
|
||||
ChainNotify, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType,
|
||||
IoClient,
|
||||
ChainMessageType, ChainNotify, ChainRoute, PruningInfo, ProvingBlockChainClient,
|
||||
EngineInfo, IoClient,
|
||||
};
|
||||
use encoded;
|
||||
use engines::{EthEngine, EpochTransition};
|
||||
@@ -257,32 +257,6 @@ impl Importer {
|
||||
})
|
||||
}
|
||||
|
||||
fn calculate_enacted_retracted(&self, import_results: &[ImportRoute]) -> (Vec<H256>, Vec<H256>) {
|
||||
fn map_to_vec(map: Vec<(H256, bool)>) -> Vec<H256> {
|
||||
map.into_iter().map(|(k, _v)| k).collect()
|
||||
}
|
||||
|
||||
// In ImportRoute we get all the blocks that have been enacted and retracted by single insert.
|
||||
// Because we are doing multiple inserts some of the blocks that were enacted in import `k`
|
||||
// could be retracted in import `k+1`. This is why to understand if after all inserts
|
||||
// the block is enacted or retracted we iterate over all routes and at the end final state
|
||||
// will be in the hashmap
|
||||
let map = import_results.iter().fold(HashMap::new(), |mut map, route| {
|
||||
for hash in &route.enacted {
|
||||
map.insert(hash.clone(), true);
|
||||
}
|
||||
for hash in &route.retracted {
|
||||
map.insert(hash.clone(), false);
|
||||
}
|
||||
map
|
||||
});
|
||||
|
||||
// Split to enacted retracted (using hashmap value)
|
||||
let (enacted, retracted) = map.into_iter().partition(|&(_k, v)| v);
|
||||
// And convert tuples to keys
|
||||
(map_to_vec(enacted), map_to_vec(retracted))
|
||||
}
|
||||
|
||||
/// This is triggered by a message coming from a block queue when the block is ready for insertion
|
||||
pub fn import_verified_blocks(&self, client: &Client) -> usize {
|
||||
|
||||
@@ -348,18 +322,17 @@ impl Importer {
|
||||
|
||||
{
|
||||
if !imported_blocks.is_empty() && is_empty {
|
||||
let (enacted, retracted) = self.calculate_enacted_retracted(&import_results);
|
||||
let route = ChainRoute::from(import_results.as_ref());
|
||||
|
||||
if is_empty {
|
||||
self.miner.chain_new_blocks(client, &imported_blocks, &invalid_blocks, &enacted, &retracted, false);
|
||||
self.miner.chain_new_blocks(client, &imported_blocks, &invalid_blocks, route.enacted(), route.retracted(), false);
|
||||
}
|
||||
|
||||
client.notify(|notify| {
|
||||
notify.new_blocks(
|
||||
imported_blocks.clone(),
|
||||
invalid_blocks.clone(),
|
||||
enacted.clone(),
|
||||
retracted.clone(),
|
||||
route.clone(),
|
||||
Vec::new(),
|
||||
proposed_blocks.clone(),
|
||||
duration,
|
||||
@@ -2167,14 +2140,13 @@ impl ImportSealedBlock for Client {
|
||||
self.state_db.write().sync_cache(&route.enacted, &route.retracted, false);
|
||||
route
|
||||
};
|
||||
let (enacted, retracted) = self.importer.calculate_enacted_retracted(&[route]);
|
||||
self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted, self.engine.seals_internally().is_some());
|
||||
let route = ChainRoute::from([route].as_ref());
|
||||
self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], route.enacted(), route.retracted(), true);
|
||||
self.notify(|notify| {
|
||||
notify.new_blocks(
|
||||
vec![h.clone()],
|
||||
vec![],
|
||||
enacted.clone(),
|
||||
retracted.clone(),
|
||||
route.clone(),
|
||||
vec![h.clone()],
|
||||
vec![],
|
||||
start.elapsed(),
|
||||
@@ -2192,8 +2164,7 @@ impl BroadcastProposalBlock for Client {
|
||||
notify.new_blocks(
|
||||
vec![],
|
||||
vec![],
|
||||
vec![],
|
||||
vec![],
|
||||
ChainRoute::default(),
|
||||
vec![],
|
||||
vec![block.rlp_bytes()],
|
||||
DURATION_ZERO,
|
||||
|
||||
@@ -31,7 +31,7 @@ pub use self::error::Error;
|
||||
pub use self::evm_test_client::{EvmTestClient, EvmTestError, TransactResult};
|
||||
pub use self::io_message::ClientIoMessage;
|
||||
pub use self::test_client::{TestBlockChainClient, EachBlockWith};
|
||||
pub use self::chain_notify::{ChainNotify, ChainMessageType};
|
||||
pub use self::chain_notify::{ChainNotify, ChainRoute, ChainRouteType, ChainMessageType};
|
||||
pub use self::traits::{
|
||||
Nonce, Balance, ChainInfo, BlockInfo, ReopenBlock, PrepareOpenBlock, CallContract, TransactionInfo, RegistryInfo, ScheduleInfo, ImportSealedBlock, BroadcastProposalBlock, ImportBlock,
|
||||
StateOrBlock, StateClient, Call, EngineInfo, AccountData, BlockChain, BlockProducer, SealedBlockImporter
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
//! Watcher for snapshot-related chain events.
|
||||
|
||||
use parking_lot::Mutex;
|
||||
use client::{BlockInfo, Client, ChainNotify, ClientIoMessage};
|
||||
use client::{BlockInfo, Client, ChainNotify, ChainRoute, ClientIoMessage};
|
||||
use ids::BlockId;
|
||||
|
||||
use io::IoChannel;
|
||||
@@ -103,8 +103,7 @@ impl ChainNotify for Watcher {
|
||||
&self,
|
||||
imported: Vec<H256>,
|
||||
_: Vec<H256>,
|
||||
_: Vec<H256>,
|
||||
_: Vec<H256>,
|
||||
_: ChainRoute,
|
||||
_: Vec<H256>,
|
||||
_: Vec<Bytes>,
|
||||
_duration: Duration)
|
||||
@@ -131,7 +130,7 @@ impl ChainNotify for Watcher {
|
||||
mod tests {
|
||||
use super::{Broadcast, Oracle, Watcher};
|
||||
|
||||
use client::ChainNotify;
|
||||
use client::{ChainNotify, ChainRoute};
|
||||
|
||||
use ethereum_types::{H256, U256};
|
||||
|
||||
@@ -174,8 +173,7 @@ mod tests {
|
||||
watcher.new_blocks(
|
||||
hashes,
|
||||
vec![],
|
||||
vec![],
|
||||
vec![],
|
||||
ChainRoute::default(),
|
||||
vec![],
|
||||
vec![],
|
||||
DURATION_ZERO,
|
||||
|
||||
Reference in New Issue
Block a user