From 528dbf909a277d0e6d834c4e1b3b0fe3435fa0df Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 6 Apr 2017 17:44:31 +0200 Subject: [PATCH] fix RPC fallout --- parity/light_helpers/queue_cull.rs | 6 +- parity/run.rs | 3 +- rpc/src/v1/helpers/dispatch.rs | 3 +- rpc/src/v1/impls/light/eth.rs | 139 +++++++++++------------------ 4 files changed, 62 insertions(+), 89 deletions(-) diff --git a/parity/light_helpers/queue_cull.rs b/parity/light_helpers/queue_cull.rs index 548ee33cd..235090052 100644 --- a/parity/light_helpers/queue_cull.rs +++ b/parity/light_helpers/queue_cull.rs @@ -67,6 +67,7 @@ impl IoHandler for QueueCull { let (sync, on_demand, txq) = (self.sync.clone(), self.on_demand.clone(), self.txq.clone()); let best_header = self.client.best_block_header(); + let start_nonce = self.client.engine().account_start_nonce; info!(target: "cull", "Attempting to cull queued transactions from {} senders.", senders.len()); self.remote.spawn_with_timeout(move || { @@ -74,7 +75,10 @@ impl IoHandler for QueueCull { // fetch the nonce of each sender in the queue. let nonce_futures = senders.iter() .map(|&address| request::Account { header: best_header.clone(), address: address }) - .map(|request| on_demand.account(ctx, request).map(|acc| acc.nonce)) + .map(move |request| { + on_demand.account(ctx, request) + .map(move |maybe_acc| maybe_acc.map_or(start_nonce, |acc.nonce|)) + }) .zip(senders.iter()) .map(|(fut, &addr)| fut.map(move |nonce| (addr, nonce))); diff --git a/parity/run.rs b/parity/run.rs index a4fc6ad19..2292bf87d 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -221,8 +221,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> let cache = Arc::new(::util::Mutex::new(cache)); // start on_demand service. - let account_start_nonce = service.client().engine().account_start_nonce(); - let on_demand = Arc::new(::light::on_demand::OnDemand::new(cache.clone(), account_start_nonce)); + let on_demand = Arc::new(::light::on_demand::OnDemand::new(cache.clone())); // set network path. net_conf.net_config_path = Some(db_dirs.network_path().to_string_lossy().into_owned()); diff --git a/rpc/src/v1/helpers/dispatch.rs b/rpc/src/v1/helpers/dispatch.rs index d58a211ed..5bce4b10c 100644 --- a/rpc/src/v1/helpers/dispatch.rs +++ b/rpc/src/v1/helpers/dispatch.rs @@ -261,6 +261,7 @@ impl LightDispatcher { } let best_header = self.client.best_block_header(); + let account_start_nonce = self.client.engine().account_start_nonce(); let nonce_future = self.sync.with_context(|ctx| self.on_demand.account(ctx, request::Account { header: best_header, address: addr, @@ -268,7 +269,7 @@ impl LightDispatcher { match nonce_future { Some(x) => - x.map(|acc| acc.nonce) + x.map(|acc| acc.map_or(account_start_nonce, |acc| acc.nonce)) .map_err(|_| errors::no_light_peers()) .boxed(), None => future::err(errors::network_disabled()).boxed() diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs index e1014afe8..6a051cce0 100644 --- a/rpc/src/v1/impls/light/eth.rs +++ b/rpc/src/v1/impls/light/eth.rs @@ -114,16 +114,16 @@ impl EthClient { } /// Get a block header from the on demand service or client, or error. - fn header(&self, id: BlockId) -> BoxFuture, Error> { + fn header(&self, id: BlockId) -> BoxFuture { if let Some(h) = self.client.block_header(id) { - return future::ok(Some(h)).boxed() + return future::ok(h).boxed() } let maybe_future = match id { BlockId::Number(n) => { let cht_root = cht::block_to_cht_number(n).and_then(|cn| self.client.cht_root(cn as usize)); match cht_root { - None => return future::ok(None).boxed(), + None => return future::err(errors::unknown_block()).boxed(), Some(root) => { let req = request::HeaderProof::new(n, root) .expect("only fails for 0; client always stores genesis; client already queried; qed"); @@ -139,7 +139,7 @@ impl EthClient { Some(fut) => fut.map_err(err_premature_cancel).boxed(), None => future::err(errors::network_disabled()).boxed(), } - }).map(Some).boxed() + }).boxed() }) } } @@ -148,7 +148,7 @@ impl EthClient { self.sync.with_context(|ctx| self.on_demand.header_by_hash(ctx, request::HeaderByHash(h)) .then(|res| future::done(match res { - Ok(h) => Ok(Some(h)), + Ok(h) => Ok(h), Err(e) => Err(err_premature_cancel(e)), })) .boxed() @@ -164,22 +164,18 @@ impl EthClient { } // helper for getting account info at a given block. + // `None` indicates the account doesn't exist at the given block. fn account(&self, address: Address, id: BlockId) -> BoxFuture, Error> { let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); self.header(id).and_then(move |header| { - let header = match header { - None => return future::ok(None).boxed(), - Some(hdr) => hdr, - }; - let maybe_fut = sync.with_context(|ctx| on_demand.account(ctx, request::Account { header: header, address: address, })); match maybe_fut { - Some(fut) => fut.map(Some).map_err(err_premature_cancel).boxed(), + Some(fut) => fut.map_err(err_premature_cancel).boxed(), None => future::err(errors::network_disabled()).boxed(), } }).boxed() @@ -236,10 +232,11 @@ impl EthClient { }).join(header_fut).and_then(move |(tx, hdr)| { // then request proved execution. // TODO: get last-hashes from network. - let (env_info, hdr) = match (client.env_info(id), hdr) { - (Some(env_info), Some(hdr)) => (env_info, hdr), + let env_info = match client.env_info(id) { + Some(env_info) => env_info, _ => return future::err(errors::unknown_block()).boxed(), }; + let request = request::TransactionProof { tx: tx, header: hdr, @@ -258,24 +255,20 @@ impl EthClient { }).boxed() } - fn block(&self, id: BlockId) -> BoxFuture, Error> { + // get a block itself. fails on unknown block ID. + fn block(&self, id: BlockId) -> BoxFuture { let (on_demand, sync) = (self.on_demand.clone(), self.sync.clone()); - self.header(id).and_then(move |hdr| { - let req = match hdr { - Some(hdr) => request::Body::new(hdr), - None => return future::ok(None).boxed(), - }; - + self.header(id).map(request::Body::new).and_then(move |req| { match sync.with_context(move |ctx| on_demand.block(ctx, req)) { - Some(fut) => fut.map_err(err_premature_cancel).map(Some).boxed(), + Some(fut) => fut.map_err(err_premature_cancel).boxed(), None => future::err(errors::network_disabled()).boxed(), } }).boxed() } - // get a "rich" block structure - fn rich_block(&self, id: BlockId, include_txs: bool) -> BoxFuture, Error> { + // get a "rich" block structure. Fails on unknown block. + fn rich_block(&self, id: BlockId, include_txs: bool) -> BoxFuture { let (on_demand, sync) = (self.on_demand.clone(), self.sync.clone()); let (client, engine) = (self.client.clone(), self.client.engine().clone()); @@ -314,49 +307,45 @@ impl EthClient { }; // get the block itself. - self.block(id).and_then(move |block| match block { - None => return future::ok(None).boxed(), - Some(block) => { - // then fetch the total difficulty (this is much easier after getting the block). - match client.score(id) { - Some(score) => future::ok(fill_rich(block, Some(score))).map(Some).boxed(), - None => { - // make a CHT request to fetch the chain score. - let req = cht::block_to_cht_number(block.number()) - .and_then(|num| client.cht_root(num as usize)) - .and_then(|root| request::HeaderProof::new(block.number(), root)); + self.block(id).and_then(move |block| { + // then fetch the total difficulty (this is much easier after getting the block). + match client.score(id) { + Some(score) => future::ok(fill_rich(block, Some(score))).boxed(), + None => { + // make a CHT request to fetch the chain score. + let req = cht::block_to_cht_number(block.number()) + .and_then(|num| client.cht_root(num as usize)) + .and_then(|root| request::HeaderProof::new(block.number(), root)); + let req = match req { + Some(req) => req, + None => { + // somehow the genesis block slipped past other checks. + // return it now. + let score = client.block_header(BlockId::Number(0)) + .expect("genesis always stored; qed") + .difficulty(); - let req = match req { - Some(req) => req, - None => { - // somehow the genesis block slipped past other checks. - // return it now. - let score = client.block_header(BlockId::Number(0)) - .expect("genesis always stored; qed") - .difficulty(); - - return future::ok(fill_rich(block, Some(score))).map(Some).boxed() - } - }; - - // three possible outcomes: - // - network is down. - // - we get a score, but our hash is non-canonical. - // - we get ascore, and our hash is canonical. - let maybe_fut = sync.with_context(move |ctx| on_demand.hash_and_score_by_number(ctx, req)); - match maybe_fut { - Some(fut) => fut.map(move |(hash, score)| { - let score = if hash == block.hash() { - Some(score) - } else { - None - }; - - Some(fill_rich(block, score)) - }).map_err(err_premature_cancel).boxed(), - None => return future::err(errors::network_disabled()).boxed(), + return future::ok(fill_rich(block, Some(score))).boxed() } + }; + + // three possible outcomes: + // - network is down. + // - we get a score, but our hash is non-canonical. + // - we get ascore, and our hash is canonical. + let maybe_fut = sync.with_context(move |ctx| on_demand.hash_and_score_by_number(ctx, req)); + match maybe_fut { + Some(fut) => fut.map(move |(hash, score)| { + let score = if hash == block.hash() { + Some(score) + } else { + None + }; + + fill_rich(block, score) + }).map_err(err_premature_cancel).boxed(), + None => return future::err(errors::network_disabled()).boxed(), } } } @@ -435,11 +424,11 @@ impl Eth for EthClient { } fn block_by_hash(&self, hash: RpcH256, include_txs: bool) -> BoxFuture, Error> { - self.rich_block(BlockId::Hash(hash.into()), include_txs) + self.rich_block(BlockId::Hash(hash.into()), include_txs).map(Some).boxed() } fn block_by_number(&self, num: BlockNumber, include_txs: bool) -> BoxFuture, Error> { - self.rich_block(num.into(), include_txs) + self.rich_block(num.into(), include_txs).map(Some).boxed() } fn transaction_count(&self, address: RpcH160, num: Trailing) -> BoxFuture { @@ -451,11 +440,6 @@ impl Eth for EthClient { let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); self.header(BlockId::Hash(hash.into())).and_then(move |hdr| { - let hdr = match hdr { - None => return future::ok(None).boxed(), - Some(hdr) => hdr, - }; - if hdr.transactions_root() == SHA3_NULL_RLP { future::ok(Some(U256::from(0).into())).boxed() } else { @@ -471,11 +455,6 @@ impl Eth for EthClient { let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); self.header(num.into()).and_then(move |hdr| { - let hdr = match hdr { - None => return future::ok(None).boxed(), - Some(hdr) => hdr, - }; - if hdr.transactions_root() == SHA3_NULL_RLP { future::ok(Some(U256::from(0).into())).boxed() } else { @@ -491,11 +470,6 @@ impl Eth for EthClient { let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); self.header(BlockId::Hash(hash.into())).and_then(move |hdr| { - let hdr = match hdr { - None => return future::ok(None).boxed(), - Some(hdr) => hdr, - }; - if hdr.uncles_hash() == SHA3_EMPTY_LIST_RLP { future::ok(Some(U256::from(0).into())).boxed() } else { @@ -511,11 +485,6 @@ impl Eth for EthClient { let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); self.header(num.into()).and_then(move |hdr| { - let hdr = match hdr { - None => return future::ok(None).boxed(), - Some(hdr) => hdr, - }; - if hdr.uncles_hash() == SHA3_EMPTY_LIST_RLP { future::ok(Some(U256::from(0).into())).boxed() } else {