queue culling and informant
This commit is contained in:
		
							parent
							
								
									3708b3be63
								
							
						
					
					
						commit
						a78068cbe9
					
				
							
								
								
									
										1
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										1
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							@ -27,6 +27,7 @@ dependencies = [
 | 
				
			|||||||
 "ethsync 1.7.0",
 | 
					 "ethsync 1.7.0",
 | 
				
			||||||
 "evmbin 0.1.0",
 | 
					 "evmbin 0.1.0",
 | 
				
			||||||
 "fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
 | 
					 "fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
 | 
				
			||||||
 | 
					 "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
 | 
				
			||||||
 "hyper 0.10.0-a.0 (git+https://github.com/paritytech/hyper)",
 | 
					 "hyper 0.10.0-a.0 (git+https://github.com/paritytech/hyper)",
 | 
				
			||||||
 "isatty 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
 | 
					 "isatty 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
 | 
				
			||||||
 "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)",
 | 
					 "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)",
 | 
				
			||||||
 | 
				
			|||||||
@ -23,6 +23,7 @@ toml = "0.2"
 | 
				
			|||||||
serde = "0.9"
 | 
					serde = "0.9"
 | 
				
			||||||
serde_json = "0.9"
 | 
					serde_json = "0.9"
 | 
				
			||||||
app_dirs = "1.1.1"
 | 
					app_dirs = "1.1.1"
 | 
				
			||||||
 | 
					futures = "0.1"
 | 
				
			||||||
fdlimit = "0.1"
 | 
					fdlimit = "0.1"
 | 
				
			||||||
ws2_32-sys = "0.2"
 | 
					ws2_32-sys = "0.2"
 | 
				
			||||||
hyper = { default-features = false, git = "https://github.com/paritytech/hyper" }
 | 
					hyper = { default-features = false, git = "https://github.com/paritytech/hyper" }
 | 
				
			||||||
 | 
				
			|||||||
@ -50,7 +50,7 @@ impl fmt::Display for Error {
 | 
				
			|||||||
/// Light client service.
 | 
					/// Light client service.
 | 
				
			||||||
pub struct Service {
 | 
					pub struct Service {
 | 
				
			||||||
	client: Arc<Client>,
 | 
						client: Arc<Client>,
 | 
				
			||||||
	_io_service: IoService<ClientIoMessage>,
 | 
						io_service: IoService<ClientIoMessage>,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl Service {
 | 
					impl Service {
 | 
				
			||||||
@ -82,10 +82,15 @@ impl Service {
 | 
				
			|||||||
		io_service.register_handler(Arc::new(ImportBlocks(client.clone()))).map_err(Error::Io)?;
 | 
							io_service.register_handler(Arc::new(ImportBlocks(client.clone()))).map_err(Error::Io)?;
 | 
				
			||||||
		Ok(Service {
 | 
							Ok(Service {
 | 
				
			||||||
			client: client,
 | 
								client: client,
 | 
				
			||||||
			_io_service: io_service,
 | 
								io_service: io_service,
 | 
				
			||||||
		})
 | 
							})
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						/// Register an I/O handler on the service.
 | 
				
			||||||
 | 
						pub fn register_handler(&self, handler: Arc<IoHandler<ClientIoMessage> + Send>) -> Result<(), IoError> {
 | 
				
			||||||
 | 
							self.io_service.register_handler(handler)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/// Get a handle to the client.
 | 
						/// Get a handle to the client.
 | 
				
			||||||
	pub fn client(&self) -> &Arc<Client> {
 | 
						pub fn client(&self) -> &Arc<Client> {
 | 
				
			||||||
		&self.client
 | 
							&self.client
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										21
									
								
								parity/light_helpers/mod.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										21
									
								
								parity/light_helpers/mod.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,21 @@
 | 
				
			|||||||
 | 
					// Copyright 2015-2017 Parity Technologies (UK) Ltd.
 | 
				
			||||||
 | 
					// This file is part of Parity.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Parity is free software: you can redistribute it and/or modify
 | 
				
			||||||
 | 
					// it under the terms of the GNU General Public License as published by
 | 
				
			||||||
 | 
					// the Free Software Foundation, either version 3 of the License, or
 | 
				
			||||||
 | 
					// (at your option) any later version.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Parity is distributed in the hope that it will be useful,
 | 
				
			||||||
 | 
					// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
				
			||||||
 | 
					// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
				
			||||||
 | 
					// GNU General Public License for more details.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// You should have received a copy of the GNU General Public License
 | 
				
			||||||
 | 
					// along with Parity.  If not, see <http://www.gnu.org/licenses/>.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					//! Utilities and helpers for the light client.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					mod queue_cull;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pub use self::queue_cull::QueueCull;
 | 
				
			||||||
							
								
								
									
										99
									
								
								parity/light_helpers/queue_cull.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										99
									
								
								parity/light_helpers/queue_cull.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,99 @@
 | 
				
			|||||||
 | 
					// Copyright 2015-2017 Parity Technologies (UK) Ltd.
 | 
				
			||||||
 | 
					// This file is part of Parity.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Parity is free software: you can redistribute it and/or modify
 | 
				
			||||||
 | 
					// it under the terms of the GNU General Public License as published by
 | 
				
			||||||
 | 
					// the Free Software Foundation, either version 3 of the License, or
 | 
				
			||||||
 | 
					// (at your option) any later version.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Parity is distributed in the hope that it will be useful,
 | 
				
			||||||
 | 
					// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
				
			||||||
 | 
					// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
				
			||||||
 | 
					// GNU General Public License for more details.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// You should have received a copy of the GNU General Public License
 | 
				
			||||||
 | 
					// along with Parity.  If not, see <http://www.gnu.org/licenses/>.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					//! Service for culling the light client's transaction queue.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use std::sync::Arc;
 | 
				
			||||||
 | 
					use std::time::Duration;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use ethcore::service::ClientIoMessage;
 | 
				
			||||||
 | 
					use ethsync::LightSync;
 | 
				
			||||||
 | 
					use io::{IoContext, IoHandler, TimerToken};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use light::client::Client;
 | 
				
			||||||
 | 
					use light::on_demand::{request, OnDemand};
 | 
				
			||||||
 | 
					use light::TransactionQueue;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use futures::{future, stream, Future, Stream};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use parity_reactor::Remote;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use util::RwLock;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Attepmt to cull once every 10 minutes.
 | 
				
			||||||
 | 
					const TOKEN: TimerToken = 1;
 | 
				
			||||||
 | 
					const TIMEOUT_MS: u64 = 1000 * 60 * 10;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// But make each attempt last only 9 minutes
 | 
				
			||||||
 | 
					const PURGE_TIMEOUT_MS: u64 = 1000 * 60 * 9;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/// Periodically culls the transaction queue of mined transactions.
 | 
				
			||||||
 | 
					pub struct QueueCull {
 | 
				
			||||||
 | 
						/// A handle to the client, for getting the latest block header.
 | 
				
			||||||
 | 
						pub client: Arc<Client>,
 | 
				
			||||||
 | 
						/// A handle to the sync service.
 | 
				
			||||||
 | 
						pub sync: Arc<LightSync>,
 | 
				
			||||||
 | 
						/// The on-demand request service.
 | 
				
			||||||
 | 
						pub on_demand: Arc<OnDemand>,
 | 
				
			||||||
 | 
						/// The transaction queue.
 | 
				
			||||||
 | 
						pub txq: Arc<RwLock<TransactionQueue>>,
 | 
				
			||||||
 | 
						/// Event loop remote.
 | 
				
			||||||
 | 
						pub remote: Remote,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl IoHandler<ClientIoMessage> for QueueCull {
 | 
				
			||||||
 | 
						fn initialize(&self, io: &IoContext<ClientIoMessage>) {
 | 
				
			||||||
 | 
							io.register_timer(TOKEN, TIMEOUT_MS).expect("Error registering timer");
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
 | 
				
			||||||
 | 
							if timer != TOKEN { return }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							let senders = self.txq.read().queued_senders();
 | 
				
			||||||
 | 
							if senders.is_empty() { return }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							let (sync, on_demand, txq) = (self.sync.clone(), self.on_demand.clone(), self.txq.clone());
 | 
				
			||||||
 | 
							let best_header = self.client.best_block_header();
 | 
				
			||||||
 | 
							let start_nonce = self.client.engine().account_start_nonce();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							info!(target: "cull", "Attempting to cull queued transactions from {} senders.", senders.len());
 | 
				
			||||||
 | 
							self.remote.spawn_with_timeout(move || {
 | 
				
			||||||
 | 
								let maybe_fetching = sync.with_context(move |ctx| {
 | 
				
			||||||
 | 
									// fetch the nonce of each sender in the queue.
 | 
				
			||||||
 | 
									let nonce_futures = senders.iter()
 | 
				
			||||||
 | 
										.map(|&address| request::Account { header: best_header.clone(), address: address })
 | 
				
			||||||
 | 
										.map(|request| on_demand.account(ctx, request))
 | 
				
			||||||
 | 
										.map(move |fut| fut.map(move |x| x.map(|acc| acc.nonce).unwrap_or(start_nonce)))
 | 
				
			||||||
 | 
										.zip(senders.iter())
 | 
				
			||||||
 | 
										.map(|(fut, &addr)| fut.map(move |nonce| (addr, nonce)));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									// as they come in, update each sender to the new nonce.
 | 
				
			||||||
 | 
									stream::futures_unordered(nonce_futures)
 | 
				
			||||||
 | 
										.fold(txq, |txq, (address, nonce)| {
 | 
				
			||||||
 | 
											txq.write().cull(address, nonce);
 | 
				
			||||||
 | 
											future::ok(txq)
 | 
				
			||||||
 | 
										})
 | 
				
			||||||
 | 
										.map(|_| ()) // finally, discard the txq handle and log errors.
 | 
				
			||||||
 | 
										.map_err(|_| debug!(target: "cull", "OnDemand prematurely closed channel."))
 | 
				
			||||||
 | 
								});
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								match maybe_fetching {
 | 
				
			||||||
 | 
									Some(fut) => fut.boxed(),
 | 
				
			||||||
 | 
									None => future::ok(()).boxed(),
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}, Duration::from_millis(PURGE_TIMEOUT_MS), || {})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -28,6 +28,7 @@ extern crate ctrlc;
 | 
				
			|||||||
extern crate docopt;
 | 
					extern crate docopt;
 | 
				
			||||||
extern crate env_logger;
 | 
					extern crate env_logger;
 | 
				
			||||||
extern crate fdlimit;
 | 
					extern crate fdlimit;
 | 
				
			||||||
 | 
					extern crate futures;
 | 
				
			||||||
extern crate hyper;
 | 
					extern crate hyper;
 | 
				
			||||||
extern crate isatty;
 | 
					extern crate isatty;
 | 
				
			||||||
extern crate jsonrpc_core;
 | 
					extern crate jsonrpc_core;
 | 
				
			||||||
@ -101,6 +102,7 @@ mod deprecated;
 | 
				
			|||||||
mod dir;
 | 
					mod dir;
 | 
				
			||||||
mod helpers;
 | 
					mod helpers;
 | 
				
			||||||
mod informant;
 | 
					mod informant;
 | 
				
			||||||
 | 
					mod light_helpers;
 | 
				
			||||||
mod migration;
 | 
					mod migration;
 | 
				
			||||||
mod modules;
 | 
					mod modules;
 | 
				
			||||||
mod params;
 | 
					mod params;
 | 
				
			||||||
 | 
				
			|||||||
@ -238,12 +238,24 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
 | 
				
			|||||||
	};
 | 
						};
 | 
				
			||||||
	let light_sync = LightSync::new(sync_params).map_err(|e| format!("Error starting network: {}", e))?;
 | 
						let light_sync = LightSync::new(sync_params).map_err(|e| format!("Error starting network: {}", e))?;
 | 
				
			||||||
	let light_sync = Arc::new(light_sync);
 | 
						let light_sync = Arc::new(light_sync);
 | 
				
			||||||
	light_sync.start_network();
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// start RPCs.
 | 
					 | 
				
			||||||
	// spin up event loop
 | 
						// spin up event loop
 | 
				
			||||||
	let event_loop = EventLoop::spawn();
 | 
						let event_loop = EventLoop::spawn();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// queue cull service.
 | 
				
			||||||
 | 
						let queue_cull = Arc::new(::light_helpers::QueueCull {
 | 
				
			||||||
 | 
							client: service.client().clone(),
 | 
				
			||||||
 | 
							sync: light_sync.clone(),
 | 
				
			||||||
 | 
							on_demand: on_demand.clone(),
 | 
				
			||||||
 | 
							txq: txq.clone(),
 | 
				
			||||||
 | 
							remote: event_loop.remote(),
 | 
				
			||||||
 | 
						});
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						service.register_handler(queue_cull).map_err(|e| format!("Error attaching service: {:?}", e))?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// start the network.
 | 
				
			||||||
 | 
						light_sync.start_network();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// fetch service
 | 
						// fetch service
 | 
				
			||||||
	let fetch = FetchClient::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?;
 | 
						let fetch = FetchClient::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?;
 | 
				
			||||||
	let passwords = passwords_from_files(&cmd.acc_conf.password_files)?;
 | 
						let passwords = passwords_from_files(&cmd.acc_conf.password_files)?;
 | 
				
			||||||
@ -253,6 +265,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
 | 
				
			|||||||
	let rpc_stats = Arc::new(informant::RpcStats::default());
 | 
						let rpc_stats = Arc::new(informant::RpcStats::default());
 | 
				
			||||||
	let signer_path = cmd.signer_conf.signer_path.clone();
 | 
						let signer_path = cmd.signer_conf.signer_path.clone();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// start RPCs
 | 
				
			||||||
	let deps_for_rpc_apis = Arc::new(rpc_apis::LightDependencies {
 | 
						let deps_for_rpc_apis = Arc::new(rpc_apis::LightDependencies {
 | 
				
			||||||
		signer_service: Arc::new(rpc_apis::SignerService::new(move || {
 | 
							signer_service: Arc::new(rpc_apis::SignerService::new(move || {
 | 
				
			||||||
			signer::generate_new_token(signer_path.clone()).map_err(|e| format!("{:?}", e))
 | 
								signer::generate_new_token(signer_path.clone()).map_err(|e| format!("{:?}", e))
 | 
				
			||||||
@ -299,6 +312,14 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// TODO: Dapps
 | 
						// TODO: Dapps
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// minimal informant thread. Just prints block number every 5 seconds.
 | 
				
			||||||
 | 
						// TODO: integrate with informant.rs
 | 
				
			||||||
 | 
						let informant_client = service.client().clone();
 | 
				
			||||||
 | 
						::std::thread::spawn(move || loop {
 | 
				
			||||||
 | 
							info!("#{}", informant_client.best_block_header().number());
 | 
				
			||||||
 | 
							::std::thread::sleep(::std::time::Duration::from_secs(5));
 | 
				
			||||||
 | 
						});
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// wait for ctrl-c.
 | 
						// wait for ctrl-c.
 | 
				
			||||||
	Ok(wait_for_exit(panic_handler, None, None, can_restart))
 | 
						Ok(wait_for_exit(panic_handler, None, None, can_restart))
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
		Reference in New Issue
	
	Block a user