From baf0dbc6bf4fa39e24d6fb25943bf4bbfbdb0291 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 9 Feb 2017 18:42:18 +0100 Subject: [PATCH] LightProvider struct using light transaction queue --- ethcore/light/src/client/mod.rs | 61 +++++++++++++-------------- ethcore/light/src/provider.rs | 75 ++++++++++++++++++++++++++++++++- sync/src/api.rs | 4 +- sync/src/light_sync/mod.rs | 25 ++++++----- 4 files changed, 119 insertions(+), 46 deletions(-) diff --git a/ethcore/light/src/client/mod.rs b/ethcore/light/src/client/mod.rs index ea4660abc..29b812aa6 100644 --- a/ethcore/light/src/client/mod.rs +++ b/ethcore/light/src/client/mod.rs @@ -22,19 +22,15 @@ use ethcore::client::ClientReport; use ethcore::ids::BlockId; use ethcore::header::Header; use ethcore::verification::queue::{self, HeaderQueue}; -use ethcore::transaction::{PendingTransaction, Condition as TransactionCondition}; use ethcore::blockchain_info::BlockChainInfo; use ethcore::spec::Spec; use ethcore::service::ClientIoMessage; use ethcore::encoded; use io::IoChannel; -use util::hash::{H256, H256FastMap}; +use util::hash::H256; use util::{Bytes, Mutex, RwLock}; -use provider::Provider; -use request; - use self::header_chain::HeaderChain; pub use self::service::Service; @@ -58,6 +54,9 @@ pub trait LightChainClient: Send + Sync { /// parent queued prior. fn queue_header(&self, header: Header) -> Result; + /// Attempt to get block header by block id. + fn block_header(&self, id: BlockId) -> Option; + /// Query whether a block is known. fn is_known(&self, hash: &H256) -> bool; @@ -74,11 +73,25 @@ pub trait LightChainClient: Send + Sync { fn cht_root(&self, i: usize) -> Option; } +/// Something which can be treated as a `LightChainClient`. +pub trait AsLightClient { + /// The kind of light client this can be treated as. + type Client: LightChainClient; + + /// Access the underlying light client. + fn as_light_client(&self) -> &Self::Client; +} + +impl AsLightClient for T { + type Client = Self; + + fn as_light_client(&self) -> &Self { self } +} + /// Light client implementation. pub struct Client { queue: HeaderQueue, chain: HeaderChain, - tx_pool: Mutex>, report: RwLock, import_lock: Mutex<()>, } @@ -89,7 +102,6 @@ impl Client { Client { queue: HeaderQueue::new(config.queue, spec.engine.clone(), io_channel, true), chain: HeaderChain::new(&::rlp::encode(&spec.genesis_header())), - tx_pool: Mutex::new(Default::default()), report: RwLock::new(ClientReport::default()), import_lock: Mutex::new(()), } @@ -100,25 +112,6 @@ impl Client { self.queue.import(header).map_err(Into::into) } - /// Import a local transaction. - pub fn import_own_transaction(&self, tx: PendingTransaction) { - self.tx_pool.lock().insert(tx.transaction.hash(), tx); - } - - /// Fetch a vector of all pending transactions. - pub fn ready_transactions(&self) -> Vec { - let best = self.chain.best_header(); - self.tx_pool.lock() - .values() - .filter(|t| match t.condition { - Some(TransactionCondition::Number(x)) => x <= best.number(), - Some(TransactionCondition::Timestamp(x)) => x <= best.timestamp(), - None => true, - }) - .cloned() - .collect() - } - /// Inquire about the status of a given header. pub fn status(&self, hash: &H256) -> BlockStatus { match self.queue.status(hash) { @@ -216,6 +209,10 @@ impl LightChainClient for Client { self.import_header(header) } + fn block_header(&self, id: BlockId) -> Option { + Client::block_header(self, id) + } + fn is_known(&self, hash: &H256) -> bool { self.status(hash) == BlockStatus::InChain } @@ -237,8 +234,8 @@ impl LightChainClient for Client { } } -// dummy implementation -- may draw from canonical cache further on. -impl Provider for Client { +// dummy implementation, should be removed when a `TestClient` is added. +impl ::provider::Provider for Client { fn chain_info(&self) -> BlockChainInfo { Client::chain_info(self) } @@ -263,19 +260,19 @@ impl Provider for Client { None } - fn state_proof(&self, _req: request::StateProof) -> Vec { + fn state_proof(&self, _req: ::request::StateProof) -> Vec { Vec::new() } - fn contract_code(&self, _req: request::ContractCode) -> Bytes { + fn contract_code(&self, _req: ::request::ContractCode) -> Bytes { Vec::new() } - fn header_proof(&self, _req: request::HeaderProof) -> Option<(encoded::Header, Vec)> { + fn header_proof(&self, _req: ::request::HeaderProof) -> Option<(encoded::Header, Vec)> { None } - fn ready_transactions(&self) -> Vec { + fn ready_transactions(&self) -> Vec<::ethcore::transaction::PendingTransaction> { Vec::new() } } diff --git a/ethcore/light/src/provider.rs b/ethcore/light/src/provider.rs index 4721caa73..caade3857 100644 --- a/ethcore/light/src/provider.rs +++ b/ethcore/light/src/provider.rs @@ -17,15 +17,19 @@ //! A provider for the LES protocol. This is typically a full node, who can //! give as much data as necessary to its peers. +use std::sync::Arc; + use ethcore::blockchain_info::BlockChainInfo; use ethcore::client::{BlockChainClient, ProvingBlockChainClient}; use ethcore::transaction::PendingTransaction; use ethcore::ids::BlockId; use ethcore::encoded; +use util::{Bytes, RwLock, H256}; use cht::{self, BlockInfo}; +use client::{LightChainClient, AsLightClient}; +use transaction_queue::TransactionQueue; -use util::{Bytes, H256}; use request; @@ -284,6 +288,75 @@ impl Provider for T { } } +/// The light client "provider" implementation. This wraps a `LightClient` and +/// a light transaction queue. +pub struct LightProvider { + client: Arc, + txqueue: Arc>, +} + +impl LightProvider { + /// Create a new `LightProvider` from the given client and transaction queue. + pub fn new(client: Arc, txqueue: Arc>) -> Self { + LightProvider { + client: client, + txqueue: txqueue, + } + } +} + +// TODO: draw from cache (shared between this and the RPC layer) +impl Provider for LightProvider { + fn chain_info(&self) -> BlockChainInfo { + self.client.as_light_client().chain_info() + } + + fn reorg_depth(&self, _a: &H256, _b: &H256) -> Option { + None + } + + fn earliest_state(&self) -> Option { + None + } + + fn block_header(&self, id: BlockId) -> Option { + self.client.as_light_client().block_header(id) + } + + fn block_body(&self, _id: BlockId) -> Option { + None + } + + fn block_receipts(&self, _hash: &H256) -> Option { + None + } + + fn state_proof(&self, _req: request::StateProof) -> Vec { + Vec::new() + } + + fn contract_code(&self, _req: request::ContractCode) -> Bytes { + Vec::new() + } + + fn header_proof(&self, _req: request::HeaderProof) -> Option<(encoded::Header, Vec)> { + None + } + + fn ready_transactions(&self) -> Vec { + let chain_info = self.chain_info(); + self.txqueue.read().ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp) + } +} + +impl AsLightClient for LightProvider { + type Client = L::Client; + + fn as_light_client(&self) -> &L::Client { + self.client.as_light_client() + } +} + #[cfg(test)] mod tests { use ethcore::client::{EachBlockWith, TestBlockChainClient}; diff --git a/sync/src/api.rs b/sync/src/api.rs index 5b97bc566..9b1ace73b 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -34,7 +34,7 @@ use ipc::{BinaryConvertable, BinaryConvertError, IpcConfig}; use std::str::FromStr; use parking_lot::RwLock; use chain::{ETH_PACKET_COUNT, SNAPSHOT_SYNC_PACKET_COUNT}; -use light::client::LightChainClient; +use light::client::AsLightClient; use light::Provider; use light::net::{self as light_net, LightProtocol, Params as LightParams, Capabilities, Handler as LightHandler, EventContext}; @@ -642,7 +642,7 @@ pub struct LightSync { impl LightSync { /// Create a new light sync service. pub fn new(params: LightSyncParams) -> Result - where L: LightChainClient + Provider + 'static + where L: AsLightClient + Provider + Sync + Send + 'static { use light_sync::LightSync as SyncHandler; diff --git a/sync/src/light_sync/mod.rs b/sync/src/light_sync/mod.rs index 685cb24be..fba89dd7b 100644 --- a/sync/src/light_sync/mod.rs +++ b/sync/src/light_sync/mod.rs @@ -36,7 +36,7 @@ use std::collections::HashMap; use std::mem; use std::sync::Arc; -use light::client::LightChainClient; +use light::client::{AsLightClient, LightChainClient}; use light::net::{ Announcement, Handler, BasicContext, EventContext, Capabilities, ReqId, Status, @@ -106,8 +106,9 @@ impl AncestorSearch { } fn process_response(self, ctx: &ResponseContext, client: &L) -> AncestorSearch - where L: LightChainClient + where L: AsLightClient { + let client = client.as_light_client(); let first_num = client.chain_info().first_block_number.unwrap_or(0); match self { AncestorSearch::Awaiting(id, start, req) => { @@ -203,7 +204,7 @@ impl<'a> ResponseContext for ResponseCtx<'a> { } /// Light client synchronization manager. See module docs for more details. -pub struct LightSync { +pub struct LightSync { best_seen: Mutex>, // best seen block on the network. peers: RwLock>>, // peers which are relevant to synchronization. client: Arc, @@ -211,7 +212,7 @@ pub struct LightSync { state: Mutex, } -impl Handler for LightSync { +impl Handler for LightSync { fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) { if !capabilities.serve_headers { trace!(target: "sync", "Disconnecting irrelevant peer: {}", ctx.peer()); @@ -344,7 +345,7 @@ impl Handler for LightSync { } // private helpers -impl LightSync { +impl LightSync { // Begins a search for the common ancestor and our best block. // does not lock state, instead has a mutable reference to it passed. fn begin_search(&self, state: &mut SyncState) { @@ -354,8 +355,8 @@ impl LightSync { return; } - self.client.flush_queue(); - let chain_info = self.client.chain_info(); + self.client.as_light_client().flush_queue(); + let chain_info = self.client.as_light_client().chain_info(); trace!(target: "sync", "Beginning search for common ancestor from {:?}", (chain_info.best_block_number, chain_info.best_block_hash)); @@ -366,8 +367,10 @@ impl LightSync { fn maintain_sync(&self, ctx: &BasicContext) { const DRAIN_AMOUNT: usize = 128; + let client = self.client.as_light_client(); + let chain_info = client.chain_info(); + let mut state = self.state.lock(); - let chain_info = self.client.chain_info(); debug!(target: "sync", "Maintaining sync ({:?})", &*state); // drain any pending blocks into the queue. @@ -376,7 +379,7 @@ impl LightSync { 'a: loop { - if self.client.queue_info().is_full() { break } + if client.queue_info().is_full() { break } *state = match mem::replace(&mut *state, SyncState::Idle) { SyncState::Rounds(round) @@ -388,7 +391,7 @@ impl LightSync { trace!(target: "sync", "Drained {} headers to import", sink.len()); for header in sink.drain(..) { - if let Err(e) = self.client.queue_header(header) { + if let Err(e) = client.queue_header(header) { debug!(target: "sync", "Found bad header ({:?}). Reset to search state.", e); self.begin_search(&mut state); @@ -492,7 +495,7 @@ impl LightSync { } // public API -impl LightSync { +impl LightSync { /// Create a new instance of `LightSync`. /// /// This won't do anything until registered as a handler