Filters and block RPCs for the light client (#5320)

* block_hash method for LightChainClient

* abstraction and futures-based eth_filter

* log fetching for light client

* add eth-filter delegate

* eth_block fetching RPCs

* return default accounts from on_demand

* fix early exit

* BlockNumber -> BlockId

* early exit for no known block number.
This commit is contained in:
Robert Habermeier 2017-04-12 12:07:54 +02:00 committed by Gav Wood
parent e84d03f31d
commit daf1495c4e
13 changed files with 442 additions and 108 deletions

View File

@ -361,6 +361,22 @@ impl HeaderChain {
}
}
/// Get a block's hash by ID. In the case of query by number, only canonical results
/// will be returned.
pub fn block_hash(&self, id: BlockId) -> Option<H256> {
match id {
BlockId::Earliest => Some(self.genesis_hash()),
BlockId::Hash(hash) => Some(hash),
BlockId::Number(num) => {
if self.best_block.read().number < num { return None }
self.candidates.read().get(&num).map(|entry| entry.canonical_hash)
}
BlockId::Latest | BlockId::Pending => {
Some(self.best_block.read().hash)
}
}
}
/// Get a block header. In the case of query by number, only canonical blocks
/// will be returned.
pub fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
@ -414,6 +430,28 @@ impl HeaderChain {
}
}
/// Get a block's chain score.
/// Returns nothing for non-canonical blocks.
pub fn score(&self, id: BlockId) -> Option<U256> {
let genesis_hash = self.genesis_hash();
match id {
BlockId::Earliest | BlockId::Number(0) => Some(self.genesis_header.difficulty()),
BlockId::Hash(hash) if hash == genesis_hash => Some(self.genesis_header.difficulty()),
BlockId::Hash(hash) => match self.block_header(BlockId::Hash(hash)) {
Some(header) => self.candidates.read().get(&header.number())
.and_then(|era| era.candidates.iter().find(|e| e.hash == hash))
.map(|c| c.total_difficulty),
None => None,
},
BlockId::Number(num) => {
let candidates = self.candidates.read();
if self.best_block.read().number < num { return None }
candidates.get(&num).map(|era| era.candidates[0].total_difficulty)
}
BlockId::Latest | BlockId::Pending => Some(self.best_block.read().total_difficulty)
}
}
/// Get the best block's header.
pub fn best_header(&self) -> encoded::Header {
self.block_header(BlockId::Latest).expect("Header for best block always stored; qed")

View File

@ -31,7 +31,7 @@ use ethcore::service::ClientIoMessage;
use ethcore::encoded;
use io::IoChannel;
use util::{H256, Mutex, RwLock};
use util::{H256, U256, Mutex, RwLock};
use util::kvdb::{KeyValueDB, CompactionProfile};
use self::header_chain::{AncestryIter, HeaderChain};
@ -67,12 +67,18 @@ pub trait LightChainClient: Send + Sync {
/// parent queued prior.
fn queue_header(&self, header: Header) -> Result<H256, BlockImportError>;
/// Attempt to get a block hash by block id.
fn block_hash(&self, id: BlockId) -> Option<H256>;
/// Attempt to get block header by block id.
fn block_header(&self, id: BlockId) -> Option<encoded::Header>;
/// Get the best block header.
fn best_block_header(&self) -> encoded::Header;
/// Get a block's chain score by ID.
fn score(&self, id: BlockId) -> Option<U256>;
/// Get an iterator over a block and its ancestry.
fn ancestry_iter<'a>(&'a self, start: BlockId) -> Box<Iterator<Item=encoded::Header> + 'a>;
@ -183,6 +189,11 @@ impl Client {
self.queue.queue_info()
}
/// Attempt to get a block hash by block id.
pub fn block_hash(&self, id: BlockId) -> Option<H256> {
self.chain.block_hash(id)
}
/// Get a block header by Id.
pub fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
self.chain.block_header(id)
@ -193,6 +204,11 @@ impl Client {
self.chain.best_header()
}
/// Get a block's chain score.
pub fn score(&self, id: BlockId) -> Option<U256> {
self.chain.score(id)
}
/// Get an iterator over a block and its ancestry.
pub fn ancestry_iter(&self, start: BlockId) -> AncestryIter {
self.chain.ancestry_iter(start)
@ -310,6 +326,10 @@ impl LightChainClient for Client {
self.import_header(header)
}
fn block_hash(&self, id: BlockId) -> Option<H256> {
Client::block_hash(self, id)
}
fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
Client::block_header(self, id)
}
@ -318,6 +338,10 @@ impl LightChainClient for Client {
Client::best_block_header(self)
}
fn score(&self, id: BlockId) -> Option<U256> {
Client::score(self, id)
}
fn ancestry_iter<'a>(&'a self, start: BlockId) -> Box<Iterator<Item=encoded::Header> + 'a> {
Box::new(Client::ancestry_iter(self, start))
}

