From 10e1787ad1aa589912991c90d65f356a097d98a7 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 4 Apr 2019 10:59:07 +0200 Subject: [PATCH] fix(light cull): poll light cull instead of timer (#10559) * fix(light cull): poll light cull instead of timer * fix(grumbles): remove error + updated docs * fix(on-demand request): `expect()` reason * docs(remove misleading info) --- Cargo.lock | 2 +- parity/light_helpers/mod.rs | 2 - parity/light_helpers/queue_cull.rs | 105 ----------------------------- parity/run.rs | 11 --- rpc/src/v1/helpers/light_fetch.rs | 44 +++++++++++- rpc/src/v1/impls/light/eth.rs | 15 +++-- 6 files changed, 54 insertions(+), 125 deletions(-) delete mode 100644 parity/light_helpers/queue_cull.rs diff --git a/Cargo.lock b/Cargo.lock index 6dff31936..a56d1b663 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1152,8 +1152,8 @@ dependencies = [ "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "macros 0.1.0", "parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "parity-runtime 0.1.0", + "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "rlp 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/parity/light_helpers/mod.rs b/parity/light_helpers/mod.rs index 9a9bbf2cd..843dd419d 100644 --- a/parity/light_helpers/mod.rs +++ b/parity/light_helpers/mod.rs @@ -17,7 +17,5 @@ //! Utilities and helpers for the light client. mod epoch_fetch; -mod queue_cull; pub use self::epoch_fetch::EpochFetch; -pub use self::queue_cull::QueueCull; diff --git a/parity/light_helpers/queue_cull.rs b/parity/light_helpers/queue_cull.rs deleted file mode 100644 index 693d8f93c..000000000 --- a/parity/light_helpers/queue_cull.rs +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright 2015-2019 Parity Technologies (UK) Ltd. -// This file is part of Parity Ethereum. - -// Parity Ethereum is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Ethereum is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Ethereum. If not, see . - -//! Service for culling the light client's transaction queue. - -use std::sync::Arc; -use std::time::Duration; - -use ethcore::client::ClientIoMessage; -use sync::{LightSync, LightNetworkDispatcher}; -use io::{IoContext, IoHandler, TimerToken}; - -use light::client::LightChainClient; -use light::on_demand::{request, OnDemand, OnDemandRequester}; -use light::TransactionQueue; - -use futures::{future, Future}; - -use parity_runtime::Executor; - -use parking_lot::RwLock; - -// Attepmt to cull once every 10 minutes. -const TOKEN: TimerToken = 1; -const TIMEOUT: Duration = Duration::from_secs(60 * 10); - -// But make each attempt last only 9 minutes -const PURGE_TIMEOUT: Duration = Duration::from_secs(60 * 9); - -/// Periodically culls the transaction queue of mined transactions. -pub struct QueueCull { - /// A handle to the client, for getting the latest block header. - pub client: Arc, - /// A handle to the sync service. - pub sync: Arc, - /// The on-demand request service. - pub on_demand: Arc, - /// The transaction queue. - pub txq: Arc>, - /// Event loop executor. - pub executor: Executor, -} - -impl IoHandler for QueueCull { - fn initialize(&self, io: &IoContext) { - io.register_timer(TOKEN, TIMEOUT).expect("Error registering timer"); - } - - fn timeout(&self, _io: &IoContext, timer: TimerToken) { - if timer != TOKEN { return } - - let senders = self.txq.read().queued_senders(); - if senders.is_empty() { return } - - let (sync, on_demand, txq) = (self.sync.clone(), self.on_demand.clone(), self.txq.clone()); - let best_header = self.client.best_block_header(); - let start_nonce = self.client.engine().account_start_nonce(best_header.number()); - - info!(target: "cull", "Attempting to cull queued transactions from {} senders.", senders.len()); - self.executor.spawn_with_timeout(move || { - let maybe_fetching = sync.with_context(move |ctx| { - // fetch the nonce of each sender in the queue. - let nonce_reqs = senders.iter() - .map(|&address| request::Account { header: best_header.clone().into(), address: address }) - .collect::>(); - - // when they come in, update each sender to the new nonce. - on_demand.request(ctx, nonce_reqs) - .expect("No back-references; therefore all back-references are valid; qed") - .map(move |accs| { - let txq = txq.write(); - let _ = accs.into_iter() - .map(|maybe_acc| maybe_acc.map_or(start_nonce, |acc| acc.nonce)) - .zip(senders) - .fold(txq, |mut txq, (nonce, addr)| { - txq.cull(addr, nonce); - txq - }); - }) - .map_err(|_| debug!(target: "cull", "OnDemand prematurely closed channel.")) - }); - - match maybe_fetching { - Some(fut) => future::Either::A(fut), - None => { - debug!(target: "cull", "Unable to acquire network context; qed"); - future::Either::B(future::ok(())) - }, - } - }, PURGE_TIMEOUT, || {}) - } -} diff --git a/parity/run.rs b/parity/run.rs index 8160c930d..b6f8c53a0 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -295,17 +295,6 @@ fn execute_light_impl(cmd: RunCmd, logger: Arc, on_client_rq // spin up event loop let runtime = Runtime::with_default_thread_count(); - // queue cull service. - let queue_cull = Arc::new(::light_helpers::QueueCull { - client: client.clone(), - sync: light_sync.clone(), - on_demand: on_demand.clone(), - txq: txq.clone(), - executor: runtime.executor(), - }); - - service.register_handler(queue_cull).map_err(|e| format!("Error attaching service: {:?}", e))?; - // start the network. light_sync.start_network(); diff --git a/rpc/src/v1/helpers/light_fetch.rs b/rpc/src/v1/helpers/light_fetch.rs index 540772a5d..92b7f2ac0 100644 --- a/rpc/src/v1/helpers/light_fetch.rs +++ b/rpc/src/v1/helpers/light_fetch.rs @@ -16,8 +16,9 @@ //! Helpers for fetching blockchain data either from the light client or the network. -use std::cmp; use std::clone::Clone; +use std::cmp; +use std::collections::BTreeMap; use std::sync::Arc; use types::basic_account::BasicAccount; @@ -48,7 +49,6 @@ use ethereum_types::{Address, U256}; use hash::H256; use parking_lot::{Mutex, RwLock}; use fastmap::H256FastMap; -use std::collections::BTreeMap; use types::transaction::{Action, Transaction as EthTransaction, PendingTransaction, SignedTransaction, LocalizedTransaction}; use v1::helpers::{CallRequest as CallRequestHelper, errors, dispatch}; @@ -522,6 +522,46 @@ where })) } + /// Helper to cull the `light` transaction queue of mined transactions + pub fn light_cull(&self, txq: Arc>) -> impl Future + Send { + let senders = txq.read().queued_senders(); + if senders.is_empty() { + return Either::B(future::err(errors::internal("No pending local transactions", ""))); + } + + let sync = self.sync.clone(); + let on_demand = self.on_demand.clone(); + let best_header = self.client.best_block_header(); + let start_nonce = self.client.engine().account_start_nonce(best_header.number()); + + let account_request = sync.with_context(move |ctx| { + // fetch the nonce of each sender in the queue. + let nonce_reqs = senders.iter() + .map(|&address| request::Account { header: best_header.clone().into(), address }) + .collect::>(); + + // when they come in, update each sender to the new nonce. + on_demand.request(ctx, nonce_reqs) + .expect(NO_INVALID_BACK_REFS_PROOF) + .map(move |accs| { + let mut txq = txq.write(); + accs.into_iter() + .map(|maybe_acc| maybe_acc.map_or(start_nonce, |acc| acc.nonce)) + .zip(senders) + .for_each(|(nonce, addr)| { + txq.cull(addr, nonce); + }); + }) + .map_err(errors::on_demand_error) + }); + + if let Some(fut) = account_request { + Either::A(fut) + } else { + Either::B(future::err(errors::network_disabled())) + } + } + fn send_requests(&self, reqs: Vec, parse_response: F) -> impl Future + Send where F: FnOnce(Vec) -> T + Send + 'static, T: Send + 'static, diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs index 73e2b99c6..6467bfbc7 100644 --- a/rpc/src/v1/impls/light/eth.rs +++ b/rpc/src/v1/impls/light/eth.rs @@ -420,15 +420,22 @@ where } fn transaction_by_hash(&self, hash: H256) -> BoxFuture> { - { - let tx_queue = self.transaction_queue.read(); - if let Some(tx) = tx_queue.get(&hash) { + let in_txqueue = self.transaction_queue.read().get(&hash).is_some(); + + // The transaction is in the `local txqueue` then fetch the latest state from the network and attempt + // to cull the transaction queue. + if in_txqueue { + // Note, this will block (relies on HTTP timeout) to make sure `cull` will finish to avoid having to call + // `eth_getTransactionByHash` more than once to ensure the `txqueue` is up to `date` when it is called + if let Err(e) = self.fetcher().light_cull(self.transaction_queue.clone()).wait() { + debug!(target: "cull", "failed because of: {:?}", e); + } + if let Some(tx) = self.transaction_queue.read().get(&hash) { return Box::new(future::ok(Some(Transaction::from_pending( tx.clone(), )))); } } - Box::new(self.fetcher().transaction_by_hash(hash).map(|x| x.map(|(tx, _)| tx))) }