fix RPC fallout

This commit is contained in:
Robert Habermeier 2017-04-06 17:44:31 +02:00
parent d19232a848
commit 528dbf909a
4 changed files with 62 additions and 89 deletions

View File

@ -67,6 +67,7 @@ impl IoHandler<ClientIoMessage> for QueueCull {
let (sync, on_demand, txq) = (self.sync.clone(), self.on_demand.clone(), self.txq.clone());
let best_header = self.client.best_block_header();
let start_nonce = self.client.engine().account_start_nonce;
info!(target: "cull", "Attempting to cull queued transactions from {} senders.", senders.len());
self.remote.spawn_with_timeout(move || {
@ -74,7 +75,10 @@ impl IoHandler<ClientIoMessage> for QueueCull {
// fetch the nonce of each sender in the queue.
let nonce_futures = senders.iter()
.map(|&address| request::Account { header: best_header.clone(), address: address })
.map(|request| on_demand.account(ctx, request).map(|acc| acc.nonce))
.map(move |request| {
on_demand.account(ctx, request)
.map(move |maybe_acc| maybe_acc.map_or(start_nonce, |acc.nonce|))
})
.zip(senders.iter())
.map(|(fut, &addr)| fut.map(move |nonce| (addr, nonce)));

View File

@ -221,8 +221,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
let cache = Arc::new(::util::Mutex::new(cache));
// start on_demand service.
let account_start_nonce = service.client().engine().account_start_nonce();
let on_demand = Arc::new(::light::on_demand::OnDemand::new(cache.clone(), account_start_nonce));
let on_demand = Arc::new(::light::on_demand::OnDemand::new(cache.clone()));
// set network path.
net_conf.net_config_path = Some(db_dirs.network_path().to_string_lossy().into_owned());

View File

@ -261,6 +261,7 @@ impl LightDispatcher {
}
let best_header = self.client.best_block_header();
let account_start_nonce = self.client.engine().account_start_nonce();
let nonce_future = self.sync.with_context(|ctx| self.on_demand.account(ctx, request::Account {
header: best_header,
address: addr,
@ -268,7 +269,7 @@ impl LightDispatcher {
match nonce_future {
Some(x) =>
x.map(|acc| acc.nonce)
x.map(|acc| acc.map_or(account_start_nonce, |acc| acc.nonce))
.map_err(|_| errors::no_light_peers())
.boxed(),
None => future::err(errors::network_disabled()).boxed()

View File

@ -114,16 +114,16 @@ impl EthClient {
}
/// Get a block header from the on demand service or client, or error.
fn header(&self, id: BlockId) -> BoxFuture<Option<encoded::Header>, Error> {
fn header(&self, id: BlockId) -> BoxFuture<encoded::Header, Error> {
if let Some(h) = self.client.block_header(id) {
return future::ok(Some(h)).boxed()
return future::ok(h).boxed()
}
let maybe_future = match id {
BlockId::Number(n) => {
let cht_root = cht::block_to_cht_number(n).and_then(|cn| self.client.cht_root(cn as usize));
match cht_root {
None => return future::ok(None).boxed(),
None => return future::err(errors::unknown_block()).boxed(),
Some(root) => {
let req = request::HeaderProof::new(n, root)
.expect("only fails for 0; client always stores genesis; client already queried; qed");
@ -139,7 +139,7 @@ impl EthClient {
Some(fut) => fut.map_err(err_premature_cancel).boxed(),
None => future::err(errors::network_disabled()).boxed(),
}
}).map(Some).boxed()
}).boxed()
})
}
}
@ -148,7 +148,7 @@ impl EthClient {
self.sync.with_context(|ctx|
self.on_demand.header_by_hash(ctx, request::HeaderByHash(h))
.then(|res| future::done(match res {
Ok(h) => Ok(Some(h)),
Ok(h) => Ok(h),
Err(e) => Err(err_premature_cancel(e)),
}))
.boxed()
@ -164,22 +164,18 @@ impl EthClient {
}
// helper for getting account info at a given block.
// `None` indicates the account doesn't exist at the given block.
fn account(&self, address: Address, id: BlockId) -> BoxFuture<Option<BasicAccount>, Error> {
let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone());
self.header(id).and_then(move |header| {
let header = match header {
None => return future::ok(None).boxed(),
Some(hdr) => hdr,
};
let maybe_fut = sync.with_context(|ctx| on_demand.account(ctx, request::Account {
header: header,
address: address,
}));
match maybe_fut {
Some(fut) => fut.map(Some).map_err(err_premature_cancel).boxed(),
Some(fut) => fut.map_err(err_premature_cancel).boxed(),
None => future::err(errors::network_disabled()).boxed(),
}
}).boxed()
@ -236,10 +232,11 @@ impl EthClient {
}).join(header_fut).and_then(move |(tx, hdr)| {
// then request proved execution.
// TODO: get last-hashes from network.
let (env_info, hdr) = match (client.env_info(id), hdr) {
(Some(env_info), Some(hdr)) => (env_info, hdr),
let env_info = match client.env_info(id) {
Some(env_info) => env_info,
_ => return future::err(errors::unknown_block()).boxed(),
};
let request = request::TransactionProof {
tx: tx,
header: hdr,
@ -258,24 +255,20 @@ impl EthClient {
}).boxed()
}
fn block(&self, id: BlockId) -> BoxFuture<Option<encoded::Block>, Error> {
// get a block itself. fails on unknown block ID.
fn block(&self, id: BlockId) -> BoxFuture<encoded::Block, Error> {
let (on_demand, sync) = (self.on_demand.clone(), self.sync.clone());
self.header(id).and_then(move |hdr| {
let req = match hdr {
Some(hdr) => request::Body::new(hdr),
None => return future::ok(None).boxed(),
};
self.header(id).map(request::Body::new).and_then(move |req| {
match sync.with_context(move |ctx| on_demand.block(ctx, req)) {
Some(fut) => fut.map_err(err_premature_cancel).map(Some).boxed(),
Some(fut) => fut.map_err(err_premature_cancel).boxed(),
None => future::err(errors::network_disabled()).boxed(),
}
}).boxed()
}
// get a "rich" block structure
fn rich_block(&self, id: BlockId, include_txs: bool) -> BoxFuture<Option<RichBlock>, Error> {
// get a "rich" block structure. Fails on unknown block.
fn rich_block(&self, id: BlockId, include_txs: bool) -> BoxFuture<RichBlock, Error> {
let (on_demand, sync) = (self.on_demand.clone(), self.sync.clone());
let (client, engine) = (self.client.clone(), self.client.engine().clone());
@ -314,49 +307,45 @@ impl EthClient {
};
// get the block itself.
self.block(id).and_then(move |block| match block {
None => return future::ok(None).boxed(),
Some(block) => {
// then fetch the total difficulty (this is much easier after getting the block).
match client.score(id) {
Some(score) => future::ok(fill_rich(block, Some(score))).map(Some).boxed(),
None => {
// make a CHT request to fetch the chain score.
let req = cht::block_to_cht_number(block.number())
.and_then(|num| client.cht_root(num as usize))
.and_then(|root| request::HeaderProof::new(block.number(), root));
self.block(id).and_then(move |block| {
// then fetch the total difficulty (this is much easier after getting the block).
match client.score(id) {
Some(score) => future::ok(fill_rich(block, Some(score))).boxed(),
None => {
// make a CHT request to fetch the chain score.
let req = cht::block_to_cht_number(block.number())
.and_then(|num| client.cht_root(num as usize))
.and_then(|root| request::HeaderProof::new(block.number(), root));
let req = match req {
Some(req) => req,
None => {
// somehow the genesis block slipped past other checks.
// return it now.
let score = client.block_header(BlockId::Number(0))
.expect("genesis always stored; qed")
.difficulty();
let req = match req {
Some(req) => req,
None => {
// somehow the genesis block slipped past other checks.
// return it now.
let score = client.block_header(BlockId::Number(0))
.expect("genesis always stored; qed")
.difficulty();
return future::ok(fill_rich(block, Some(score))).map(Some).boxed()
}
};
// three possible outcomes:
// - network is down.
// - we get a score, but our hash is non-canonical.
// - we get ascore, and our hash is canonical.
let maybe_fut = sync.with_context(move |ctx| on_demand.hash_and_score_by_number(ctx, req));
match maybe_fut {
Some(fut) => fut.map(move |(hash, score)| {
let score = if hash == block.hash() {
Some(score)
} else {
None
};
Some(fill_rich(block, score))
}).map_err(err_premature_cancel).boxed(),
None => return future::err(errors::network_disabled()).boxed(),
return future::ok(fill_rich(block, Some(score))).boxed()
}
};
// three possible outcomes:
// - network is down.
// - we get a score, but our hash is non-canonical.
// - we get ascore, and our hash is canonical.
let maybe_fut = sync.with_context(move |ctx| on_demand.hash_and_score_by_number(ctx, req));
match maybe_fut {
Some(fut) => fut.map(move |(hash, score)| {
let score = if hash == block.hash() {
Some(score)
} else {
None
};
fill_rich(block, score)
}).map_err(err_premature_cancel).boxed(),
None => return future::err(errors::network_disabled()).boxed(),
}
}
}
@ -435,11 +424,11 @@ impl Eth for EthClient {
}
fn block_by_hash(&self, hash: RpcH256, include_txs: bool) -> BoxFuture<Option<RichBlock>, Error> {
self.rich_block(BlockId::Hash(hash.into()), include_txs)
self.rich_block(BlockId::Hash(hash.into()), include_txs).map(Some).boxed()
}
fn block_by_number(&self, num: BlockNumber, include_txs: bool) -> BoxFuture<Option<RichBlock>, Error> {
self.rich_block(num.into(), include_txs)
self.rich_block(num.into(), include_txs).map(Some).boxed()
}
fn transaction_count(&self, address: RpcH160, num: Trailing<BlockNumber>) -> BoxFuture<RpcU256, Error> {
@ -451,11 +440,6 @@ impl Eth for EthClient {
let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone());
self.header(BlockId::Hash(hash.into())).and_then(move |hdr| {
let hdr = match hdr {
None => return future::ok(None).boxed(),
Some(hdr) => hdr,
};
if hdr.transactions_root() == SHA3_NULL_RLP {
future::ok(Some(U256::from(0).into())).boxed()
} else {
@ -471,11 +455,6 @@ impl Eth for EthClient {
let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone());
self.header(num.into()).and_then(move |hdr| {
let hdr = match hdr {
None => return future::ok(None).boxed(),
Some(hdr) => hdr,
};
if hdr.transactions_root() == SHA3_NULL_RLP {
future::ok(Some(U256::from(0).into())).boxed()
} else {
@ -491,11 +470,6 @@ impl Eth for EthClient {
let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone());
self.header(BlockId::Hash(hash.into())).and_then(move |hdr| {
let hdr = match hdr {
None => return future::ok(None).boxed(),
Some(hdr) => hdr,
};
if hdr.uncles_hash() == SHA3_EMPTY_LIST_RLP {
future::ok(Some(U256::from(0).into())).boxed()
} else {
@ -511,11 +485,6 @@ impl Eth for EthClient {
let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone());
self.header(num.into()).and_then(move |hdr| {
let hdr = match hdr {
None => return future::ok(None).boxed(),
Some(hdr) => hdr,
};
if hdr.uncles_hash() == SHA3_EMPTY_LIST_RLP {
future::ok(Some(U256::from(0).into())).boxed()
} else {