View File

@ -35,7 +35,7 @@ use futures::sync::oneshot::{self, Sender, Receiver};
use network::PeerId;
use rlp::RlpStream;
use util::{Bytes, RwLock, Mutex, U256, H256};
use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP};
use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY, SHA3_EMPTY_LIST_RLP};
use net::{self, Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId};
use cache::Cache;
@ -83,7 +83,7 @@ enum Pending {
HeaderByHash(request::HeaderByHash, Sender<encoded::Header>),
Block(request::Body, Sender<encoded::Block>),
BlockReceipts(request::BlockReceipts, Sender<Vec<Receipt>>),
Account(request::Account, Sender<Option<BasicAccount>>),
Account(request::Account, Sender<BasicAccount>),
Code(request::Code, Sender<Bytes>),
TxProof(request::TransactionProof, Sender<Result<Executed, ExecutionError>>),
}
@ -136,18 +136,20 @@ pub struct OnDemand {
pending_requests: RwLock<HashMap<ReqId, Pending>>,
cache: Arc<Mutex<Cache>>,
orphaned_requests: RwLock<Vec<Pending>>,
start_nonce: U256,
}
const RECEIVER_IN_SCOPE: &'static str = "Receiver is still in scope, so it's not dropped; qed";
impl OnDemand {
/// Create a new `OnDemand` service with the given cache.
pub fn new(cache: Arc<Mutex<Cache>>) -> Self {
pub fn new(cache: Arc<Mutex<Cache>>, account_start_nonce: U256) -> Self {
OnDemand {
peers: RwLock::new(HashMap::new()),
pending_requests: RwLock::new(HashMap::new()),
cache: cache,
orphaned_requests: RwLock::new(Vec::new()),
start_nonce: account_start_nonce,
}
}
@ -268,7 +270,7 @@ impl OnDemand {
/// Request an account by address and block header -- which gives a hash to query and a state root
/// to verify against.
pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Receiver<Option<BasicAccount>> {
pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Receiver<BasicAccount> {
let (sender, receiver) = oneshot::channel();
self.dispatch(ctx, Pending::Account(req, sender));
receiver
@ -279,7 +281,7 @@ impl OnDemand {
let (sender, receiver) = oneshot::channel();
// fast path for no code.
if req.code_hash == ::util::sha3::SHA3_EMPTY {
if req.code_hash == SHA3_EMPTY {
sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE)
} else {
self.dispatch(ctx, Pending::Code(req, sender));
@ -497,10 +499,19 @@ impl Handler for OnDemand {
Pending::Account(req, sender) => {
if let NetworkResponse::Account(ref response) = *response {
match req.check_response(&response.proof) {
Ok(maybe_account) => {
Ok(account) => {
let account = account.unwrap_or_else(|| {
BasicAccount {
balance: 0.into(),
nonce: self.start_nonce,
code_hash: SHA3_EMPTY,
storage_root: SHA3_NULL_RLP
}
});
// TODO: validate against request outputs.
// needs engine + env info as part of request.
let _ = sender.send(maybe_account);
let _ = sender.send(account);
return
}
Err(e) => warn!(target: "on_demand", "Error handling response for state request: {:?}", e),
@ -572,7 +583,7 @@ mod tests {
#[test]
fn detects_hangup() {
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6))));
let on_demand = OnDemand::new(cache);
let on_demand = OnDemand::new(cache, 0.into());
let result = on_demand.header_by_hash(&FakeContext, request::HeaderByHash(H256::default()));
assert!(on_demand.orphaned_requests.read().len() == 1);

View File

@ -199,6 +199,12 @@ impl Block {
/// Decode to a full block.
pub fn decode(&self) -> FullBlock { ::rlp::decode(&self.0) }
/// Decode the header.
pub fn decode_header(&self) -> FullHeader { self.rlp().val_at(0) }
/// Clone the encoded header.
pub fn header(&self) -> Header { Header(self.rlp().at(0).as_raw().to_vec()) }
/// Get the rlp of this block.
#[inline]
pub fn rlp(&self) -> Rlp {

View File

@ -67,7 +67,6 @@ impl IoHandler<ClientIoMessage> for QueueCull {
let (sync, on_demand, txq) = (self.sync.clone(), self.on_demand.clone(), self.txq.clone());
let best_header = self.client.best_block_header();
let start_nonce = self.client.engine().account_start_nonce();
info!(target: "cull", "Attempting to cull queued transactions from {} senders.", senders.len());
self.remote.spawn_with_timeout(move || {
@ -75,8 +74,7 @@ impl IoHandler<ClientIoMessage> for QueueCull {
// fetch the nonce of each sender in the queue.
let nonce_futures = senders.iter()
.map(|&address| request::Account { header: best_header.clone(), address: address })
.map(|request| on_demand.account(ctx, request))
.map(move |fut| fut.map(move |x| x.map(|acc| acc.nonce).unwrap_or(start_nonce)))
.map(|request| on_demand.account(ctx, request).map(|acc| acc.nonce))
.zip(senders.iter())
.map(|(fut, &addr)| fut.map(move |nonce| (addr, nonce)));

View File

@ -244,7 +244,7 @@ impl Dependencies for FullDependencies {
);
handler.extend_with(client.to_delegate());
let filter_client = EthFilterClient::new(&self.client, &self.miner);
let filter_client = EthFilterClient::new(self.client.clone(), self.miner.clone());
handler.extend_with(filter_client.to_delegate());
add_signing_methods!(EthSigning, handler, self);
@ -377,9 +377,8 @@ impl Dependencies for LightDependencies {
self.secret_store.clone(),
self.cache.clone(),
);
handler.extend_with(client.to_delegate());
// TODO: filters.
handler.extend_with(Eth::to_delegate(client.clone()));
handler.extend_with(EthFilter::to_delegate(client));
add_signing_methods!(EthSigning, handler, self);
},
Api::Personal => {

View File

@ -221,7 +221,8 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
}
// start on_demand service.
let on_demand = Arc::new(::light::on_demand::OnDemand::new(cache.clone()));
let account_start_nonce = service.client().engine().account_start_nonce();
let on_demand = Arc::new(::light::on_demand::OnDemand::new(cache.clone(), account_start_nonce));
// set network path.
net_conf.net_config_path = Some(db_dirs.network_path().to_string_lossy().into_owned());

View File

@ -268,7 +268,7 @@ impl LightDispatcher {
match nonce_future {
Some(x) =>
x.map(|acc| acc.map_or_else(Default::default, |acc| acc.nonce))
x.map(|acc| acc.nonce)
.map_err(|_| errors::no_light_peers())
.boxed(),
None => future::err(errors::network_disabled()).boxed()

View File

@ -544,23 +544,23 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM> Eth for EthClient<C, SN, S, M, EM> where
Err(errors::deprecated("Compilation functionality is deprecated.".to_string()))
}
fn logs(&self, filter: Filter) -> Result<Vec<Log>, Error> {
fn logs(&self, filter: Filter) -> BoxFuture<Vec<Log>, Error> {
let include_pending = filter.to_block == Some(BlockNumber::Pending);
let filter: EthcoreFilter = filter.into();
let mut logs = take_weak!(self.client).logs(filter.clone())
let mut logs = take_weakf!(self.client).logs(filter.clone())
.into_iter()
.map(From::from)
.collect::<Vec<Log>>();
if include_pending {
let best_block = take_weak!(self.client).chain_info().best_block_number;
let pending = pending_logs(&*take_weak!(self.miner), best_block, &filter);
let best_block = take_weakf!(self.client).chain_info().best_block_number;
let pending = pending_logs(&*take_weakf!(self.miner), best_block, &filter);
logs.extend(pending);
}
let logs = limit_logs(logs, filter.limit);
Ok(logs)
future::ok(logs).boxed()
}
fn work(&self, no_new_work_timeout: Trailing<u64>) -> Result<Work, Error> {

View File

@ -16,89 +16,133 @@
//! Eth Filter RPC implementation
use std::sync::{Arc, Weak};
use std::sync::Arc;
use std::collections::HashSet;
use jsonrpc_core::*;
use ethcore::miner::MinerService;
use ethcore::filter::Filter as EthcoreFilter;
use ethcore::client::{BlockChainClient, BlockId};
use util::Mutex;
use util::{H256, Mutex};
use futures::{future, Future, BoxFuture};
use v1::traits::EthFilter;
use v1::types::{BlockNumber, Index, Filter, FilterChanges, Log, H256 as RpcH256, U256 as RpcU256};
use v1::helpers::{PollFilter, PollManager, limit_logs};
use v1::impls::eth::pending_logs;
/// Eth filter rpc implementation.
/// Something which provides data that can be filtered over.
pub trait Filterable {
/// Current best block number.
fn best_block_number(&self) -> u64;
/// Get a block hash by block id.
fn block_hash(&self, id: BlockId) -> Option<RpcH256>;
/// pending transaction hashes at the given block.
fn pending_transactions_hashes(&self, block_number: u64) -> Vec<H256>;
/// Get logs that match the given filter.
fn logs(&self, filter: EthcoreFilter) -> BoxFuture<Vec<Log>, Error>;
/// Get logs from the pending block.
fn pending_logs(&self, block_number: u64, filter: &EthcoreFilter) -> Vec<Log>;
/// Get a reference to the poll manager.
fn polls(&self) -> &Mutex<PollManager<PollFilter>>;
}
/// Eth filter rpc implementation for a full node.
pub struct EthFilterClient<C, M> where
C: BlockChainClient,
M: MinerService {
client: Weak<C>,
miner: Weak<M>,
client: Arc<C>,
miner: Arc<M>,
polls: Mutex<PollManager<PollFilter>>,
}
impl<C, M> EthFilterClient<C, M> where
C: BlockChainClient,
M: MinerService {
impl<C, M> EthFilterClient<C, M> where C: BlockChainClient, M: MinerService {
/// Creates new Eth filter client.
pub fn new(client: &Arc<C>, miner: &Arc<M>) -> Self {
pub fn new(client: Arc<C>, miner: Arc<M>) -> Self {
EthFilterClient {
client: Arc::downgrade(client),
miner: Arc::downgrade(miner),
client: client,
miner: miner,
polls: Mutex::new(PollManager::new()),
}
}
}
impl<C, M> EthFilter for EthFilterClient<C, M>
where C: BlockChainClient + 'static, M: MinerService + 'static
{
impl<C, M> Filterable for EthFilterClient<C, M> where C: BlockChainClient, M: MinerService {
fn best_block_number(&self) -> u64 {
self.client.chain_info().best_block_number
}
fn block_hash(&self, id: BlockId) -> Option<RpcH256> {
self.client.block_hash(id).map(Into::into)
}
fn pending_transactions_hashes(&self, best: u64) -> Vec<H256> {
self.miner.pending_transactions_hashes(best)
}
fn logs(&self, filter: EthcoreFilter) -> BoxFuture<Vec<Log>, Error> {
future::ok(self.client.logs(filter).into_iter().map(Into::into).collect()).boxed()
}
fn pending_logs(&self, block_number: u64, filter: &EthcoreFilter) -> Vec<Log> {
pending_logs(&*self.miner, block_number, filter)
}
fn polls(&self) -> &Mutex<PollManager<PollFilter>> { &self.polls }
}
impl<T: Filterable + Send + Sync + 'static> EthFilter for T {
fn new_filter(&self, filter: Filter) -> Result<RpcU256, Error> {
let mut polls = self.polls.lock();
let block_number = take_weak!(self.client).chain_info().best_block_number;
let mut polls = self.polls().lock();
let block_number = self.best_block_number();
let id = polls.create_poll(PollFilter::Logs(block_number, Default::default(), filter));
Ok(id.into())
}
fn new_block_filter(&self) -> Result<RpcU256, Error> {
let mut polls = self.polls.lock();
let id = polls.create_poll(PollFilter::Block(take_weak!(self.client).chain_info().best_block_number));
let mut polls = self.polls().lock();
let id = polls.create_poll(PollFilter::Block(self.best_block_number()));
Ok(id.into())
}
fn new_pending_transaction_filter(&self) -> Result<RpcU256, Error> {
let mut polls = self.polls.lock();
let best_block = take_weak!(self.client).chain_info().best_block_number;
let pending_transactions = take_weak!(self.miner).pending_transactions_hashes(best_block);
let mut polls = self.polls().lock();
let best_block = self.best_block_number();
let pending_transactions = self.pending_transactions_hashes(best_block);
let id = polls.create_poll(PollFilter::PendingTransaction(pending_transactions));
Ok(id.into())
}
fn filter_changes(&self, index: Index) -> Result<FilterChanges, Error> {
let client = take_weak!(self.client);
let mut polls = self.polls.lock();
fn filter_changes(&self, index: Index) -> BoxFuture<FilterChanges, Error> {
let mut polls = self.polls().lock();
match polls.poll_mut(&index.value()) {
None => Ok(FilterChanges::Empty),
None => future::ok(FilterChanges::Empty).boxed(),
Some(filter) => match *filter {
PollFilter::Block(ref mut block_number) => {
// + 1, cause we want to return hashes including current block hash.
let current_number = client.chain_info().best_block_number + 1;
let current_number = self.best_block_number() + 1;
let hashes = (*block_number..current_number).into_iter()
.map(BlockId::Number)
.filter_map(|id| client.block_hash(id))
.map(Into::into)
.filter_map(|id| self.block_hash(id))
.collect::<Vec<RpcH256>>();
*block_number = current_number;
Ok(FilterChanges::Hashes(hashes))
future::ok(FilterChanges::Hashes(hashes)).boxed()
},
PollFilter::PendingTransaction(ref mut previous_hashes) => {
// get hashes of pending transactions
let best_block = take_weak!(self.client).chain_info().best_block_number;
let current_hashes = take_weak!(self.miner).pending_transactions_hashes(best_block);
let best_block = self.best_block_number();
let current_hashes = self.pending_transactions_hashes(best_block);
let new_hashes =
{
@ -117,11 +161,11 @@ impl<C, M> EthFilter for EthFilterClient<C, M>
*previous_hashes = current_hashes;
// return new hashes
Ok(FilterChanges::Hashes(new_hashes))
future::ok(FilterChanges::Hashes(new_hashes)).boxed()
},
PollFilter::Logs(ref mut block_number, ref mut previous_logs, ref filter) => {
// retrive the current block number
let current_number = client.chain_info().best_block_number;
let current_number = self.best_block_number();
// check if we need to check pending hashes
let include_pending = filter.to_block == Some(BlockNumber::Pending);
@ -131,16 +175,9 @@ impl<C, M> EthFilter for EthFilterClient<C, M>
filter.from_block = BlockId::Number(*block_number);
filter.to_block = BlockId::Latest;
// retrieve logs in range from_block..min(BlockId::Latest..to_block)
let mut logs = client.logs(filter.clone())
.into_iter()
.map(From::from)
.collect::<Vec<Log>>();
// additionally retrieve pending logs
if include_pending {
let best_block = take_weak!(self.client).chain_info().best_block_number;
let pending_logs = pending_logs(&*take_weak!(self.miner), best_block, &filter);
// retrieve pending logs
let pending = if include_pending {
let pending_logs = self.pending_logs(current_number, &filter);
// remove logs about which client was already notified about
let new_pending_logs: Vec<_> = pending_logs.iter()
@ -151,49 +188,56 @@ impl<C, M> EthFilter for EthFilterClient<C, M>
// save all logs retrieved by client
*previous_logs = pending_logs.into_iter().collect();
// append logs array with new pending logs
logs.extend(new_pending_logs);
}
let logs = limit_logs(logs, filter.limit);
new_pending_logs
} else {
Vec::new()
};
// save the number of the next block as a first block from which
// we want to get logs
*block_number = current_number + 1;
Ok(FilterChanges::Logs(logs))
// retrieve logs in range from_block..min(BlockId::Latest..to_block)
let limit = filter.limit;
self.logs(filter)
.map(move |mut logs| { logs.extend(pending); logs }) // append fetched pending logs
.map(move |logs| limit_logs(logs, limit)) // limit the logs
.map(FilterChanges::Logs)
.boxed()
}
}
}
}
fn filter_logs(&self, index: Index) -> Result<Vec<Log>, Error> {
let mut polls = self.polls.lock();
fn filter_logs(&self, index: Index) -> BoxFuture<Vec<Log>, Error> {
let mut polls = self.polls().lock();
match polls.poll(&index.value()) {
Some(&PollFilter::Logs(ref _block_number, ref _previous_log, ref filter)) => {
let include_pending = filter.to_block == Some(BlockNumber::Pending);
let filter: EthcoreFilter = filter.clone().into();
let mut logs = take_weak!(self.client).logs(filter.clone())
.into_iter()
.map(From::from)
.collect::<Vec<Log>>();
if include_pending {
let best_block = take_weak!(self.client).chain_info().best_block_number;
logs.extend(pending_logs(&*take_weak!(self.miner), best_block, &filter));
}
// fetch pending logs.
let pending = if include_pending {
let best_block = self.best_block_number();
self.pending_logs(best_block, &filter)
} else {
Vec::new()
};
let logs = limit_logs(logs, filter.limit);
Ok(logs)
// retrieve logs asynchronously, appending pending logs.
let limit = filter.limit;
self.logs(filter)
.map(move |mut logs| { logs.extend(pending); logs })
.map(move |logs| limit_logs(logs, limit))
.boxed()
},
// just empty array
_ => Ok(Vec::new()),
_ => future::ok(Vec::new()).boxed()
}
}
fn uninstall_filter(&self, index: Index) -> Result<bool, Error> {
self.polls.lock().remove_poll(&index.value());
self.polls().lock().remove_poll(&index.value());
Ok(true)
}
}

View File

@ -34,6 +34,7 @@ use ethcore::basic_account::BasicAccount;
use ethcore::encoded;
use ethcore::executed::{Executed, ExecutionError};
use ethcore::ids::BlockId;
use ethcore::filter::Filter as EthcoreFilter;
use ethcore::transaction::{Action, SignedTransaction, Transaction as EthTransaction};
use ethsync::LightSync;
use rlp::UntrustedRlp;
@ -43,7 +44,9 @@ use util::{RwLock, Mutex, Uint, U256};
use futures::{future, Future, BoxFuture, IntoFuture};
use futures::sync::oneshot;
use v1::impls::eth_filter::Filterable;
use v1::helpers::{CallRequest as CRequest, errors, limit_logs, dispatch};
use v1::helpers::{PollFilter, PollManager};
use v1::helpers::block_import::is_major_importing;
use v1::traits::Eth;
use v1::types::{
@ -55,7 +58,7 @@ use v1::metadata::Metadata;
use util::Address;
/// Light client `ETH` RPC.
/// Light client `ETH` (and filter) RPC.
pub struct EthClient {
sync: Arc<LightSync>,
client: Arc<LightClient>,
@ -63,6 +66,22 @@ pub struct EthClient {
transaction_queue: Arc<RwLock<TransactionQueue>>,
accounts: Arc<AccountProvider>,
cache: Arc<Mutex<LightDataCache>>,
polls: Mutex<PollManager<PollFilter>>,
}
impl Clone for EthClient {
fn clone(&self) -> Self {
// each instance should have its own poll manager.
EthClient {
sync: self.sync.clone(),
client: self.client.clone(),
on_demand: self.on_demand.clone(),
transaction_queue: self.transaction_queue.clone(),
accounts: self.accounts.clone(),
cache: self.cache.clone(),
polls: Mutex::new(PollManager::new()),
}
}
}
// helper for internal error: on demand sender cancelled.
@ -90,6 +109,7 @@ impl EthClient {
transaction_queue: transaction_queue,
accounts: accounts,
cache: cache,
polls: Mutex::new(PollManager::new()),
}
}
@ -153,12 +173,15 @@ impl EthClient {
Some(hdr) => hdr,
};
sync.with_context(|ctx| on_demand.account(ctx, request::Account {
let maybe_fut = sync.with_context(|ctx| on_demand.account(ctx, request::Account {
header: header,
address: address,
}))
.map(|x| x.map_err(err_premature_cancel).boxed())
.unwrap_or_else(|| future::err(errors::network_disabled()).boxed())
}));
match maybe_fut {
Some(fut) => fut.map(Some).map_err(err_premature_cancel).boxed(),
None => future::err(errors::network_disabled()).boxed(),
}
}).boxed()
}
@ -234,6 +257,111 @@ impl EthClient {
}
}).boxed()
}
fn block(&self, id: BlockId) -> BoxFuture<Option<encoded::Block>, Error> {
let (on_demand, sync) = (self.on_demand.clone(), self.sync.clone());
self.header(id).and_then(move |hdr| {
let req = match hdr {
Some(hdr) => request::Body::new(hdr),
None => return future::ok(None).boxed(),
};
match sync.with_context(move |ctx| on_demand.block(ctx, req)) {
Some(fut) => fut.map_err(err_premature_cancel).map(Some).boxed(),
None => future::err(errors::network_disabled()).boxed(),
}
}).boxed()
}
// get a "rich" block structure
fn rich_block(&self, id: BlockId, include_txs: bool) -> BoxFuture<Option<RichBlock>, Error> {
let (on_demand, sync) = (self.on_demand.clone(), self.sync.clone());
let (client, engine) = (self.client.clone(), self.client.engine().clone());
// helper for filling out a rich block once we've got a block and a score.
let fill_rich = move |block: encoded::Block, score: Option<U256>| {
let header = block.decode_header();
let extra_info = engine.extra_info(&header);
RichBlock {
block: Block {
hash: Some(header.hash().into()),
size: Some(block.rlp().as_raw().len().into()),
parent_hash: header.parent_hash().clone().into(),
uncles_hash: header.uncles_hash().clone().into(),
author: header.author().clone().into(),
miner: header.author().clone().into(),
state_root: header.state_root().clone().into(),
transactions_root: header.transactions_root().clone().into(),
receipts_root: header.receipts_root().clone().into(),
number: Some(header.number().into()),
gas_used: header.gas_used().clone().into(),
gas_limit: header.gas_limit().clone().into(),
logs_bloom: header.log_bloom().clone().into(),
timestamp: header.timestamp().into(),
difficulty: header.difficulty().clone().into(),
total_difficulty: score.map(Into::into),
seal_fields: header.seal().into_iter().cloned().map(Into::into).collect(),
uncles: block.uncle_hashes().into_iter().map(Into::into).collect(),
transactions: match include_txs {
true => BlockTransactions::Full(block.view().localized_transactions().into_iter().map(Into::into).collect()),
false => BlockTransactions::Hashes(block.transaction_hashes().into_iter().map(Into::into).collect()),
},
extra_data: Bytes::new(header.extra_data().to_vec()),
},
extra_info: extra_info
}
};
// get the block itself.
self.block(id).and_then(move |block| match block {
None => return future::ok(None).boxed(),
Some(block) => {
// then fetch the total difficulty (this is much easier after getting the block).
match client.score(id) {
Some(score) => future::ok(fill_rich(block, Some(score))).map(Some).boxed(),
None => {
// make a CHT request to fetch the chain score.
let req = cht::block_to_cht_number(block.number())
.and_then(|num| client.cht_root(num as usize))
.and_then(|root| request::HeaderProof::new(block.number(), root));
let req = match req {
Some(req) => req,
None => {
// somehow the genesis block slipped past other checks.
// return it now.
let score = client.block_header(BlockId::Number(0))
.expect("genesis always stored; qed")
.difficulty();
return future::ok(fill_rich(block, Some(score))).map(Some).boxed()
}
};
// three possible outcomes:
// - network is down.
// - we get a score, but our hash is non-canonical.
// - we get ascore, and our hash is canonical.
let maybe_fut = sync.with_context(move |ctx| on_demand.hash_and_score_by_number(ctx, req));
match maybe_fut {
Some(fut) => fut.map(move |(hash, score)| {
let score = if hash == block.hash() {
Some(score)
} else {
None
};
Some(fill_rich(block, score))
}).map_err(err_premature_cancel).boxed(),
None => return future::err(errors::network_disabled()).boxed(),
}
}
}
}
}).boxed()
}
}
impl Eth for EthClient {
@ -275,7 +403,10 @@ impl Eth for EthClient {
}
fn gas_price(&self) -> Result<RpcU256, Error> {
Ok(Default::default())
Ok(self.cache.lock().gas_price_corpus()
.and_then(|c| c.median().cloned())
.map(RpcU256::from)
.unwrap_or_else(Default::default))
}
fn accounts(&self, meta: Metadata) -> BoxFuture<Vec<RpcH160>, Error> {
@ -304,11 +435,11 @@ impl Eth for EthClient {
}
fn block_by_hash(&self, hash: RpcH256, include_txs: bool) -> BoxFuture<Option<RichBlock>, Error> {
future::err(errors::unimplemented(None)).boxed()
self.rich_block(BlockId::Hash(hash.into()), include_txs)
}
fn block_by_number(&self, num: BlockNumber, include_txs: bool) -> BoxFuture<Option<RichBlock>, Error> {
future::err(errors::unimplemented(None)).boxed()
self.rich_block(num.into(), include_txs)
}
fn transaction_count(&self, address: RpcH160, num: Trailing<BlockNumber>) -> BoxFuture<RpcU256, Error> {
@ -484,19 +615,101 @@ impl Eth for EthClient {
Err(errors::deprecated("Compilation of Solidity via RPC is deprecated".to_string()))
}
fn logs(&self, _filter: Filter) -> Result<Vec<Log>, Error> {
Err(errors::unimplemented(None))
fn logs(&self, filter: Filter) -> BoxFuture<Vec<Log>, Error> {
let limit = filter.limit;
Filterable::logs(self, filter.into())
.map(move|logs| limit_logs(logs, limit))
.boxed()
}
fn work(&self, _timeout: Trailing<u64>) -> Result<Work, Error> {
Err(errors::unimplemented(None))
Err(errors::light_unimplemented(None))
}
fn submit_work(&self, _nonce: RpcH64, _pow_hash: RpcH256, _mix_hash: RpcH256) -> Result<bool, Error> {
Err(errors::unimplemented(None))
Err(errors::light_unimplemented(None))
}
fn submit_hashrate(&self, _rate: RpcU256, _id: RpcH256) -> Result<bool, Error> {
Err(errors::unimplemented(None))
Err(errors::light_unimplemented(None))
}
}
// This trait implementation triggers a blanked impl of `EthFilter`.
impl Filterable for EthClient {
fn best_block_number(&self) -> u64 { self.client.chain_info().best_block_number }
fn block_hash(&self, id: BlockId) -> Option<RpcH256> {
self.client.block_hash(id).map(Into::into)
}
fn pending_transactions_hashes(&self, _block_number: u64) -> Vec<::util::H256> {
Vec::new()
}
fn logs(&self, filter: EthcoreFilter) -> BoxFuture<Vec<Log>, Error> {
use std::collections::BTreeMap;
use futures::stream::{self, Stream};
use util::H2048;
// early exit for "to" block before "from" block.
let best_number = self.client.chain_info().best_block_number;
let block_number = |id| match id {
BlockId::Earliest => Some(0),
BlockId::Latest | BlockId::Pending => Some(best_number),
BlockId::Hash(h) => self.client.block_header(BlockId::Hash(h)).map(|hdr| hdr.number()),
BlockId::Number(x) => Some(x),
};
match (block_number(filter.to_block), block_number(filter.from_block)) {
(Some(to), Some(from)) if to < from => return future::ok(Vec::new()).boxed(),
(Some(_), Some(_)) => {},
_ => return future::err(errors::unknown_block()).boxed(),
}
let maybe_future = self.sync.with_context(move |ctx| {
// find all headers which match the filter, and fetch the receipts for each one.
// match them with their numbers for easy sorting later.
let bit_combos = filter.bloom_possibilities();
let receipts_futures: Vec<_> = self.client.ancestry_iter(filter.to_block)
.take_while(|ref hdr| BlockId::Number(hdr.number()) != filter.from_block)
.take_while(|ref hdr| BlockId::Hash(hdr.hash()) != filter.from_block)
.filter(|ref hdr| {
let hdr_bloom = hdr.log_bloom();
bit_combos.iter().find(|&bloom| hdr_bloom & *bloom == *bloom).is_some()
})
.map(|hdr| (hdr.number(), request::BlockReceipts(hdr)))
.map(|(num, req)| self.on_demand.block_receipts(ctx, req).map(move |x| (num, x)))
.collect();
// as the receipts come in, find logs within them which match the filter.
// insert them into a BTreeMap to maintain order by number and block index.
stream::futures_unordered(receipts_futures)
.fold(BTreeMap::new(), move |mut matches, (num, receipts)| {
for (block_index, log) in receipts.into_iter().flat_map(|r| r.logs).enumerate() {
if filter.matches(&log) {
matches.insert((num, block_index), log.into());
}
}
future::ok(matches)
}) // and then collect them into a vector.
.map(|matches| matches.into_iter().map(|(_, v)| v).collect())
.map_err(err_premature_cancel)
});
match maybe_future {
Some(fut) => fut.boxed(),
None => future::err(errors::network_disabled()).boxed(),
}
}
fn pending_logs(&self, _block_number: u64, _filter: &EthcoreFilter) -> Vec<Log> {
Vec::new() // light clients don't mine.
}
fn polls(&self) -> &Mutex<PollManager<PollFilter>> {
&self.polls
}
}

View File

@ -90,7 +90,7 @@ impl EthTester {
let hashrates = Arc::new(Mutex::new(HashMap::new()));
let external_miner = Arc::new(ExternalMiner::new(hashrates.clone()));
let eth = EthClient::new(&client, &snapshot, &sync, &opt_ap, &miner, &external_miner, options).to_delegate();
let filter = EthFilterClient::new(&client, &miner).to_delegate();
let filter = EthFilterClient::new(client.clone(), miner.clone()).to_delegate();
let dispatcher = FullDispatcher::new(Arc::downgrade(&client), Arc::downgrade(&miner));
let sign = SigningUnsafeClient::new(&opt_ap, dispatcher).to_delegate();

View File

@ -162,8 +162,8 @@ build_rpc_trait! {
fn compile_serpent(&self, String) -> Result<Bytes, Error>;
/// Returns logs matching given filter object.
#[rpc(name = "eth_getLogs")]
fn logs(&self, Filter) -> Result<Vec<Log>, Error>;
#[rpc(async, name = "eth_getLogs")]
fn logs(&self, Filter) -> BoxFuture<Vec<Log>, Error>;
/// Returns the hash of the current block, the seedHash, and the boundary condition to be met.
#[rpc(name = "eth_getWork")]
@ -196,12 +196,12 @@ build_rpc_trait! {
fn new_pending_transaction_filter(&self) -> Result<U256, Error>;
/// Returns filter changes since last poll.
#[rpc(name = "eth_getFilterChanges")]
fn filter_changes(&self, Index) -> Result<FilterChanges, Error>;
#[rpc(async, name = "eth_getFilterChanges")]
fn filter_changes(&self, Index) -> BoxFuture<FilterChanges, Error>;
/// Returns all logs matching given filter (in a range 'from' - 'to').
#[rpc(name = "eth_getFilterLogs")]
fn filter_logs(&self, Index) -> Result<Vec<Log>, Error>;
#[rpc(async, name = "eth_getFilterLogs")]
fn filter_logs(&self, Index) -> BoxFuture<Vec<Log>, Error>;
/// Uninstalls filter.
#[rpc(name = "eth_uninstallFilter")]