glue for fetching epoch proofs from network

This commit is contained in:
Robert Habermeier 2017-08-24 15:17:48 +02:00
parent 871a9c063e
commit b953f9b66a
15 changed files with 168 additions and 38 deletions

View File

@ -18,6 +18,7 @@
use std::sync::Arc;
use ethcore::encoded;
use ethcore::engines::{Engine, StateDependentProof};
use ethcore::header::Header;
use ethcore::receipt::Receipt;
@ -30,7 +31,7 @@ pub trait ChainDataFetcher: Send + Sync + 'static {
type Error: ::std::fmt::Debug;
/// Future for fetching block body.
type Body: IntoFuture<Item=Vec<u8>,Error=Self::Error>;
type Body: IntoFuture<Item=encoded::Block,Error=Self::Error>;
/// Future for fetching block receipts.
type Receipts: IntoFuture<Item=Vec<Receipt>,Error=Self::Error>;
/// Future for fetching epoch transition
@ -55,7 +56,7 @@ pub fn unavailable() -> Unavailable { Unavailable }
impl ChainDataFetcher for Unavailable {
type Error = &'static str;
type Body = Result<Vec<u8>, &'static str>;
type Body = Result<encoded::Block, &'static str>;
type Receipts = Result<Vec<Receipt>, &'static str>;
type Transition = Result<Vec<u8>, &'static str>;

View File

@ -83,6 +83,9 @@ impl Default for Config {
/// Trait for interacting with the header chain abstractly.
pub trait LightChainClient: Send + Sync {
/// Adds a new `LightChainNotify` listener.
fn add_listener(&self, listener: Weak<LightChainNotify>);
/// Get chain info.
fn chain_info(&self) -> BlockChainInfo;
@ -481,7 +484,7 @@ impl<T: ChainDataFetcher> Client<T> {
};
if let Some(b) = b {
block = Some(b.into_future().wait()?);
block = Some(b.into_future().wait()?.into_inner());
}
if let Some(r) = r {
@ -526,6 +529,10 @@ impl<T: ChainDataFetcher> Client<T> {
}
impl<T: ChainDataFetcher> LightChainClient for Client<T> {
fn add_listener(&self, listener: Weak<LightChainNotify>) {
Client::add_listener(self, listener)
}
fn chain_info(&self) -> BlockChainInfo { Client::chain_info(self) }
fn queue_header(&self, header: Header) -> Result<H256, BlockImportError> {

View File

@ -259,6 +259,11 @@ impl Header {
/// Get the SHA3 (Keccak) of this header, optionally `with_seal`.
pub fn rlp_sha3(&self, with_seal: Seal) -> H256 { self.rlp(with_seal).sha3() }
/// Encode the header, getting a type-safe wrapper around the RLP.
pub fn encoded(&self) -> ::encoded::Header {
::encoded::Header::new(self.rlp(Seal::With))
}
}
impl Decodable for Header {

View File

@ -204,7 +204,9 @@ fn execute_import_light(cmd: ImportBlockchain) -> Result<(), String> {
config.queue.max_mem_use = cmd.cache_config.queue() as usize * 1024 * 1024;
config.queue.verifier_settings = cmd.verifier_settings;
let service = LightClientService::start(config, &spec, &client_path, cache)
// TODO: could epoch signals be avilable at the end of the file?
let fetch = ::light::client::fetch::unavailable();
let service = LightClientService::start(config, &spec, fetch, &client_path, cache)
.map_err(|e| format!("Failed to start client: {}", e))?;
// free up the spec in memory.

View File

@ -26,7 +26,7 @@ use futures_cpupool::CpuPool;
use hash_fetch::fetch::Client as FetchClient;
use hash_fetch::urlhint::ContractClient;
use helpers::replace_home;
use light::client::Client as LightClient;
use light::client::LightChainClient;
use light::on_demand::{self, OnDemand};
use rpc;
use rpc_apis::SignerService;
@ -94,16 +94,16 @@ impl ContractClient for FullRegistrar {
}
/// Registrar implementation for the light client.
pub struct LightRegistrar {
pub struct LightRegistrar<T> {
/// The light client.
pub client: Arc<LightClient>,
pub client: Arc<T>,
/// Handle to the on-demand service.
pub on_demand: Arc<OnDemand>,
/// Handle to the light network service.
pub sync: Arc<LightSync>,
}
impl ContractClient for LightRegistrar {
impl<T: LightChainClient + 'static> ContractClient for LightRegistrar<T> {
fn registrar(&self) -> Result<Address, String> {
self.client.engine().additional_params().get("registrar")
.ok_or_else(|| "Registrar not defined.".into())
@ -113,7 +113,14 @@ impl ContractClient for LightRegistrar {
}
fn call(&self, address: Address, data: Bytes) -> BoxFuture<Bytes, String> {
let (header, env_info) = (self.client.best_block_header(), self.client.latest_env_info());
let header = self.client.best_block_header();
let env_info = self.client.env_info(BlockId::Hash(header.hash()))
.ok_or_else(|| format!("Cannot fetch env info for header {}", header.hash()));
let env_info = match env_info {
Ok(x) => x,
Err(e) => return future::err(e).boxed(),
};
let maybe_future = self.sync.with_context(move |ctx| {
self.on_demand

View File

@ -22,7 +22,7 @@ use std::sync::{Arc};
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::time::{Instant, Duration};
use ethcore::client::*;
use ethcore::client::{BlockId, BlockChainClient, BlockChainInfo, BlockQueueInfo, ChainNotify, ClientReport, Client};
use ethcore::header::BlockNumber;
use ethcore::service::ClientIoMessage;
use ethcore::snapshot::{RestorationStatus, SnapshotService as SS};

View File

@ -0,0 +1,89 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::{Arc, Weak};
use ethcore::encoded;
use ethcore::engines::{Engine, StateDependentProof};
use ethcore::header::Header;
use ethcore::receipt::Receipt;
use ethsync::LightSync;
use futures::{future, Future, BoxFuture};
use light::client::fetch::ChainDataFetcher;
use light::on_demand::{request, OnDemand};
use util::{RwLock, H256};
const ALL_VALID_BACKREFS: &str = "no back-references, therefore all back-references valid; qed";
/// Allows on-demand fetch of data useful for the light client.
pub struct EpochFetch {
/// A handle to the sync service.
pub sync: Arc<RwLock<Weak<LightSync>>>,
/// The on-demand request service.
pub on_demand: Arc<OnDemand>,
}
impl EpochFetch {
fn request<T>(&self, req: T) -> BoxFuture<T::Out, &'static str>
where T: Send + request::RequestAdapter + 'static, T::Out: Send + 'static
{
match self.sync.read().upgrade() {
Some(sync) => {
let on_demand = &self.on_demand;
let maybe_future = sync.with_context(move |ctx| {
on_demand.request(ctx, req).expect(ALL_VALID_BACKREFS)
});
match maybe_future {
Some(x) => x.map_err(|_| "Request canceled").boxed(),
None => future::err("Unable to access network.").boxed(),
}
}
None => future::err("Unable to access network").boxed(),
}
}
}
impl ChainDataFetcher for EpochFetch {
type Error = &'static str;
type Body = BoxFuture<encoded::Block, &'static str>;
type Receipts = BoxFuture<Vec<Receipt>, &'static str>;
type Transition = BoxFuture<Vec<u8>, &'static str>;
fn block_body(&self, header: &Header) -> Self::Body {
self.request(request::Body(header.encoded().into()))
}
/// Fetch block receipts.
fn block_receipts(&self, header: &Header) -> Self::Receipts {
self.request(request::BlockReceipts(header.encoded().into()))
}
/// Fetch epoch transition proof at given header.
fn epoch_transition(&self, hash: H256, engine: Arc<Engine>, checker: Arc<StateDependentProof>)
-> Self::Transition
{
self.request(request::Signal {
hash: hash,
engine: engine,
proof_check: checker,
})
}
}

View File

@ -16,6 +16,8 @@
//! Utilities and helpers for the light client.
mod epoch_fetch;
mod queue_cull;
pub use self::epoch_fetch::EpochFetch;
pub use self::queue_cull::QueueCull;

View File

@ -23,7 +23,7 @@ use ethcore::service::ClientIoMessage;
use ethsync::LightSync;
use io::{IoContext, IoHandler, TimerToken};
use light::client::Client;
use light::client::LightChainClient;
use light::on_demand::{request, OnDemand};
use light::TransactionQueue;
@ -41,9 +41,9 @@ const TIMEOUT_MS: u64 = 1000 * 60 * 10;
const PURGE_TIMEOUT_MS: u64 = 1000 * 60 * 9;
/// Periodically culls the transaction queue of mined transactions.
pub struct QueueCull {
pub struct QueueCull<T> {
/// A handle to the client, for getting the latest block header.
pub client: Arc<Client>,
pub client: Arc<T>,
/// A handle to the sync service.
pub sync: Arc<LightSync>,
/// The on-demand request service.
@ -54,7 +54,7 @@ pub struct QueueCull {
pub remote: Remote,
}
impl IoHandler<ClientIoMessage> for QueueCull {
impl<T: LightChainClient + 'static> IoHandler<ClientIoMessage> for QueueCull<T> {
fn initialize(&self, io: &IoContext<ClientIoMessage>) {
io.register_timer(TOKEN, TIMEOUT_MS).expect("Error registering timer");
}

View File

@ -34,6 +34,7 @@ use ethsync::{ManageNetwork, SyncProvider, LightSync};
use hash_fetch::fetch::Client as FetchClient;
use jsonrpc_core::{self as core, MetaIoHandler};
use light::{TransactionQueue as LightTransactionQueue, Cache as LightDataCache};
use light::client::LightChainClient;
use updater::Updater;
use util::{Mutex, RwLock};
use ethcore_logger::RotatingLogger;
@ -395,9 +396,9 @@ impl ActivityNotifier for LightClientNotifier {
}
/// RPC dependencies for a light client.
pub struct LightDependencies {
pub struct LightDependencies<T> {
pub signer_service: Arc<SignerService>,
pub client: Arc<::light::client::Client>,
pub client: Arc<T>,
pub sync: Arc<LightSync>,
pub net: Arc<ManageNetwork>,
pub secret_store: Arc<AccountProvider>,
@ -415,7 +416,7 @@ pub struct LightDependencies {
pub whisper_rpc: Option<::whisper::RpcFactory>,
}
impl LightDependencies {
impl<C: LightChainClient + 'static> LightDependencies<C> {
fn extend_api<T: core::Middleware<Metadata>>(
&self,
handler: &mut MetaIoHandler<Metadata, T>,
@ -563,7 +564,7 @@ impl LightDependencies {
}
}
impl Dependencies for LightDependencies {
impl<T: LightChainClient + 'static> Dependencies for LightDependencies<T> {
type Notifier = LightClientNotifier;
fn activity_notifier(&self) -> Self::Notifier { LightClientNotifier }

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::net::{TcpListener};
use ctrlc::CtrlC;
@ -217,7 +217,16 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
config.queue.max_mem_use = cmd.cache_config.queue() as usize * 1024 * 1024;
config.queue.verifier_settings = cmd.verifier_settings;
let service = light_client::Service::start(config, &spec, &db_dirs.client_path(algorithm), cache.clone())
// start on_demand service.
let on_demand = Arc::new(::light::on_demand::OnDemand::new(cache.clone()));
let sync_handle = Arc::new(RwLock::new(Weak::new()));
let fetch = ::light_helpers::EpochFetch {
on_demand: on_demand.clone(),
sync: sync_handle.clone(),
};
let service = light_client::Service::start(config, &spec, fetch, &db_dirs.client_path(algorithm), cache.clone())
.map_err(|e| format!("Error starting light client: {}", e))?;
let txq = Arc::new(RwLock::new(::light::transaction_queue::TransactionQueue::default()));
let provider = ::light::provider::LightProvider::new(service.client().clone(), txq.clone());
@ -229,9 +238,6 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
net_conf.boot_nodes = spec.nodes.clone();
}
// start on_demand service.
let on_demand = Arc::new(::light::on_demand::OnDemand::new(cache.clone()));
let mut attached_protos = Vec::new();
let whisper_factory = if cmd.whisper.enabled {
let (whisper_net, whisper_factory) = ::whisper::setup(cmd.whisper.target_message_pool_size)
@ -255,6 +261,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
};
let light_sync = LightSync::new(sync_params).map_err(|e| format!("Error starting network: {}", e))?;
let light_sync = Arc::new(light_sync);
*sync_handle.write() = Arc::downgrade(&light_sync);
// spin up event loop
let event_loop = EventLoop::spawn();

View File

@ -25,7 +25,7 @@ use jsonrpc_core::Error;
use jsonrpc_macros::Trailing;
use light::cache::Cache as LightDataCache;
use light::client::{Client as LightClient, LightChainClient};
use light::client::LightChainClient;
use light::{cht, TransactionQueue};
use light::on_demand::{request, OnDemand};
@ -62,9 +62,9 @@ use util::Address;
const NO_INVALID_BACK_REFS: &'static str = "Fails only on invalid back-references; back-references here known to be valid; qed";
/// Light client `ETH` (and filter) RPC.
pub struct EthClient {
pub struct EthClient<T> {
sync: Arc<LightSync>,
client: Arc<LightClient>,
client: Arc<T>,
on_demand: Arc<OnDemand>,
transaction_queue: Arc<RwLock<TransactionQueue>>,
accounts: Arc<AccountProvider>,
@ -72,7 +72,7 @@ pub struct EthClient {
polls: Mutex<PollManager<PollFilter>>,
}
impl Clone for EthClient {
impl<T> Clone for EthClient<T> {
fn clone(&self) -> Self {
// each instance should have its own poll manager.
EthClient {
@ -88,12 +88,12 @@ impl Clone for EthClient {
}
impl EthClient {
impl<T: LightChainClient + 'static> EthClient<T> {
/// Create a new `EthClient` with a handle to the light sync instance, client,
/// and on-demand request service, which is assumed to be attached as a handler.
pub fn new(
sync: Arc<LightSync>,
client: Arc<LightClient>,
client: Arc<T>,
on_demand: Arc<OnDemand>,
transaction_queue: Arc<RwLock<TransactionQueue>>,
accounts: Arc<AccountProvider>,
@ -208,7 +208,7 @@ impl EthClient {
}
}
impl Eth for EthClient {
impl<T: LightChainClient + 'static> Eth for EthClient<T> {
type Metadata = Metadata;
fn protocol_version(&self) -> Result<String, Error> {
@ -465,7 +465,7 @@ impl Eth for EthClient {
}
// This trait implementation triggers a blanked impl of `EthFilter`.
impl Filterable for EthClient {
impl<T: LightChainClient + 'static> Filterable for EthClient<T> {
fn best_block_number(&self) -> u64 { self.client.chain_info().best_block_number }
fn block_hash(&self, id: BlockId) -> Option<RpcH256> {

View File

@ -2238,7 +2238,7 @@ mod tests {
use super::{PeerInfo, PeerAsking};
use ethkey;
use ethcore::header::*;
use ethcore::client::*;
use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient};
use ethcore::transaction::UnverifiedTransaction;
use ethcore::miner::MinerService;

View File

@ -25,7 +25,7 @@ use tests::helpers::{TestNet, Peer as PeerLike, TestPacket};
use ethcore::client::TestBlockChainClient;
use ethcore::spec::Spec;
use io::IoChannel;
use light::client::Client as LightClient;
use light::client::fetch::{self, Unavailable};
use light::net::{LightProtocol, IoContext, Capabilities, Params as LightParams};
use light::provider::LightProvider;
use network::{NodeId, PeerId};
@ -36,6 +36,8 @@ use light::cache::Cache;
const NETWORK_ID: u64 = 0xcafebabe;
pub type LightClient = ::light::client::Client<Unavailable>;
struct TestIoContext<'a> {
queue: &'a RwLock<VecDeque<TestPacket>>,
sender: Option<PeerId>,
@ -216,7 +218,14 @@ impl TestNet<Peer> {
// skip full verification because the blocks are bad.
config.verify_full = false;
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6))));
let client = LightClient::in_memory(config, &Spec::new_test(), IoChannel::disconnected(), cache);
let client = LightClient::in_memory(
config,
&Spec::new_test(),
fetch::unavailable(), // TODO: allow fetch from full nodes.
IoChannel::disconnected(),
cache
);
peers.push(Arc::new(Peer::new_light(Arc::new(client))))
}

View File

@ -69,8 +69,8 @@ fn authority_round() {
// Push transaction to both clients. Only one of them gets lucky to produce a block.
net.peer(0).chain.miner().set_engine_signer(s0.address(), "".to_owned()).unwrap();
net.peer(1).chain.miner().set_engine_signer(s1.address(), "".to_owned()).unwrap();
net.peer(0).chain.engine().register_client(Arc::downgrade(&net.peer(0).chain));
net.peer(1).chain.engine().register_client(Arc::downgrade(&net.peer(1).chain));
net.peer(0).chain.engine().register_client(Arc::downgrade(&net.peer(0).chain) as _);
net.peer(1).chain.engine().register_client(Arc::downgrade(&net.peer(1).chain) as _);
net.peer(0).chain.set_io_channel(IoChannel::to_handler(Arc::downgrade(&io_handler1)));
net.peer(1).chain.set_io_channel(IoChannel::to_handler(Arc::downgrade(&io_handler0)));
// exchange statuses
@ -158,8 +158,8 @@ fn tendermint() {
trace!(target: "poa", "Peer 0 is {}.", s0.address());
net.peer(1).chain.miner().set_engine_signer(s1.address(), "".to_owned()).unwrap();
trace!(target: "poa", "Peer 1 is {}.", s1.address());
net.peer(0).chain.engine().register_client(Arc::downgrade(&net.peer(0).chain));
net.peer(1).chain.engine().register_client(Arc::downgrade(&net.peer(1).chain));
net.peer(0).chain.engine().register_client(Arc::downgrade(&net.peer(0).chain) as _);
net.peer(1).chain.engine().register_client(Arc::downgrade(&net.peer(1).chain) as _);
net.peer(0).chain.set_io_channel(IoChannel::to_handler(Arc::downgrade(&io_handler0)));
net.peer(1).chain.set_io_channel(IoChannel::to_handler(Arc::downgrade(&io_handler1)));
// Exhange statuses