diff --git a/ethcore/types/src/header.rs b/ethcore/types/src/header.rs index 3dfe6ab83..cfe8f5bb6 100644 --- a/ethcore/types/src/header.rs +++ b/ethcore/types/src/header.rs @@ -16,7 +16,6 @@ //! Block header. -use std::cmp; use hash::{KECCAK_NULL_RLP, KECCAK_EMPTY_LIST_RLP, keccak}; use heapsize::HeapSizeOf; use ethereum_types::{H256, U256, Address, Bloom}; @@ -342,7 +341,7 @@ impl Decodable for Header { number: r.val_at(8)?, gas_limit: r.val_at(9)?, gas_used: r.val_at(10)?, - timestamp: cmp::min(r.val_at::(11)?, u64::max_value().into()).as_u64(), + timestamp: r.val_at(11)?, extra_data: r.val_at(12)?, seal: vec![], hash: keccak(r.as_raw()).into(), @@ -412,4 +411,15 @@ mod tests { assert_eq!(header_rlp, encoded_header); } + + #[test] + fn reject_header_with_large_timestamp() { + // that's rlp of block header created with ethash engine. + // The encoding contains a large timestamp (295147905179352825856) + let header_rlp = "f901f9a0d405da4e66f1445d455195229624e133f5baafe72b5cf7b3c36c12c8146e98b7a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347948888f1f195afa192cfee860698584c030f4c9db1a05fb2b4bfdef7b314451cb138a534d225c922fc0e5fbe25e451142732c3e25c25a088d2ec6b9860aae1a2c3b299f72b6a5d70d7f7ba4722c78f2c49ba96273c2158a007c6fdfa8eea7e86b81f5b0fc0f78f90cc19f4aa60d323151e0cac660199e9a1b90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008302008003832fefba82524d891000000000000000000080a0a0349d8c3df71f1a48a9df7d03fd5f14aeee7d91332c009ecaff0a71ead405bd88ab4e252a7e8c2a23".from_hex().unwrap(); + + // This should fail decoding timestamp + let header: Result = rlp::decode(&header_rlp); + assert_eq!(header.unwrap_err(), rlp::DecoderError::RlpIsTooBig); + } } diff --git a/parity/light_helpers/mod.rs b/parity/light_helpers/mod.rs index 9a9bbf2cd..843dd419d 100644 --- a/parity/light_helpers/mod.rs +++ b/parity/light_helpers/mod.rs @@ -17,7 +17,5 @@ //! Utilities and helpers for the light client. mod epoch_fetch; -mod queue_cull; pub use self::epoch_fetch::EpochFetch; -pub use self::queue_cull::QueueCull; diff --git a/parity/light_helpers/queue_cull.rs b/parity/light_helpers/queue_cull.rs deleted file mode 100644 index 693d8f93c..000000000 --- a/parity/light_helpers/queue_cull.rs +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright 2015-2019 Parity Technologies (UK) Ltd. -// This file is part of Parity Ethereum. - -// Parity Ethereum is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Ethereum is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Ethereum. If not, see . - -//! Service for culling the light client's transaction queue. - -use std::sync::Arc; -use std::time::Duration; - -use ethcore::client::ClientIoMessage; -use sync::{LightSync, LightNetworkDispatcher}; -use io::{IoContext, IoHandler, TimerToken}; - -use light::client::LightChainClient; -use light::on_demand::{request, OnDemand, OnDemandRequester}; -use light::TransactionQueue; - -use futures::{future, Future}; - -use parity_runtime::Executor; - -use parking_lot::RwLock; - -// Attepmt to cull once every 10 minutes. -const TOKEN: TimerToken = 1; -const TIMEOUT: Duration = Duration::from_secs(60 * 10); - -// But make each attempt last only 9 minutes -const PURGE_TIMEOUT: Duration = Duration::from_secs(60 * 9); - -/// Periodically culls the transaction queue of mined transactions. -pub struct QueueCull { - /// A handle to the client, for getting the latest block header. - pub client: Arc, - /// A handle to the sync service. - pub sync: Arc, - /// The on-demand request service. - pub on_demand: Arc, - /// The transaction queue. - pub txq: Arc>, - /// Event loop executor. - pub executor: Executor, -} - -impl IoHandler for QueueCull { - fn initialize(&self, io: &IoContext) { - io.register_timer(TOKEN, TIMEOUT).expect("Error registering timer"); - } - - fn timeout(&self, _io: &IoContext, timer: TimerToken) { - if timer != TOKEN { return } - - let senders = self.txq.read().queued_senders(); - if senders.is_empty() { return } - - let (sync, on_demand, txq) = (self.sync.clone(), self.on_demand.clone(), self.txq.clone()); - let best_header = self.client.best_block_header(); - let start_nonce = self.client.engine().account_start_nonce(best_header.number()); - - info!(target: "cull", "Attempting to cull queued transactions from {} senders.", senders.len()); - self.executor.spawn_with_timeout(move || { - let maybe_fetching = sync.with_context(move |ctx| { - // fetch the nonce of each sender in the queue. - let nonce_reqs = senders.iter() - .map(|&address| request::Account { header: best_header.clone().into(), address: address }) - .collect::>(); - - // when they come in, update each sender to the new nonce. - on_demand.request(ctx, nonce_reqs) - .expect("No back-references; therefore all back-references are valid; qed") - .map(move |accs| { - let txq = txq.write(); - let _ = accs.into_iter() - .map(|maybe_acc| maybe_acc.map_or(start_nonce, |acc| acc.nonce)) - .zip(senders) - .fold(txq, |mut txq, (nonce, addr)| { - txq.cull(addr, nonce); - txq - }); - }) - .map_err(|_| debug!(target: "cull", "OnDemand prematurely closed channel.")) - }); - - match maybe_fetching { - Some(fut) => future::Either::A(fut), - None => { - debug!(target: "cull", "Unable to acquire network context; qed"); - future::Either::B(future::ok(())) - }, - } - }, PURGE_TIMEOUT, || {}) - } -} diff --git a/parity/run.rs b/parity/run.rs index c4a3c7512..ca4ca1af1 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -295,17 +295,6 @@ fn execute_light_impl(cmd: RunCmd, logger: Arc, on_client_rq // spin up event loop let runtime = Runtime::with_default_thread_count(); - // queue cull service. - let queue_cull = Arc::new(::light_helpers::QueueCull { - client: client.clone(), - sync: light_sync.clone(), - on_demand: on_demand.clone(), - txq: txq.clone(), - executor: runtime.executor(), - }); - - service.register_handler(queue_cull).map_err(|e| format!("Error attaching service: {:?}", e))?; - // start the network. light_sync.start_network(); diff --git a/rpc/src/v1/helpers/light_fetch.rs b/rpc/src/v1/helpers/light_fetch.rs index e18695fcb..819f56645 100644 --- a/rpc/src/v1/helpers/light_fetch.rs +++ b/rpc/src/v1/helpers/light_fetch.rs @@ -16,8 +16,9 @@ //! Helpers for fetching blockchain data either from the light client or the network. -use std::cmp; use std::clone::Clone; +use std::cmp; +use std::collections::BTreeMap; use std::sync::Arc; use types::basic_account::BasicAccount; @@ -48,7 +49,6 @@ use ethereum_types::{Address, U256}; use hash::H256; use parking_lot::{Mutex, RwLock}; use fastmap::H256FastMap; -use std::collections::BTreeMap; use types::transaction::{Action, Transaction as EthTransaction, PendingTransaction, SignedTransaction, LocalizedTransaction}; use v1::helpers::{CallRequest as CallRequestHelper, errors, dispatch}; @@ -523,6 +523,46 @@ where })) } + /// Helper to cull the `light` transaction queue of mined transactions + pub fn light_cull(&self, txq: Arc>) -> impl Future + Send { + let senders = txq.read().queued_senders(); + if senders.is_empty() { + return Either::B(future::err(errors::internal("No pending local transactions", ""))); + } + + let sync = self.sync.clone(); + let on_demand = self.on_demand.clone(); + let best_header = self.client.best_block_header(); + let start_nonce = self.client.engine().account_start_nonce(best_header.number()); + + let account_request = sync.with_context(move |ctx| { + // fetch the nonce of each sender in the queue. + let nonce_reqs = senders.iter() + .map(|&address| request::Account { header: best_header.clone().into(), address }) + .collect::>(); + + // when they come in, update each sender to the new nonce. + on_demand.request(ctx, nonce_reqs) + .expect(NO_INVALID_BACK_REFS_PROOF) + .map(move |accs| { + let mut txq = txq.write(); + accs.into_iter() + .map(|maybe_acc| maybe_acc.map_or(start_nonce, |acc| acc.nonce)) + .zip(senders) + .for_each(|(nonce, addr)| { + txq.cull(addr, nonce); + }); + }) + .map_err(errors::on_demand_error) + }); + + if let Some(fut) = account_request { + Either::A(fut) + } else { + Either::B(future::err(errors::network_disabled())) + } + } + fn send_requests(&self, reqs: Vec, parse_response: F) -> impl Future + Send where F: FnOnce(Vec) -> T + Send + 'static, T: Send + 'static, diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs index 73e2b99c6..6467bfbc7 100644 --- a/rpc/src/v1/impls/light/eth.rs +++ b/rpc/src/v1/impls/light/eth.rs @@ -420,15 +420,22 @@ where } fn transaction_by_hash(&self, hash: H256) -> BoxFuture> { - { - let tx_queue = self.transaction_queue.read(); - if let Some(tx) = tx_queue.get(&hash) { + let in_txqueue = self.transaction_queue.read().get(&hash).is_some(); + + // The transaction is in the `local txqueue` then fetch the latest state from the network and attempt + // to cull the transaction queue. + if in_txqueue { + // Note, this will block (relies on HTTP timeout) to make sure `cull` will finish to avoid having to call + // `eth_getTransactionByHash` more than once to ensure the `txqueue` is up to `date` when it is called + if let Err(e) = self.fetcher().light_cull(self.transaction_queue.clone()).wait() { + debug!(target: "cull", "failed because of: {:?}", e); + } + if let Some(tx) = self.transaction_queue.read().get(&hash) { return Box::new(future::ok(Some(Transaction::from_pending( tx.clone(), )))); } } - Box::new(self.fetcher().transaction_by_hash(hash).map(|x| x.map(|(tx, _)| tx))) }