// Copyright 2015-2017 Parity Technologies (UK) Ltd. // This file is part of Parity. // Parity is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity. If not, see . //! Service for culling the light client's transaction queue. use std::sync::Arc; use std::time::Duration; use ethcore::service::ClientIoMessage; use ethsync::LightSync; use io::{IoContext, IoHandler, TimerToken}; use light::client::Client; use light::on_demand::{request, OnDemand}; use light::TransactionQueue; use futures::{future, stream, Future, Stream}; use parity_reactor::Remote; use util::RwLock; // Attepmt to cull once every 10 minutes. const TOKEN: TimerToken = 1; const TIMEOUT_MS: u64 = 1000 * 60 * 10; // But make each attempt last only 9 minutes const PURGE_TIMEOUT_MS: u64 = 1000 * 60 * 9; /// Periodically culls the transaction queue of mined transactions. pub struct QueueCull { /// A handle to the client, for getting the latest block header. pub client: Arc, /// A handle to the sync service. pub sync: Arc, /// The on-demand request service. pub on_demand: Arc, /// The transaction queue. pub txq: Arc>, /// Event loop remote. pub remote: Remote, } impl IoHandler for QueueCull { fn initialize(&self, io: &IoContext) { io.register_timer(TOKEN, TIMEOUT_MS).expect("Error registering timer"); } fn timeout(&self, _io: &IoContext, timer: TimerToken) { if timer != TOKEN { return } let senders = self.txq.read().queued_senders(); if senders.is_empty() { return } let (sync, on_demand, txq) = (self.sync.clone(), self.on_demand.clone(), self.txq.clone()); let best_header = self.client.best_block_header(); let start_nonce = self.client.engine().account_start_nonce(); info!(target: "cull", "Attempting to cull queued transactions from {} senders.", senders.len()); self.remote.spawn_with_timeout(move || { let maybe_fetching = sync.with_context(move |ctx| { // fetch the nonce of each sender in the queue. let nonce_futures = senders.iter() .map(|&address| request::Account { header: best_header.clone(), address: address }) .map(move |request| { on_demand.account(ctx, request) .map(move |maybe_acc| maybe_acc.map_or(start_nonce, |acc| acc.nonce)) }) .zip(senders.iter()) .map(|(fut, &addr)| fut.map(move |nonce| (addr, nonce))); // as they come in, update each sender to the new nonce. stream::futures_unordered(nonce_futures) .fold(txq, |txq, (address, nonce)| { txq.write().cull(address, nonce); future::ok(txq) }) .map(|_| ()) // finally, discard the txq handle and log errors. .map_err(|_| debug!(target: "cull", "OnDemand prematurely closed channel.")) }); match maybe_fetching { Some(fut) => fut.boxed(), None => future::ok(()).boxed(), } }, Duration::from_millis(PURGE_TIMEOUT_MS), || {}) } }