From 70ef33f6fe5d22b21c435b2e77d8ebced3f30110 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 27 Jul 2017 13:50:12 +0200 Subject: [PATCH] Light client improvements (#6156) * no seal checking * import command and --no-seal-check for light client * fix eth_call * tweak registry dapps lookup * ignore failed requests to non-server peers --- ethcore/light/src/client/mod.rs | 5 +- ethcore/light/src/on_demand/mod.rs | 34 +++++-- parity/blockchain.rs | 156 ++++++++++++++++++++++++++++- parity/configuration.rs | 2 + parity/dapps.rs | 2 +- parity/run.rs | 1 + rpc/src/v1/helpers/dispatch.rs | 2 +- rpc/src/v1/helpers/light_fetch.rs | 120 ++++++++++++++++------ 8 files changed, 281 insertions(+), 41 deletions(-) diff --git a/ethcore/light/src/client/mod.rs b/ethcore/light/src/client/mod.rs index 044c40133..3f0e50584 100644 --- a/ethcore/light/src/client/mod.rs +++ b/ethcore/light/src/client/mod.rs @@ -58,6 +58,8 @@ pub struct Config { pub db_wal: bool, /// Should it do full verification of blocks? pub verify_full: bool, + /// Should it check the seal of blocks? + pub check_seal: bool, } impl Default for Config { @@ -69,6 +71,7 @@ impl Default for Config { db_compaction: CompactionProfile::default(), db_wal: true, verify_full: true, + check_seal: true, } } } @@ -168,7 +171,7 @@ impl Client { let gh = ::rlp::encode(&spec.genesis_header()); Ok(Client { - queue: HeaderQueue::new(config.queue, spec.engine.clone(), io_channel, true), + queue: HeaderQueue::new(config.queue, spec.engine.clone(), io_channel, config.check_seal), engine: spec.engine.clone(), chain: HeaderChain::new(db.clone(), chain_col, &gh, cache)?, report: RwLock::new(ClientReport::default()), diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index c1919ebd1..42469725b 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -54,13 +54,21 @@ struct Peer { } impl Peer { - // whether this peer can fulfill the - fn can_fulfill(&self, c: &Capabilities) -> bool { - let caps = &self.capabilities; + // whether this peer can fulfill the necessary capabilities for the given + // request. + fn can_fulfill(&self, request: &Capabilities) -> bool { + let local_caps = &self.capabilities; + let can_serve_since = |req, local| { + match (req, local) { + (Some(request_block), Some(serve_since)) => request_block >= serve_since, + (Some(_), None) => false, + (None, _) => true, + } + }; - caps.serve_headers == c.serve_headers && - caps.serve_chain_since >= c.serve_chain_since && - caps.serve_state_since >= c.serve_chain_since + local_caps.serve_headers >= request.serve_headers && + can_serve_since(request.serve_chain_since, local_caps.serve_chain_since) && + can_serve_since(request.serve_state_since, local_caps.serve_state_since) } } @@ -244,7 +252,7 @@ impl OnDemand { peers: RwLock::new(HashMap::new()), in_transit: RwLock::new(HashMap::new()), cache: cache, - no_immediate_dispatch: true, + no_immediate_dispatch: false, } } @@ -266,7 +274,6 @@ impl OnDemand { -> Result>, basic_request::NoSuchOutput> { let (sender, receiver) = oneshot::channel(); - if requests.is_empty() { assert!(sender.send(Vec::new()).is_ok(), "receiver still in scope; qed"); return Ok(receiver); @@ -335,6 +342,7 @@ impl OnDemand { // dispatch pending requests, and discard those for which the corresponding // receiver has been dropped. fn dispatch_pending(&self, ctx: &BasicContext) { + // wrapper future for calling `poll_cancel` on our `Senders` to preserve // the invariant that it's always within a task. struct CheckHangup<'a, T: 'a>(&'a mut Sender); @@ -360,6 +368,8 @@ impl OnDemand { if self.pending.read().is_empty() { return } let mut pending = self.pending.write(); + debug!(target: "on_demand", "Attempting to dispatch {} pending requests", pending.len()); + // iterate over all pending requests, and check them for hang-up. // then, try and find a peer who can serve it. let peers = self.peers.read(); @@ -378,16 +388,21 @@ impl OnDemand { match ctx.request_from(*peer_id, pending.net_requests.clone()) { Ok(req_id) => { + trace!(target: "on_demand", "Dispatched request {} to peer {}", req_id, peer_id); self.in_transit.write().insert(req_id, pending); return None } - Err(net::Error::NoCredits) => {} + Err(net::Error::NoCredits) | Err(net::Error::NotServer) => {} Err(e) => debug!(target: "on_demand", "Error dispatching request to peer: {}", e), } } + + // TODO: maximum number of failures _when we have peers_. Some(pending) }) .collect(); // `pending` now contains all requests we couldn't dispatch. + + debug!(target: "on_demand", "Was unable to dispatch {} requests.", pending.len()); } // submit a pending request set. attempts to answer from cache before @@ -395,6 +410,7 @@ impl OnDemand { fn submit_pending(&self, ctx: &BasicContext, mut pending: Pending) { // answer as many requests from cache as we can, and schedule for dispatch // if incomplete. + pending.answer_from_cache(&*self.cache); if let Some(mut pending) = pending.try_complete() { pending.update_net_requests(); diff --git a/parity/blockchain.rs b/parity/blockchain.rs index a70272931..426c8d447 100644 --- a/parity/blockchain.rs +++ b/parity/blockchain.rs @@ -93,6 +93,7 @@ pub struct ImportBlockchain { pub check_seal: bool, pub with_color: bool, pub verifier_settings: VerifierSettings, + pub light: bool, } #[derive(Debug, PartialEq)] @@ -138,12 +139,165 @@ pub struct ExportState { pub fn execute(cmd: BlockchainCmd) -> Result<(), String> { match cmd { BlockchainCmd::Kill(kill_cmd) => kill_db(kill_cmd), - BlockchainCmd::Import(import_cmd) => execute_import(import_cmd), + BlockchainCmd::Import(import_cmd) => { + if import_cmd.light { + execute_import_light(import_cmd) + } else { + execute_import(import_cmd) + } + } BlockchainCmd::Export(export_cmd) => execute_export(export_cmd), BlockchainCmd::ExportState(export_cmd) => execute_export_state(export_cmd), } } +fn execute_import_light(cmd: ImportBlockchain) -> Result<(), String> { + use light::client::{Service as LightClientService, Config as LightClientConfig}; + use light::cache::Cache as LightDataCache; + + let timer = Instant::now(); + + // load spec file + let spec = cmd.spec.spec(&cmd.dirs.cache)?; + + // load genesis hash + let genesis_hash = spec.genesis_header().hash(); + + // database paths + let db_dirs = cmd.dirs.database(genesis_hash, None, spec.data_dir.clone()); + + // user defaults path + let user_defaults_path = db_dirs.user_defaults_path(); + + // load user defaults + let user_defaults = UserDefaults::load(&user_defaults_path)?; + + fdlimit::raise_fd_limit(); + + // select pruning algorithm + let algorithm = cmd.pruning.to_algorithm(&user_defaults); + + // prepare client and snapshot paths. + let client_path = db_dirs.client_path(algorithm); + + // execute upgrades + let compaction = cmd.compaction.compaction_profile(db_dirs.db_root_path().as_path()); + execute_upgrades(&cmd.dirs.base, &db_dirs, algorithm, compaction)?; + + // create dirs used by parity + cmd.dirs.create_dirs(false, false, false)?; + + let cache = Arc::new(::util::Mutex::new( + LightDataCache::new(Default::default(), ::time::Duration::seconds(0)) + )); + + let mut config = LightClientConfig { + queue: Default::default(), + chain_column: ::ethcore::db::COL_LIGHT_CHAIN, + db_cache_size: Some(cmd.cache_config.blockchain() as usize * 1024 * 1024), + db_compaction: compaction, + db_wal: cmd.wal, + verify_full: true, + check_seal: cmd.check_seal, + }; + + config.queue.max_mem_use = cmd.cache_config.queue() as usize * 1024 * 1024; + config.queue.verifier_settings = cmd.verifier_settings; + + let service = LightClientService::start(config, &spec, &client_path, cache) + .map_err(|e| format!("Failed to start client: {}", e))?; + + // free up the spec in memory. + drop(spec); + + let client = service.client(); + + let mut instream: Box = match cmd.file_path { + Some(f) => Box::new(fs::File::open(&f).map_err(|_| format!("Cannot open given file: {}", f))?), + None => Box::new(io::stdin()), + }; + + const READAHEAD_BYTES: usize = 8; + + let mut first_bytes: Vec = vec![0; READAHEAD_BYTES]; + let mut first_read = 0; + + let format = match cmd.format { + Some(format) => format, + None => { + first_read = instream.read(&mut first_bytes).map_err(|_| "Error reading from the file/stream.")?; + match first_bytes[0] { + 0xf9 => DataFormat::Binary, + _ => DataFormat::Hex, + } + } + }; + + let do_import = |bytes: Vec| { + while client.queue_info().is_full() { sleep(Duration::from_secs(1)); } + + let header: ::ethcore::header::Header = ::rlp::UntrustedRlp::new(&bytes).val_at(0) + .map_err(|e| format!("Bad block: {}", e))?; + + if client.best_block_header().number() >= header.number() { return Ok(()) } + + if header.number() % 10000 == 0 { + info!("#{}", header.number()); + } + + match client.import_header(header) { + Err(BlockImportError::Import(ImportError::AlreadyInChain)) => { + trace!("Skipping block already in chain."); + } + Err(e) => { + return Err(format!("Cannot import block: {:?}", e)); + }, + Ok(_) => {}, + } + Ok(()) + }; + + match format { + DataFormat::Binary => { + loop { + let mut bytes = if first_read > 0 {first_bytes.clone()} else {vec![0; READAHEAD_BYTES]}; + let n = if first_read > 0 { + first_read + } else { + instream.read(&mut bytes).map_err(|_| "Error reading from the file/stream.")? + }; + if n == 0 { break; } + first_read = 0; + let s = PayloadInfo::from(&bytes).map_err(|e| format!("Invalid RLP in the file/stream: {:?}", e))?.total(); + bytes.resize(s, 0); + instream.read_exact(&mut bytes[n..]).map_err(|_| "Error reading from the file/stream.")?; + do_import(bytes)?; + } + } + DataFormat::Hex => { + for line in BufReader::new(instream).lines() { + let s = line.map_err(|_| "Error reading from the file/stream.")?; + let s = if first_read > 0 {from_utf8(&first_bytes).unwrap().to_owned() + &(s[..])} else {s}; + first_read = 0; + let bytes = s.from_hex().map_err(|_| "Invalid hex in file/stream.")?; + do_import(bytes)?; + } + } + } + client.flush_queue(); + + let ms = timer.elapsed().as_milliseconds(); + let report = client.report(); + + info!("Import completed in {} seconds, {} headers, {} hdr/s", + ms / 1000, + report.blocks_imported, + (report.blocks_imported * 1000) as u64 / ms, + ); + + Ok(()) +} + fn execute_import(cmd: ImportBlockchain) -> Result<(), String> { let timer = Instant::now(); diff --git a/parity/configuration.rs b/parity/configuration.rs index 6d5990aa0..a0687f2a8 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -243,6 +243,7 @@ impl Configuration { check_seal: !self.args.flag_no_seal_check, with_color: logger_config.color, verifier_settings: self.verifier_settings(), + light: self.args.flag_light, }; Cmd::Blockchain(BlockchainCmd::Import(import_cmd)) } else if self.args.cmd_export { @@ -1184,6 +1185,7 @@ mod tests { check_seal: true, with_color: !cfg!(windows), verifier_settings: Default::default(), + light: false, }))); } diff --git a/parity/dapps.rs b/parity/dapps.rs index 1ddffce1f..14a57833c 100644 --- a/parity/dapps.rs +++ b/parity/dapps.rs @@ -114,7 +114,7 @@ impl ContractClient for LightRegistrar { tx: Transaction { nonce: self.client.engine().account_start_nonce(header.number()), action: Action::Call(address), - gas: 50_000_000.into(), + gas: 50_000.into(), // should be enough for all registry lookups. TODO: exponential backoff gas_price: 0.into(), value: 0.into(), data: data, diff --git a/parity/run.rs b/parity/run.rs index 6f0b444a0..7c835aa3f 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -211,6 +211,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> db_compaction: compaction, db_wal: cmd.wal, verify_full: true, + check_seal: cmd.check_seal, }; config.queue.max_mem_use = cmd.cache_config.queue() as usize * 1024 * 1024; diff --git a/rpc/src/v1/helpers/dispatch.rs b/rpc/src/v1/helpers/dispatch.rs index ff950d346..ddd238fd8 100644 --- a/rpc/src/v1/helpers/dispatch.rs +++ b/rpc/src/v1/helpers/dispatch.rs @@ -177,7 +177,7 @@ pub fn fetch_gas_price_corpus( ) -> BoxFuture, Error> { const GAS_PRICE_SAMPLE_SIZE: usize = 100; - if let Some(cached) = cache.lock().gas_price_corpus() { + if let Some(cached) = { cache.lock().gas_price_corpus() } { return future::ok(cached).boxed() } diff --git a/rpc/src/v1/helpers/light_fetch.rs b/rpc/src/v1/helpers/light_fetch.rs index a0316f9cb..4b5e0f815 100644 --- a/rpc/src/v1/helpers/light_fetch.rs +++ b/rpc/src/v1/helpers/light_fetch.rs @@ -26,6 +26,7 @@ use ethcore::filter::Filter as EthcoreFilter; use ethcore::transaction::{Action, Transaction as EthTransaction}; use futures::{future, Future, BoxFuture}; +use futures::future::Either; use jsonrpc_core::Error; use jsonrpc_macros::Trailing; @@ -159,7 +160,9 @@ impl LightFetch { /// helper for getting proved execution. pub fn proved_execution(&self, req: CallRequest, num: Trailing) -> BoxFuture { - const DEFAULT_GAS_PRICE: U256 = U256([0, 0, 0, 21_000_000]); + const DEFAULT_GAS_PRICE: u64 = 21_000; + // starting gas when gas not provided. + const START_GAS: u64 = 50_000; let (sync, on_demand, client) = (self.sync.clone(), self.on_demand.clone(), self.client.clone()); let req: CallRequestHelper = req.into(); @@ -167,21 +170,21 @@ impl LightFetch { let from = req.from.unwrap_or(Address::zero()); let nonce_fut = match req.nonce { - Some(nonce) => future::ok(Some(nonce)).boxed(), - None => self.account(from, id).map(|acc| acc.map(|a| a.nonce)).boxed(), + Some(nonce) => Either::A(future::ok(Some(nonce))), + None => Either::B(self.account(from, id).map(|acc| acc.map(|a| a.nonce))), }; let gas_price_fut = match req.gas_price { - Some(price) => future::ok(price).boxed(), - None => dispatch::fetch_gas_price_corpus( + Some(price) => Either::A(future::ok(price)), + None => Either::B(dispatch::fetch_gas_price_corpus( self.sync.clone(), self.client.clone(), self.on_demand.clone(), self.cache.clone(), ).map(|corp| match corp.median() { Some(median) => *median, - None => DEFAULT_GAS_PRICE, - }).boxed() + None => DEFAULT_GAS_PRICE.into(), + })) }; // if nonce resolves, this should too since it'll be in the LRU-cache. @@ -190,22 +193,29 @@ impl LightFetch { // fetch missing transaction fields from the network. nonce_fut.join(gas_price_fut).and_then(move |(nonce, gas_price)| { let action = req.to.map_or(Action::Create, Action::Call); - let gas = req.gas.unwrap_or(U256::from(10_000_000)); // better gas amount? let value = req.value.unwrap_or_else(U256::zero); let data = req.data.unwrap_or_default(); - future::done(match nonce { - Some(n) => Ok(EthTransaction { + future::done(match (nonce, req.gas) { + (Some(n), Some(gas)) => Ok((true, EthTransaction { nonce: n, action: action, gas: gas, gas_price: gas_price, value: value, data: data, - }.fake_sign(from)), - None => Err(errors::unknown_block()), + })), + (Some(n), None) => Ok((false, EthTransaction { + nonce: n, + action: action, + gas: START_GAS.into(), + gas_price: gas_price, + value: value, + data: data, + })), + (None, _) => Err(errors::unknown_block()), }) - }).join(header_fut).and_then(move |(tx, hdr)| { + }).join(header_fut).and_then(move |((gas_known, tx), hdr)| { // then request proved execution. // TODO: get last-hashes from network. let env_info = match client.env_info(id) { @@ -213,24 +223,15 @@ impl LightFetch { _ => return future::err(errors::unknown_block()).boxed(), }; - let request = request::TransactionProof { + execute_tx(gas_known, ExecuteParams { + from: from, tx: tx, - header: hdr.into(), + hdr: hdr, env_info: env_info, engine: client.engine().clone(), - }; - - let proved_future = sync.with_context(move |ctx| { - on_demand - .request(ctx, request) - .expect("no back-references; therefore all back-refs valid; qed") - .map_err(errors::on_demand_cancel).boxed() - }); - - match proved_future { - Some(fut) => fut.boxed(), - None => future::err(errors::network_disabled()).boxed(), - } + on_demand: on_demand, + sync: sync, + }) }).boxed() } @@ -320,3 +321,66 @@ impl LightFetch { } } } + +#[derive(Clone)] +struct ExecuteParams { + from: Address, + tx: EthTransaction, + hdr: encoded::Header, + env_info: ::evm::env_info::EnvInfo, + engine: Arc<::ethcore::engines::Engine>, + on_demand: Arc, + sync: Arc, +} + +// has a peer execute the transaction with given params. If `gas_known` is false, +// this will double the gas on each `OutOfGas` error. +fn execute_tx(gas_known: bool, params: ExecuteParams) -> BoxFuture { + if !gas_known { + future::loop_fn(params, |mut params| { + execute_tx(true, params.clone()).and_then(move |res| { + match res { + Ok(executed) => { + // TODO: how to distinguish between actual OOG and + // exception? + if executed.exception.is_some() { + let old_gas = params.tx.gas; + params.tx.gas = params.tx.gas * 2.into(); + if params.tx.gas > params.hdr.gas_limit() { + params.tx.gas = old_gas; + } else { + return Ok(future::Loop::Continue(params)) + } + } + + Ok(future::Loop::Break(Ok(executed))) + } + failed => Ok(future::Loop::Break(failed)), + } + }) + }).boxed() + } else { + trace!(target: "light_fetch", "Placing execution request for {} gas in on_demand", + params.tx.gas); + + let request = request::TransactionProof { + tx: params.tx.fake_sign(params.from), + header: params.hdr.into(), + env_info: params.env_info, + engine: params.engine, + }; + + let on_demand = params.on_demand; + let proved_future = params.sync.with_context(move |ctx| { + on_demand + .request(ctx, request) + .expect("no back-references; therefore all back-refs valid; qed") + .map_err(errors::on_demand_cancel) + }); + + match proved_future { + Some(fut) => fut.boxed(), + None => future::err(errors::network_disabled()).boxed(), + } + } +}