Fix pubsub new_blocks notifications to include all blocks (#9987)

Fix: new blocks notifications sometimes missing in pubsub RPC

Implement new struct to pass to `new_blocks()` with extra parameter - `has_more_blocks_to_import`, which was previously used to determine whether the notification should be sent. Now it's up to each implementation to decide what to do.

Updated all implementations to behave as before, except `eth_pubsub`, which will send notification even when the queue is not empty.

Update tests.
This commit is contained in:
mattrutherford
2018-12-19 09:24:14 +00:00
committed by GitHub
parent 13b832f959
commit 215602de08
14 changed files with 145 additions and 136 deletions

View File

@@ -18,7 +18,6 @@
use std::sync::{Arc, Weak};
use std::collections::BTreeMap;
use std::time::Duration;
use jsonrpc_core::{BoxFuture, Result, Error};
use jsonrpc_core::futures::{self, Future, IntoFuture};
@@ -34,14 +33,13 @@ use v1::types::{pubsub, RichHeader, Log};
use ethcore::encoded;
use ethcore::filter::Filter as EthFilter;
use ethcore::client::{BlockChainClient, ChainNotify, ChainRoute, ChainRouteType, BlockId};
use ethcore::client::{BlockChainClient, ChainNotify, NewBlocks, ChainRouteType, BlockId};
use sync::LightSync;
use light::cache::Cache;
use light::on_demand::OnDemand;
use light::client::{LightChainClient, LightChainNotify};
use parity_runtime::Executor;
use ethereum_types::H256;
use bytes::Bytes;
use parking_lot::{RwLock, Mutex};
type Client = Sink<pubsub::Result>;
@@ -220,18 +218,10 @@ impl<C: LightClient> LightChainNotify for ChainNotificationHandler<C> {
}
impl<C: BlockChainClient> ChainNotify for ChainNotificationHandler<C> {
fn new_blocks(
&self,
_imported: Vec<H256>,
_invalid: Vec<H256>,
route: ChainRoute,
_sealed: Vec<H256>,
// Block bytes.
_proposed: Vec<Bytes>,
_duration: Duration,
) {
fn new_blocks(&self, new_blocks: NewBlocks) {
if self.heads_subscribers.read().is_empty() && self.logs_subscribers.read().is_empty() { return }
const EXTRA_INFO_PROOF: &'static str = "Object exists in in blockchain (fetched earlier), extra_info is always available if object exists; qed";
let headers = route.route()
let headers = new_blocks.route.route()
.iter()
.filter_map(|&(hash, ref typ)| {
match typ {
@@ -249,7 +239,7 @@ impl<C: BlockChainClient> ChainNotify for ChainNotificationHandler<C> {
self.notify_heads(&headers);
// We notify logs enacting and retracting as the order in route.
self.notify_logs(route.route(), |filter, ex| {
self.notify_logs(new_blocks.route.route(), |filter, ex| {
match ex {
&ChainRouteType::Enacted =>
Ok(self.client.logs(filter).unwrap_or_default().into_iter().map(Into::into).collect()),

View File

@@ -24,7 +24,7 @@ use std::time::Duration;
use v1::{EthPubSub, EthPubSubClient, Metadata};
use ethcore::client::{TestBlockChainClient, EachBlockWith, ChainNotify, ChainRoute, ChainRouteType};
use ethcore::client::{TestBlockChainClient, EachBlockWith, ChainNotify, NewBlocks, ChainRoute, ChainRouteType};
use parity_runtime::Runtime;
const DURATION_ZERO: Duration = Duration::from_millis(0);
@@ -57,13 +57,13 @@ fn should_subscribe_to_new_heads() {
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
// Check notifications
handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO);
handler.new_blocks(NewBlocks::new(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO, true));
let (res, receiver) = receiver.into_future().wait().unwrap();
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x1","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x1","parentHash":"0x0cd786a2425d16f152c658316c423e6ce1181e15c3295826d7c9904cba9ce303","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":"0x416d77337e24399d"}}"#;
assert_eq!(res, Some(response.into()));
// Notify about two blocks
handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h2, ChainRouteType::Enacted), (h3, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO);
handler.new_blocks(NewBlocks::new(vec![], vec![], ChainRoute::new(vec![(h2, ChainRouteType::Enacted), (h3, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO, true));
// Receive both
let (res, receiver) = receiver.into_future().wait().unwrap();
@@ -129,7 +129,7 @@ fn should_subscribe_to_logs() {
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
// Check notifications (enacted)
handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO);
handler.new_blocks(NewBlocks::new(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO, false));
let (res, receiver) = receiver.into_future().wait().unwrap();
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","removed":false,"topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned()
+ &format!("0x{:x}", tx_hash)
@@ -137,7 +137,7 @@ fn should_subscribe_to_logs() {
assert_eq!(res, Some(response.into()));
// Check notifications (retracted)
handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Retracted)]), vec![], vec![], DURATION_ZERO);
handler.new_blocks(NewBlocks::new(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Retracted)]), vec![], vec![], DURATION_ZERO, false));
let (res, receiver) = receiver.into_future().wait().unwrap();
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","removed":true,"topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned()
+ &format!("0x{:x}", tx_hash)