From a78068cbe966f729b85c1ad0cdedac6ec516a912 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 23 Mar 2017 22:20:00 +0100 Subject: [PATCH] queue culling and informant --- Cargo.lock | 1 + Cargo.toml | 1 + ethcore/light/src/client/service.rs | 9 ++- parity/light_helpers/mod.rs | 21 ++++++ parity/light_helpers/queue_cull.rs | 99 +++++++++++++++++++++++++++++ parity/main.rs | 2 + parity/run.rs | 25 +++++++- 7 files changed, 154 insertions(+), 4 deletions(-) create mode 100644 parity/light_helpers/mod.rs create mode 100644 parity/light_helpers/queue_cull.rs diff --git a/Cargo.lock b/Cargo.lock index d572dcf79..7e0302e14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,6 +27,7 @@ dependencies = [ "ethsync 1.7.0", "evmbin 0.1.0", "fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.10.0-a.0 (git+https://github.com/paritytech/hyper)", "isatty 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", diff --git a/Cargo.toml b/Cargo.toml index 8420c5459..66c8674df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ toml = "0.2" serde = "0.9" serde_json = "0.9" app_dirs = "1.1.1" +futures = "0.1" fdlimit = "0.1" ws2_32-sys = "0.2" hyper = { default-features = false, git = "https://github.com/paritytech/hyper" } diff --git a/ethcore/light/src/client/service.rs b/ethcore/light/src/client/service.rs index 89538fec2..55795d870 100644 --- a/ethcore/light/src/client/service.rs +++ b/ethcore/light/src/client/service.rs @@ -50,7 +50,7 @@ impl fmt::Display for Error { /// Light client service. pub struct Service { client: Arc, - _io_service: IoService, + io_service: IoService, } impl Service { @@ -82,10 +82,15 @@ impl Service { io_service.register_handler(Arc::new(ImportBlocks(client.clone()))).map_err(Error::Io)?; Ok(Service { client: client, - _io_service: io_service, + io_service: io_service, }) } + /// Register an I/O handler on the service. + pub fn register_handler(&self, handler: Arc + Send>) -> Result<(), IoError> { + self.io_service.register_handler(handler) + } + /// Get a handle to the client. pub fn client(&self) -> &Arc { &self.client diff --git a/parity/light_helpers/mod.rs b/parity/light_helpers/mod.rs new file mode 100644 index 000000000..488f970c2 --- /dev/null +++ b/parity/light_helpers/mod.rs @@ -0,0 +1,21 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Utilities and helpers for the light client. + +mod queue_cull; + +pub use self::queue_cull::QueueCull; diff --git a/parity/light_helpers/queue_cull.rs b/parity/light_helpers/queue_cull.rs new file mode 100644 index 000000000..10865d485 --- /dev/null +++ b/parity/light_helpers/queue_cull.rs @@ -0,0 +1,99 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Service for culling the light client's transaction queue. + +use std::sync::Arc; +use std::time::Duration; + +use ethcore::service::ClientIoMessage; +use ethsync::LightSync; +use io::{IoContext, IoHandler, TimerToken}; + +use light::client::Client; +use light::on_demand::{request, OnDemand}; +use light::TransactionQueue; + +use futures::{future, stream, Future, Stream}; + +use parity_reactor::Remote; + +use util::RwLock; + +// Attepmt to cull once every 10 minutes. +const TOKEN: TimerToken = 1; +const TIMEOUT_MS: u64 = 1000 * 60 * 10; + +// But make each attempt last only 9 minutes +const PURGE_TIMEOUT_MS: u64 = 1000 * 60 * 9; + +/// Periodically culls the transaction queue of mined transactions. +pub struct QueueCull { + /// A handle to the client, for getting the latest block header. + pub client: Arc, + /// A handle to the sync service. + pub sync: Arc, + /// The on-demand request service. + pub on_demand: Arc, + /// The transaction queue. + pub txq: Arc>, + /// Event loop remote. + pub remote: Remote, +} + +impl IoHandler for QueueCull { + fn initialize(&self, io: &IoContext) { + io.register_timer(TOKEN, TIMEOUT_MS).expect("Error registering timer"); + } + + fn timeout(&self, _io: &IoContext, timer: TimerToken) { + if timer != TOKEN { return } + + let senders = self.txq.read().queued_senders(); + if senders.is_empty() { return } + + let (sync, on_demand, txq) = (self.sync.clone(), self.on_demand.clone(), self.txq.clone()); + let best_header = self.client.best_block_header(); + let start_nonce = self.client.engine().account_start_nonce(); + + info!(target: "cull", "Attempting to cull queued transactions from {} senders.", senders.len()); + self.remote.spawn_with_timeout(move || { + let maybe_fetching = sync.with_context(move |ctx| { + // fetch the nonce of each sender in the queue. + let nonce_futures = senders.iter() + .map(|&address| request::Account { header: best_header.clone(), address: address }) + .map(|request| on_demand.account(ctx, request)) + .map(move |fut| fut.map(move |x| x.map(|acc| acc.nonce).unwrap_or(start_nonce))) + .zip(senders.iter()) + .map(|(fut, &addr)| fut.map(move |nonce| (addr, nonce))); + + // as they come in, update each sender to the new nonce. + stream::futures_unordered(nonce_futures) + .fold(txq, |txq, (address, nonce)| { + txq.write().cull(address, nonce); + future::ok(txq) + }) + .map(|_| ()) // finally, discard the txq handle and log errors. + .map_err(|_| debug!(target: "cull", "OnDemand prematurely closed channel.")) + }); + + match maybe_fetching { + Some(fut) => fut.boxed(), + None => future::ok(()).boxed(), + } + }, Duration::from_millis(PURGE_TIMEOUT_MS), || {}) + } +} diff --git a/parity/main.rs b/parity/main.rs index 2044b3ee0..cde0e6c1f 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -28,6 +28,7 @@ extern crate ctrlc; extern crate docopt; extern crate env_logger; extern crate fdlimit; +extern crate futures; extern crate hyper; extern crate isatty; extern crate jsonrpc_core; @@ -101,6 +102,7 @@ mod deprecated; mod dir; mod helpers; mod informant; +mod light_helpers; mod migration; mod modules; mod params; diff --git a/parity/run.rs b/parity/run.rs index b3d9bf90b..cf7d5e82c 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -238,12 +238,24 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> }; let light_sync = LightSync::new(sync_params).map_err(|e| format!("Error starting network: {}", e))?; let light_sync = Arc::new(light_sync); - light_sync.start_network(); - // start RPCs. // spin up event loop let event_loop = EventLoop::spawn(); + // queue cull service. + let queue_cull = Arc::new(::light_helpers::QueueCull { + client: service.client().clone(), + sync: light_sync.clone(), + on_demand: on_demand.clone(), + txq: txq.clone(), + remote: event_loop.remote(), + }); + + service.register_handler(queue_cull).map_err(|e| format!("Error attaching service: {:?}", e))?; + + // start the network. + light_sync.start_network(); + // fetch service let fetch = FetchClient::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?; let passwords = passwords_from_files(&cmd.acc_conf.password_files)?; @@ -253,6 +265,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> let rpc_stats = Arc::new(informant::RpcStats::default()); let signer_path = cmd.signer_conf.signer_path.clone(); + // start RPCs let deps_for_rpc_apis = Arc::new(rpc_apis::LightDependencies { signer_service: Arc::new(rpc_apis::SignerService::new(move || { signer::generate_new_token(signer_path.clone()).map_err(|e| format!("{:?}", e)) @@ -299,6 +312,14 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> // TODO: Dapps + // minimal informant thread. Just prints block number every 5 seconds. + // TODO: integrate with informant.rs + let informant_client = service.client().clone(); + ::std::thread::spawn(move || loop { + info!("#{}", informant_client.best_block_header().number()); + ::std::thread::sleep(::std::time::Duration::from_secs(5)); + }); + // wait for ctrl-c. Ok(wait_for_exit(panic_handler, None, None, can_restart)) }