Merge branch 'master' into fo-6418-dont-export-bigint

This commit is contained in:
Fredrik Harrysson
2017-09-05 10:48:54 +02:00
committed by GitHub
45 changed files with 1615 additions and 281 deletions

View File

@@ -0,0 +1,74 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Trait for fetching chain data.
use std::sync::Arc;
use ethcore::encoded;
use ethcore::engines::{Engine, StateDependentProof};
use ethcore::header::Header;
use ethcore::receipt::Receipt;
use futures::future::IntoFuture;
use util::H256;
/// Provides full chain data.
pub trait ChainDataFetcher: Send + Sync + 'static {
/// Error type when data unavailable.
type Error: ::std::fmt::Debug;
/// Future for fetching block body.
type Body: IntoFuture<Item=encoded::Block, Error=Self::Error>;
/// Future for fetching block receipts.
type Receipts: IntoFuture<Item=Vec<Receipt>, Error=Self::Error>;
/// Future for fetching epoch transition
type Transition: IntoFuture<Item=Vec<u8>, Error=Self::Error>;
/// Fetch a block body.
fn block_body(&self, header: &Header) -> Self::Body;
/// Fetch block receipts.
fn block_receipts(&self, header: &Header) -> Self::Receipts;
/// Fetch epoch transition proof at given header.
fn epoch_transition(&self, hash: H256, engine: Arc<Engine>, checker: Arc<StateDependentProof>) -> Self::Transition;
}
/// Fetcher implementation which cannot fetch anything.
pub struct Unavailable;
/// Create a fetcher which has all data unavailable.
pub fn unavailable() -> Unavailable { Unavailable }
impl ChainDataFetcher for Unavailable {
type Error = &'static str;
type Body = Result<encoded::Block, &'static str>;
type Receipts = Result<Vec<Receipt>, &'static str>;
type Transition = Result<Vec<u8>, &'static str>;
fn block_body(&self, _header: &Header) -> Self::Body {
Err("fetching block bodies unavailable")
}
fn block_receipts(&self, _header: &Header) -> Self::Receipts {
Err("fetching block receipts unavailable")
}
fn epoch_transition(&self, _h: H256, _e: Arc<Engine>, _check: Arc<StateDependentProof>) -> Self::Transition {
Err("fetching epoch transition proofs unavailable")
}
}

View File

@@ -18,11 +18,12 @@
//!
//! Unlike a full node's `BlockChain` this doesn't store much in the database.
//! It stores candidates for the last 2048-4096 blocks as well as CHT roots for
//! historical blocks all the way to the genesis.
//! historical blocks all the way to the genesis. If the engine makes use
//! of epoch transitions, those are stored as well.
//!
//! This is separate from the `BlockChain` for two reasons:
//! - It stores only headers (and a pruned subset of them)
//! - To allow for flexibility in the database layout once that's incorporated.
//! - To allow for flexibility in the database layout..
use std::collections::BTreeMap;
use std::sync::Arc;
@@ -30,15 +31,20 @@ use std::sync::Arc;
use cht;
use ethcore::block_status::BlockStatus;
use ethcore::error::BlockError;
use ethcore::error::{BlockImportError, BlockError};
use ethcore::encoded;
use ethcore::header::Header;
use ethcore::ids::BlockId;
use ethcore::spec::Spec;
use ethcore::engines::epoch::{
Transition as EpochTransition,
PendingTransition as PendingEpochTransition
};
use rlp::{Encodable, Decodable, DecoderError, RlpStream, Rlp, UntrustedRlp};
use heapsize::HeapSizeOf;
use bigint::prelude::U256;
use bigint::hash::H256;
use bigint::hash::{H256, H256FastMap, H264};
use util::kvdb::{DBTransaction, KeyValueDB};
use cache::Cache;
@@ -54,6 +60,9 @@ const HISTORY: u64 = 2048;
/// The best block key. Maps to an RLP list: [best_era, last_era]
const CURRENT_KEY: &'static [u8] = &*b"best_and_latest";
/// Key storing the last canonical epoch transition.
const LAST_CANONICAL_TRANSITION: &'static [u8] = &*b"canonical_transition";
/// Information about a block.
#[derive(Debug, Clone)]
pub struct BlockDescriptor {
@@ -101,7 +110,6 @@ impl Encodable for Entry {
impl Decodable for Entry {
fn decode(rlp: &UntrustedRlp) -> Result<Self, DecoderError> {
let mut candidates = SmallVec::<[Candidate; 3]>::new();
for item in rlp.iter() {
@@ -131,6 +139,42 @@ fn era_key(number: u64) -> String {
format!("candidates_{}", number)
}
fn pending_transition_key(block_hash: H256) -> H264 {
const LEADING: u8 = 1;
let mut key = H264::default();
key[0] = LEADING;
key.0[1..].copy_from_slice(&block_hash.0[..]);
key
}
fn transition_key(block_hash: H256) -> H264 {
const LEADING: u8 = 2;
let mut key = H264::default();
key[0] = LEADING;
key.0[1..].copy_from_slice(&block_hash.0[..]);
key
}
// encode last canonical transition entry: header and proof.
fn encode_canonical_transition(header: &Header, proof: &[u8]) -> Vec<u8> {
let mut stream = RlpStream::new_list(2);
stream.append(header).append(&proof);
stream.out()
}
// decode last canonical transition entry.
fn decode_canonical_transition(t: &[u8]) -> Result<(Header, &[u8]), DecoderError> {
let rlp = UntrustedRlp::new(t);
Ok((rlp.val_at(0)?, rlp.at(1)?.data()?))
}
/// Pending changes from `insert` to be applied after the database write has finished.
pub struct PendingChanges {
best_block: Option<BlockDescriptor>, // new best block.
@@ -141,6 +185,7 @@ pub struct HeaderChain {
genesis_header: encoded::Header, // special-case the genesis.
candidates: RwLock<BTreeMap<u64, Entry>>,
best_block: RwLock<BlockDescriptor>,
live_epoch_proofs: RwLock<H256FastMap<EpochTransition>>,
db: Arc<KeyValueDB>,
col: Option<u32>,
cache: Arc<Mutex<Cache>>,
@@ -148,8 +193,16 @@ pub struct HeaderChain {
impl HeaderChain {
/// Create a new header chain given this genesis block and database to read from.
pub fn new(db: Arc<KeyValueDB>, col: Option<u32>, genesis: &[u8], cache: Arc<Mutex<Cache>>) -> Result<Self, String> {
use ethcore::views::HeaderView;
pub fn new(
db: Arc<KeyValueDB>,
col: Option<u32>,
spec: &Spec,
cache: Arc<Mutex<Cache>>,
) -> Result<Self, String> {
let mut live_epoch_proofs = ::std::collections::HashMap::default();
let genesis = ::rlp::encode(&spec.genesis_header()).into_vec();
let decoded_header = spec.genesis_header();
let chain = if let Some(current) = db.get(col, CURRENT_KEY)? {
let (best_number, highest_number) = {
@@ -160,12 +213,24 @@ impl HeaderChain {
let mut cur_number = highest_number;
let mut candidates = BTreeMap::new();
// load all era entries and referenced headers within them.
// load all era entries, referenced headers within them,
// and live epoch proofs.
while let Some(entry) = db.get(col, era_key(cur_number).as_bytes())? {
let entry: Entry = ::rlp::decode(&entry);
trace!(target: "chain", "loaded header chain entry for era {} with {} candidates",
cur_number, entry.candidates.len());
for c in &entry.candidates {
let key = transition_key(c.hash);
if let Some(proof) = db.get(col, &*key)? {
live_epoch_proofs.insert(c.hash, EpochTransition {
block_hash: c.hash,
block_number: cur_number,
proof: proof.into_vec(),
});
}
}
candidates.insert(cur_number, entry);
cur_number -= 1;
@@ -187,29 +252,42 @@ impl HeaderChain {
};
HeaderChain {
genesis_header: encoded::Header::new(genesis.to_owned()),
genesis_header: encoded::Header::new(genesis),
best_block: RwLock::new(best_block),
candidates: RwLock::new(candidates),
live_epoch_proofs: RwLock::new(live_epoch_proofs),
db: db,
col: col,
cache: cache,
}
} else {
let g_view = HeaderView::new(genesis);
HeaderChain {
genesis_header: encoded::Header::new(genesis.to_owned()),
genesis_header: encoded::Header::new(genesis),
best_block: RwLock::new(BlockDescriptor {
hash: g_view.hash(),
hash: decoded_header.hash(),
number: 0,
total_difficulty: g_view.difficulty(),
total_difficulty: *decoded_header.difficulty(),
}),
candidates: RwLock::new(BTreeMap::new()),
live_epoch_proofs: RwLock::new(live_epoch_proofs),
db: db,
col: col,
cache: cache,
}
};
// instantiate genesis epoch data if it doesn't exist.
if let None = chain.db.get(col, LAST_CANONICAL_TRANSITION)? {
let genesis_data = spec.genesis_epoch_data()?;
{
let mut batch = chain.db.transaction();
let data = encode_canonical_transition(&decoded_header, &genesis_data);
batch.put_vec(col, LAST_CANONICAL_TRANSITION, data);
chain.db.write(batch)?;
}
}
Ok(chain)
}
@@ -218,10 +296,24 @@ impl HeaderChain {
/// This blindly trusts that the data given to it is sensible.
/// Returns a set of pending changes to be applied with `apply_pending`
/// before the next call to insert and after the transaction has been written.
pub fn insert(&self, transaction: &mut DBTransaction, header: Header) -> Result<PendingChanges, BlockError> {
///
/// If the block is an epoch transition, provide the transition along with
/// the header.
pub fn insert(
&self,
transaction: &mut DBTransaction,
header: Header,
transition_proof: Option<Vec<u8>>,
) -> Result<PendingChanges, BlockImportError> {
let hash = header.hash();
let number = header.number();
let parent_hash = *header.parent_hash();
let transition = transition_proof.map(|proof| EpochTransition {
block_hash: hash,
block_number: number,
proof: proof,
});
let mut pending = PendingChanges {
best_block: None,
};
@@ -237,7 +329,8 @@ impl HeaderChain {
candidates.get(&(number - 1))
.and_then(|entry| entry.candidates.iter().find(|c| c.hash == parent_hash))
.map(|c| c.total_difficulty)
.ok_or_else(|| BlockError::UnknownParent(parent_hash))?
.ok_or_else(|| BlockError::UnknownParent(parent_hash))
.map_err(BlockImportError::Block)?
};
let total_difficulty = parent_td + *header.difficulty();
@@ -262,8 +355,13 @@ impl HeaderChain {
transaction.put(self.col, era_key(number).as_bytes(), &::rlp::encode(&*cur_era))
}
let raw = ::rlp::encode(&header);
transaction.put(self.col, &hash[..], &*raw);
if let Some(transition) = transition {
transaction.put(self.col, &*transition_key(hash), &transition.proof);
self.live_epoch_proofs.write().insert(hash, transition);
}
let raw = header.encoded().into_inner();
transaction.put_vec(self.col, &hash[..], raw);
let (best_num, is_new_best) = {
let cur_best = self.best_block.read();
@@ -316,8 +414,10 @@ impl HeaderChain {
let cht_num = cht::block_to_cht_number(earliest_era)
.expect("fails only for number == 0; genesis never imported; qed");
let mut last_canonical_transition = None;
let cht_root = {
let mut i = earliest_era;
let mut live_epoch_proofs = self.live_epoch_proofs.write();
// iterable function which removes the candidates as it goes
// along. this will only be called until the CHT is complete.
@@ -328,7 +428,25 @@ impl HeaderChain {
i += 1;
// prune old blocks and epoch proofs.
for ancient in &era_entry.candidates {
let maybe_transition = live_epoch_proofs.remove(&ancient.hash);
if let Some(epoch_transition) = maybe_transition {
transaction.delete(self.col, &*transition_key(ancient.hash));
if ancient.hash == era_entry.canonical_hash {
last_canonical_transition = match self.db.get(self.col, &ancient.hash) {
Err(e) => {
warn!(target: "chain", "Error reading from DB: {}\n
", e);
None
}
Ok(None) => panic!("stored candidates always have corresponding headers; qed"),
Ok(Some(header)) => Some((epoch_transition, ::rlp::decode(&header))),
};
}
}
transaction.delete(self.col, &ancient.hash);
}
@@ -342,6 +460,12 @@ impl HeaderChain {
// write the CHT root to the database.
debug!(target: "chain", "Produced CHT {} root: {:?}", cht_num, cht_root);
transaction.put(self.col, cht_key(cht_num).as_bytes(), &::rlp::encode(&cht_root));
// update the last canonical transition proof
if let Some((epoch_transition, header)) = last_canonical_transition {
let x = encode_canonical_transition(&header, &epoch_transition.proof);
transaction.put_vec(self.col, LAST_CANONICAL_TRANSITION, x);
}
}
}
@@ -367,7 +491,7 @@ impl HeaderChain {
/// will be returned.
pub fn block_hash(&self, id: BlockId) -> Option<H256> {
match id {
BlockId::Earliest => Some(self.genesis_hash()),
BlockId::Earliest | BlockId::Number(0) => Some(self.genesis_hash()),
BlockId::Hash(hash) => Some(hash),
BlockId::Number(num) => {
if self.best_block.read().number < num { return None }
@@ -518,6 +642,56 @@ impl HeaderChain {
false => BlockStatus::Unknown,
}
}
/// Insert a pending transition.
pub fn insert_pending_transition(&self, batch: &mut DBTransaction, hash: H256, t: PendingEpochTransition) {
let key = pending_transition_key(hash);
batch.put(self.col, &*key, &*::rlp::encode(&t));
}
/// Get pending transition for a specific block hash.
pub fn pending_transition(&self, hash: H256) -> Option<PendingEpochTransition> {
let key = pending_transition_key(hash);
match self.db.get(self.col, &*key) {
Ok(val) => val.map(|x| ::rlp::decode(&x)),
Err(e) => {
warn!(target: "chain", "Error reading from database: {}", e);
None
}
}
}
/// Get the transition to the epoch the given parent hash is part of
/// or transitions to.
/// This will give the epoch that any children of this parent belong to.
///
/// The header corresponding the the parent hash must be stored already.
pub fn epoch_transition_for(&self, parent_hash: H256) -> Option<(Header, Vec<u8>)> {
// slow path: loop back block by block
let live_proofs = self.live_epoch_proofs.read();
for hdr in self.ancestry_iter(BlockId::Hash(parent_hash)) {
if let Some(transition) = live_proofs.get(&hdr.hash()).cloned() {
return Some((hdr.decode(), transition.proof))
}
}
// any blocks left must be descendants of the last canonical transition block.
match self.db.get(self.col, LAST_CANONICAL_TRANSITION) {
Ok(x) => {
let x = x.expect("last canonical transition always instantiated; qed");
let (hdr, proof) = decode_canonical_transition(&x)
.expect("last canonical transition always encoded correctly; qed");
Some((hdr, proof.to_vec()))
}
Err(e) => {
warn!("Error reading from DB: {}", e);
None
}
}
}
}
impl HeapSizeOf for HeaderChain {
@@ -570,7 +744,7 @@ mod tests {
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6))));
let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header), cache).unwrap();
let chain = HeaderChain::new(db.clone(), None, &spec, cache).unwrap();
let mut parent_hash = genesis_header.hash();
let mut rolling_timestamp = genesis_header.timestamp();
@@ -583,7 +757,7 @@ mod tests {
parent_hash = header.hash();
let mut tx = db.transaction();
let pending = chain.insert(&mut tx, header).unwrap();
let pending = chain.insert(&mut tx, header, None).unwrap();
db.write(tx).unwrap();
chain.apply_pending(pending);
@@ -603,7 +777,7 @@ mod tests {
let db = make_db();
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6))));
let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header), cache).unwrap();
let chain = HeaderChain::new(db.clone(), None, &spec, cache).unwrap();
let mut parent_hash = genesis_header.hash();
let mut rolling_timestamp = genesis_header.timestamp();
@@ -616,7 +790,7 @@ mod tests {
parent_hash = header.hash();
let mut tx = db.transaction();
let pending = chain.insert(&mut tx, header).unwrap();
let pending = chain.insert(&mut tx, header, None).unwrap();
db.write(tx).unwrap();
chain.apply_pending(pending);
@@ -635,7 +809,7 @@ mod tests {
parent_hash = header.hash();
let mut tx = db.transaction();
let pending = chain.insert(&mut tx, header).unwrap();
let pending = chain.insert(&mut tx, header, None).unwrap();
db.write(tx).unwrap();
chain.apply_pending(pending);
@@ -659,7 +833,7 @@ mod tests {
parent_hash = header.hash();
let mut tx = db.transaction();
let pending = chain.insert(&mut tx, header).unwrap();
let pending = chain.insert(&mut tx, header, None).unwrap();
db.write(tx).unwrap();
chain.apply_pending(pending);
@@ -682,12 +856,10 @@ mod tests {
#[test]
fn earliest_is_latest() {
let spec = Spec::new_test();
let genesis_header = spec.genesis_header();
let db = make_db();
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6))));
let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header), cache).unwrap();
let chain = HeaderChain::new(db.clone(), None, &spec, cache).unwrap();
assert!(chain.block_header(BlockId::Earliest).is_some());
assert!(chain.block_header(BlockId::Latest).is_some());
@@ -702,7 +874,7 @@ mod tests {
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6))));
{
let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header), cache.clone()).unwrap();
let chain = HeaderChain::new(db.clone(), None, &spec, cache.clone()).unwrap();
let mut parent_hash = genesis_header.hash();
let mut rolling_timestamp = genesis_header.timestamp();
for i in 1..10000 {
@@ -714,7 +886,7 @@ mod tests {
parent_hash = header.hash();
let mut tx = db.transaction();
let pending = chain.insert(&mut tx, header).unwrap();
let pending = chain.insert(&mut tx, header, None).unwrap();
db.write(tx).unwrap();
chain.apply_pending(pending);
@@ -722,7 +894,7 @@ mod tests {
}
}
let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header), cache.clone()).unwrap();
let chain = HeaderChain::new(db.clone(), None, &spec, cache.clone()).unwrap();
assert!(chain.block_header(BlockId::Number(10)).is_none());
assert!(chain.block_header(BlockId::Number(9000)).is_some());
assert!(chain.cht_root(2).is_some());
@@ -738,7 +910,7 @@ mod tests {
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6))));
{
let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header), cache.clone()).unwrap();
let chain = HeaderChain::new(db.clone(), None, &spec, cache.clone()).unwrap();
let mut parent_hash = genesis_header.hash();
let mut rolling_timestamp = genesis_header.timestamp();
@@ -752,7 +924,7 @@ mod tests {
parent_hash = header.hash();
let mut tx = db.transaction();
let pending = chain.insert(&mut tx, header).unwrap();
let pending = chain.insert(&mut tx, header, None).unwrap();
db.write(tx).unwrap();
chain.apply_pending(pending);
@@ -769,7 +941,7 @@ mod tests {
parent_hash = header.hash();
let mut tx = db.transaction();
let pending = chain.insert(&mut tx, header).unwrap();
let pending = chain.insert(&mut tx, header, None).unwrap();
db.write(tx).unwrap();
chain.apply_pending(pending);
@@ -780,7 +952,7 @@ mod tests {
}
// after restoration, non-canonical eras should still be loaded.
let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header), cache.clone()).unwrap();
let chain = HeaderChain::new(db.clone(), None, &spec, cache.clone()).unwrap();
assert_eq!(chain.block_header(BlockId::Latest).unwrap().number(), 10);
assert!(chain.candidates.read().get(&100).is_some())
}
@@ -792,10 +964,76 @@ mod tests {
let db = make_db();
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6))));
let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header), cache.clone()).unwrap();
let chain = HeaderChain::new(db.clone(), None, &spec, cache.clone()).unwrap();
assert!(chain.block_header(BlockId::Earliest).is_some());
assert!(chain.block_header(BlockId::Number(0)).is_some());
assert!(chain.block_header(BlockId::Hash(genesis_header.hash())).is_some());
}
#[test]
fn epoch_transitions_available_after_cht() {
let spec = Spec::new_test();
let genesis_header = spec.genesis_header();
let db = make_db();
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6))));
let chain = HeaderChain::new(db.clone(), None, &spec, cache).unwrap();
let mut parent_hash = genesis_header.hash();
let mut rolling_timestamp = genesis_header.timestamp();
for i in 1..6 {
let mut header = Header::new();
header.set_parent_hash(parent_hash);
header.set_number(i);
header.set_timestamp(rolling_timestamp);
header.set_difficulty(*genesis_header.difficulty() * i.into());
parent_hash = header.hash();
let mut tx = db.transaction();
let epoch_proof = if i == 3 {
Some(vec![1, 2, 3, 4])
} else {
None
};
let pending = chain.insert(&mut tx, header, epoch_proof).unwrap();
db.write(tx).unwrap();
chain.apply_pending(pending);
rolling_timestamp += 10;
}
// these 3 should end up falling back to the genesis epoch proof in DB
for i in 0..3 {
let hash = chain.block_hash(BlockId::Number(i)).unwrap();
assert_eq!(chain.epoch_transition_for(hash).unwrap().1, Vec::<u8>::new());
}
// these are live.
for i in 3..6 {
let hash = chain.block_hash(BlockId::Number(i)).unwrap();
assert_eq!(chain.epoch_transition_for(hash).unwrap().1, vec![1, 2, 3, 4]);
}
for i in 6..10000 {
let mut header = Header::new();
header.set_parent_hash(parent_hash);
header.set_number(i);
header.set_timestamp(rolling_timestamp);
header.set_difficulty(*genesis_header.difficulty() * i.into());
parent_hash = header.hash();
let mut tx = db.transaction();
let pending = chain.insert(&mut tx, header, None).unwrap();
db.write(tx).unwrap();
chain.apply_pending(pending);
rolling_timestamp += 10;
}
// no live blocks have associated epoch proofs -- make sure we aren't leaking memory.
assert!(chain.live_epoch_proofs.read().is_empty());
assert_eq!(chain.epoch_transition_for(parent_hash).unwrap().1, vec![1, 2, 3, 4]);
}
}

View File

@@ -19,11 +19,11 @@
use std::sync::{Weak, Arc};
use ethcore::block_status::BlockStatus;
use ethcore::client::{ClientReport, EnvInfo};
use ethcore::engines::Engine;
use ethcore::error::BlockImportError;
use ethcore::client::{TransactionImportResult, ClientReport, EnvInfo};
use ethcore::engines::{epoch, Engine, EpochChange, EpochTransition, Proof, Unsure};
use ethcore::error::{TransactionError, BlockImportError, Error as EthcoreError};
use ethcore::ids::BlockId;
use ethcore::header::Header;
use ethcore::header::{BlockNumber, Header};
use ethcore::verification::queue::{self, HeaderQueue};
use ethcore::blockchain_info::BlockChainInfo;
use ethcore::spec::Spec;
@@ -31,11 +31,14 @@ use ethcore::service::ClientIoMessage;
use ethcore::encoded;
use io::IoChannel;
use parking_lot::{Mutex, RwLock};
use bigint::prelude::U256;
use bigint::hash::H256;
use futures::{IntoFuture, Future};
use util::Address;
use util::kvdb::{KeyValueDB, CompactionProfile};
use self::fetch::ChainDataFetcher;
use self::header_chain::{AncestryIter, HeaderChain};
use cache::Cache;
@@ -45,6 +48,8 @@ pub use self::service::Service;
mod header_chain;
mod service;
pub mod fetch;
/// Configuration for the light client.
#[derive(Debug, Clone)]
pub struct Config {
@@ -80,6 +85,9 @@ impl Default for Config {
/// Trait for interacting with the header chain abstractly.
pub trait LightChainClient: Send + Sync {
/// Adds a new `LightChainNotify` listener.
fn add_listener(&self, listener: Weak<LightChainNotify>);
/// Get chain info.
fn chain_info(&self) -> BlockChainInfo;
@@ -128,7 +136,7 @@ pub trait LightChainClient: Send + Sync {
fn cht_root(&self, i: usize) -> Option<H256>;
/// Get the EIP-86 transition block number.
fn eip86_transition(&self) -> u64;
fn eip86_transition(&self) -> BlockNumber;
/// Get a report of import activity since the last call.
fn report(&self) -> ClientReport;
@@ -156,7 +164,7 @@ impl<T: LightChainClient> AsLightClient for T {
}
/// Light client implementation.
pub struct Client {
pub struct Client<T> {
queue: HeaderQueue,
engine: Arc<Engine>,
chain: HeaderChain,
@@ -164,22 +172,30 @@ pub struct Client {
import_lock: Mutex<()>,
db: Arc<KeyValueDB>,
listeners: RwLock<Vec<Weak<LightChainNotify>>>,
fetcher: T,
verify_full: bool,
}
impl Client {
impl<T: ChainDataFetcher> Client<T> {
/// Create a new `Client`.
pub fn new(config: Config, db: Arc<KeyValueDB>, chain_col: Option<u32>, spec: &Spec, io_channel: IoChannel<ClientIoMessage>, cache: Arc<Mutex<Cache>>) -> Result<Self, String> {
let gh = ::rlp::encode(&spec.genesis_header());
pub fn new(
config: Config,
db: Arc<KeyValueDB>,
chain_col: Option<u32>,
spec: &Spec,
fetcher: T,
io_channel: IoChannel<ClientIoMessage>,
cache: Arc<Mutex<Cache>>
) -> Result<Self, String> {
Ok(Client {
queue: HeaderQueue::new(config.queue, spec.engine.clone(), io_channel, config.check_seal),
engine: spec.engine.clone(),
chain: HeaderChain::new(db.clone(), chain_col, &gh, cache)?,
chain: HeaderChain::new(db.clone(), chain_col, &spec, cache)?,
report: RwLock::new(ClientReport::default()),
import_lock: Mutex::new(()),
db: db,
listeners: RwLock::new(vec![]),
fetcher: fetcher,
verify_full: config.verify_full,
})
}
@@ -191,10 +207,24 @@ impl Client {
/// Create a new `Client` backed purely in-memory.
/// This will ignore all database options in the configuration.
pub fn in_memory(config: Config, spec: &Spec, io_channel: IoChannel<ClientIoMessage>, cache: Arc<Mutex<Cache>>) -> Self {
pub fn in_memory(
config: Config,
spec: &Spec,
fetcher: T,
io_channel: IoChannel<ClientIoMessage>,
cache: Arc<Mutex<Cache>>
) -> Self {
let db = ::util::kvdb::in_memory(0);
Client::new(config, Arc::new(db), None, spec, io_channel, cache).expect("New DB creation infallible; qed")
Client::new(
config,
Arc::new(db),
None,
spec,
fetcher,
io_channel,
cache
).expect("New DB creation infallible; qed")
}
/// Import a header to the queue for additional verification.
@@ -293,19 +323,33 @@ impl Client {
continue
}
// TODO: `epoch_end_signal`, `is_epoch_end`.
// proofs we get from the network would be _complete_, whereas we need
// _incomplete_ signals
let write_proof_result = match self.check_epoch_signal(&verified_header) {
Ok(Some(proof)) => self.write_pending_proof(&verified_header, proof),
Ok(None) => Ok(()),
Err(e) =>
panic!("Unable to fetch epoch transition proof: {:?}", e),
};
if let Err(e) = write_proof_result {
warn!(target: "client", "Error writing pending transition proof to DB: {:?} \
The node may not be able to synchronize further.", e);
}
let epoch_proof = self.engine.is_epoch_end(
&verified_header,
&|h| self.chain.block_header(BlockId::Hash(h)).map(|hdr| hdr.decode()),
&|h| self.chain.pending_transition(h),
);
let mut tx = self.db.transaction();
let pending = match self.chain.insert(&mut tx, verified_header) {
let pending = match self.chain.insert(&mut tx, verified_header, epoch_proof) {
Ok(pending) => {
good.push(hash);
self.report.write().blocks_imported += 1;
pending
}
Err(e) => {
debug!(target: "client", "Error importing header {:?}: {}", (num, hash), e);
debug!(target: "client", "Error importing header {:?}: {:?}", (num, hash), e);
bad.push(hash);
continue;
}
@@ -421,9 +465,76 @@ impl Client {
true
}
fn check_epoch_signal(&self, verified_header: &Header) -> Result<Option<Proof>, T::Error> {
let (mut block, mut receipts) = (None, None);
// First, check without providing auxiliary data.
match self.engine.signals_epoch_end(verified_header, None, None) {
EpochChange::No => return Ok(None),
EpochChange::Yes(proof) => return Ok(Some(proof)),
EpochChange::Unsure(unsure) => {
let (b, r) = match unsure {
Unsure::NeedsBody =>
(Some(self.fetcher.block_body(verified_header)), None),
Unsure::NeedsReceipts =>
(None, Some(self.fetcher.block_receipts(verified_header))),
Unsure::NeedsBoth => (
Some(self.fetcher.block_body(verified_header)),
Some(self.fetcher.block_receipts(verified_header)),
),
};
if let Some(b) = b {
block = Some(b.into_future().wait()?.into_inner());
}
if let Some(r) = r {
receipts = Some(r.into_future().wait()?);
}
}
}
let block = block.as_ref().map(|x| &x[..]);
let receipts = receipts.as_ref().map(|x| &x[..]);
// Check again now that required data has been fetched.
match self.engine.signals_epoch_end(verified_header, block, receipts) {
EpochChange::No => return Ok(None),
EpochChange::Yes(proof) => return Ok(Some(proof)),
EpochChange::Unsure(_) =>
panic!("Detected faulty engine implementation: requests additional \
data to check epoch end signal when everything necessary provided"),
}
}
// attempts to fetch the epoch proof from the network until successful.
fn write_pending_proof(&self, header: &Header, proof: Proof) -> Result<(), T::Error> {
let proof = match proof {
Proof::Known(known) => known,
Proof::WithState(state_dependent) => {
self.fetcher.epoch_transition(
header.hash(),
self.engine.clone(),
state_dependent
).into_future().wait()?
}
};
let mut batch = self.db.transaction();
self.chain.insert_pending_transition(&mut batch, header.hash(), epoch::PendingTransition {
proof: proof,
});
self.db.write_buffered(batch);
Ok(())
}
}
impl LightChainClient for Client {
impl<T: ChainDataFetcher> LightChainClient for Client<T> {
fn add_listener(&self, listener: Weak<LightChainNotify>) {
Client::add_listener(self, listener)
}
fn chain_info(&self) -> BlockChainInfo { Client::chain_info(self) }
fn queue_header(&self, header: Header) -> Result<H256, BlockImportError> {
@@ -482,7 +593,7 @@ impl LightChainClient for Client {
Client::cht_root(self, i)
}
fn eip86_transition(&self) -> u64 {
fn eip86_transition(&self) -> BlockNumber {
self.engine().params().eip86_transition
}
@@ -490,3 +601,38 @@ impl LightChainClient for Client {
Client::report(self)
}
}
impl<T: ChainDataFetcher> ::ethcore::client::EngineClient for Client<T> {
fn update_sealing(&self) { }
fn submit_seal(&self, _block_hash: H256, _seal: Vec<Vec<u8>>) { }
fn broadcast_consensus_message(&self, _message: Vec<u8>) { }
fn epoch_transition_for(&self, parent_hash: H256) -> Option<EpochTransition> {
self.chain.epoch_transition_for(parent_hash).map(|(hdr, proof)| EpochTransition {
block_hash: hdr.hash(),
block_number: hdr.number(),
proof: proof,
})
}
fn chain_info(&self) -> BlockChainInfo {
Client::chain_info(self)
}
fn call_contract(&self, _id: BlockId, _address: Address, _data: Vec<u8>) -> Result<Vec<u8>, String> {
Err("Contract calling not supported by light client".into())
}
fn transact_contract(&self, _address: Address, _data: Vec<u8>)
-> Result<TransactionImportResult, EthcoreError>
{
// TODO: these are only really used for misbehavior reporting.
// no relevant clients will be running light clients, but maybe
// they could be at some point?
Err(TransactionError::LimitReached.into())
}
fn block_number(&self, id: BlockId) -> Option<BlockNumber> {
self.block_header(id).map(|hdr| hdr.number())
}
}

View File

@@ -30,7 +30,7 @@ use util::kvdb::{Database, DatabaseConfig};
use cache::Cache;
use parking_lot::Mutex;
use super::{Client, Config as ClientConfig};
use super::{ChainDataFetcher, Client, Config as ClientConfig};
/// Errors on service initialization.
#[derive(Debug)]
@@ -51,14 +51,14 @@ impl fmt::Display for Error {
}
/// Light client service.
pub struct Service {
client: Arc<Client>,
pub struct Service<T> {
client: Arc<Client<T>>,
io_service: IoService<ClientIoMessage>,
}
impl Service {
impl<T: ChainDataFetcher> Service<T> {
/// Start the service: initialize I/O workers and client itself.
pub fn start(config: ClientConfig, spec: &Spec, path: &Path, cache: Arc<Mutex<Cache>>) -> Result<Self, Error> {
pub fn start(config: ClientConfig, spec: &Spec, fetcher: T, path: &Path, cache: Arc<Mutex<Cache>>) -> Result<Self, Error> {
// initialize database.
let mut db_config = DatabaseConfig::with_columns(db::NUM_COLUMNS);
@@ -81,10 +81,14 @@ impl Service {
db,
db::COL_LIGHT_CHAIN,
spec,
fetcher,
io_service.channel(),
cache,
).map_err(Error::Database)?);
io_service.register_handler(Arc::new(ImportBlocks(client.clone()))).map_err(Error::Io)?;
spec.engine.register_client(Arc::downgrade(&client) as _);
Ok(Service {
client: client,
io_service: io_service,
@@ -97,14 +101,14 @@ impl Service {
}
/// Get a handle to the client.
pub fn client(&self) -> &Arc<Client> {
pub fn client(&self) -> &Arc<Client<T>> {
&self.client
}
}
struct ImportBlocks(Arc<Client>);
struct ImportBlocks<T>(Arc<Client<T>>);
impl IoHandler<ClientIoMessage> for ImportBlocks {
impl<T: ChainDataFetcher> IoHandler<ClientIoMessage> for ImportBlocks<T> {
fn message(&self, _io: &IoContext<ClientIoMessage>, message: &ClientIoMessage) {
if let ClientIoMessage::BlockVerified = *message {
self.0.import_verified();
@@ -120,6 +124,7 @@ mod tests {
use std::sync::Arc;
use cache::Cache;
use client::fetch;
use time::Duration;
use parking_lot::Mutex;
@@ -129,6 +134,6 @@ mod tests {
let temp_path = RandomTempPath::new();
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6))));
Service::start(Default::default(), &spec, temp_path.as_path(), cache).unwrap();
Service::start(Default::default(), &spec, fetch::unavailable(), temp_path.as_path(), cache).unwrap();
}
}

View File

@@ -62,6 +62,7 @@ fn hardcoded_serve_time(kind: Kind) -> u64 {
Kind::Storage => 2_000_000,
Kind::Code => 1_500_000,
Kind::Execution => 250, // per gas.
Kind::Signal => 500_000,
}
}

View File

@@ -104,9 +104,8 @@ mod packet {
// relay transactions to peers.
pub const SEND_TRANSACTIONS: u8 = 0x06;
// request and respond with epoch transition proof
pub const REQUEST_EPOCH_PROOF: u8 = 0x07;
pub const EPOCH_PROOF: u8 = 0x08;
// two packets were previously meant to be reserved for epoch proofs.
// these have since been moved to requests.
}
// timeouts for different kinds of requests. all values are in milliseconds.
@@ -124,6 +123,7 @@ mod timeout {
pub const CONTRACT_CODE: i64 = 100;
pub const HEADER_PROOF: i64 = 100;
pub const TRANSACTION_PROOF: i64 = 1000; // per gas?
pub const EPOCH_SIGNAL: i64 = 200;
}
/// A request id.
@@ -584,12 +584,6 @@ impl LightProtocol {
packet::SEND_TRANSACTIONS => self.relay_transactions(peer, io, rlp),
packet::REQUEST_EPOCH_PROOF | packet::EPOCH_PROOF => {
// ignore these for now, but leave them specified.
debug!(target: "pip", "Ignoring request/response for epoch proof");
Ok(())
}
other => {
Err(Error::UnrecognizedPacket(other))
}
@@ -952,6 +946,7 @@ impl LightProtocol {
CompleteRequest::Storage(req) => self.provider.storage_proof(req).map(Response::Storage),
CompleteRequest::Code(req) => self.provider.contract_code(req).map(Response::Code),
CompleteRequest::Execution(req) => self.provider.transaction_proof(req).map(Response::Execution),
CompleteRequest::Signal(req) => self.provider.epoch_signal(req).map(Response::Signal),
}
});

View File

@@ -91,6 +91,7 @@ pub struct CostTable {
code: U256,
header_proof: U256,
transaction_proof: U256, // cost per gas.
epoch_signal: U256,
}
impl Default for CostTable {
@@ -107,6 +108,7 @@ impl Default for CostTable {
code: 20000.into(),
header_proof: 15000.into(),
transaction_proof: 2.into(),
epoch_signal: 10000.into(),
}
}
}
@@ -121,7 +123,7 @@ impl Encodable for CostTable {
s.append(cost);
}
s.begin_list(10).append(&self.base);
s.begin_list(11).append(&self.base);
append_cost(s, &self.headers, request::Kind::Headers);
append_cost(s, &self.transaction_index, request::Kind::TransactionIndex);
append_cost(s, &self.body, request::Kind::Body);
@@ -131,6 +133,7 @@ impl Encodable for CostTable {
append_cost(s, &self.code, request::Kind::Code);
append_cost(s, &self.header_proof, request::Kind::HeaderProof);
append_cost(s, &self.transaction_proof, request::Kind::Execution);
append_cost(s, &self.epoch_signal, request::Kind::Signal);
}
}
@@ -147,6 +150,7 @@ impl Decodable for CostTable {
let mut code = None;
let mut header_proof = None;
let mut transaction_proof = None;
let mut epoch_signal = None;
for cost_list in rlp.iter().skip(1) {
let cost = cost_list.val_at(1)?;
@@ -160,6 +164,7 @@ impl Decodable for CostTable {
request::Kind::Code => code = Some(cost),
request::Kind::HeaderProof => header_proof = Some(cost),
request::Kind::Execution => transaction_proof = Some(cost),
request::Kind::Signal => epoch_signal = Some(cost),
}
}
@@ -176,6 +181,7 @@ impl Decodable for CostTable {
code: unwrap_cost(code)?,
header_proof: unwrap_cost(header_proof)?,
transaction_proof: unwrap_cost(transaction_proof)?,
epoch_signal: unwrap_cost(epoch_signal)?,
})
}
}
@@ -238,6 +244,7 @@ impl FlowParams {
code: cost_for_kind(Kind::Code),
header_proof: cost_for_kind(Kind::HeaderProof),
transaction_proof: cost_for_kind(Kind::Execution),
epoch_signal: cost_for_kind(Kind::Signal),
};
FlowParams {
@@ -263,7 +270,8 @@ impl FlowParams {
storage: free_cost.clone(),
code: free_cost.clone(),
header_proof: free_cost.clone(),
transaction_proof: free_cost,
transaction_proof: free_cost.clone(),
epoch_signal: free_cost,
}
}
}
@@ -293,6 +301,7 @@ impl FlowParams {
Request::Storage(_) => self.costs.storage,
Request::Code(_) => self.costs.code,
Request::Execution(ref req) => self.costs.transaction_proof * req.gas,
Request::Signal(_) => self.costs.epoch_signal,
}
}

View File

@@ -139,6 +139,7 @@ fn compute_timeout(reqs: &Requests) -> Duration {
Request::Storage(_) => timeout::PROOF,
Request::Code(_) => timeout::CONTRACT_CODE,
Request::Execution(_) => timeout::TRANSACTION_PROOF,
Request::Signal(_) => timeout::EPOCH_SIGNAL,
}
}))
}

View File

@@ -158,6 +158,12 @@ impl Provider for TestProvider {
None
}
fn epoch_signal(&self, _req: request::CompleteSignalRequest) -> Option<request::SignalResponse> {
Some(request::SignalResponse {
signal: vec![1, 2, 3, 4],
})
}
fn ready_transactions(&self) -> Vec<PendingTransaction> {
self.0.client.ready_transactions()
}
@@ -523,6 +529,50 @@ fn get_contract_code() {
proto.handle_packet(&expected, &1, packet::REQUEST, &request_body);
}
#[test]
fn epoch_signal() {
let capabilities = capabilities();
let (provider, proto) = setup(capabilities.clone());
let flow_params = proto.flow_params.read().clone();
let cur_status = status(provider.client.chain_info());
{
let packet_body = write_handshake(&cur_status, &capabilities, &proto);
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone()));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body);
}
let req_id = 112;
let request = Request::Signal(request::IncompleteSignalRequest {
block_hash: H256([1; 32]).into(),
});
let requests = encode_single(request.clone());
let request_body = make_packet(req_id, &requests);
let response = {
let response = vec![Response::Signal(SignalResponse {
signal: vec![1, 2, 3, 4],
})];
let limit = *flow_params.limit();
let cost = flow_params.compute_cost_multi(requests.requests());
println!("limit = {}, cost = {}", limit, cost);
let new_creds = limit - cost;
let mut response_stream = RlpStream::new_list(3);
response_stream.append(&req_id).append(&new_creds).append_list(&response);
response_stream.out()
};
let expected = Expect::Respond(packet::RESPONSE, response);
proto.handle_packet(&expected, &1, packet::REQUEST, &request_body);
}
#[test]
fn proof_of_execution() {
let capabilities = capabilities();

View File

@@ -195,6 +195,8 @@ fn guess_capabilities(requests: &[CheckedRequest]) -> Capabilities {
caps.serve_headers = true,
CheckedRequest::HeaderByHash(_, _) =>
caps.serve_headers = true,
CheckedRequest::Signal(_, _) =>
caps.serve_headers = true,
CheckedRequest::Body(ref req, _) => if let Ok(ref hdr) = req.0.as_ref() {
update_since(&mut caps.serve_chain_since, hdr.number());
},

View File

@@ -20,7 +20,7 @@ use std::sync::Arc;
use ethcore::basic_account::BasicAccount;
use ethcore::encoded;
use ethcore::engines::Engine;
use ethcore::engines::{Engine, StateDependentProof};
use ethcore::receipt::Receipt;
use ethcore::state::{self, ProvedExecution};
use ethcore::transaction::SignedTransaction;
@@ -56,6 +56,8 @@ pub enum Request {
Code(Code),
/// A request for proof of execution.
Execution(TransactionProof),
/// A request for epoch change signal.
Signal(Signal),
}
/// A request argument.
@@ -136,6 +138,7 @@ impl_single!(Body, Body, encoded::Block);
impl_single!(Account, Account, Option<BasicAccount>);
impl_single!(Code, Code, Bytes);
impl_single!(Execution, TransactionProof, super::ExecutionResult);
impl_single!(Signal, Signal, Vec<u8>);
macro_rules! impl_args {
() => {
@@ -244,6 +247,7 @@ pub enum CheckedRequest {
Account(Account, net_request::IncompleteAccountRequest),
Code(Code, net_request::IncompleteCodeRequest),
Execution(TransactionProof, net_request::IncompleteExecutionRequest),
Signal(Signal, net_request::IncompleteSignalRequest)
}
impl From<Request> for CheckedRequest {
@@ -302,6 +306,12 @@ impl From<Request> for CheckedRequest {
};
CheckedRequest::Execution(req, net_req)
}
Request::Signal(req) => {
let net_req = net_request::IncompleteSignalRequest {
block_hash: req.hash.into(),
};
CheckedRequest::Signal(req, net_req)
}
}
}
}
@@ -319,6 +329,7 @@ impl CheckedRequest {
CheckedRequest::Account(_, req) => NetRequest::Account(req),
CheckedRequest::Code(_, req) => NetRequest::Code(req),
CheckedRequest::Execution(_, req) => NetRequest::Execution(req),
CheckedRequest::Signal(_, req) => NetRequest::Signal(req),
}
}
@@ -446,6 +457,7 @@ macro_rules! match_me {
CheckedRequest::Account($check, $req) => $e,
CheckedRequest::Code($check, $req) => $e,
CheckedRequest::Execution($check, $req) => $e,
CheckedRequest::Signal($check, $req) => $e,
}
}
}
@@ -473,6 +485,7 @@ impl IncompleteRequest for CheckedRequest {
CheckedRequest::Account(_, ref req) => req.check_outputs(f),
CheckedRequest::Code(_, ref req) => req.check_outputs(f),
CheckedRequest::Execution(_, ref req) => req.check_outputs(f),
CheckedRequest::Signal(_, ref req) => req.check_outputs(f),
}
}
@@ -493,6 +506,7 @@ impl IncompleteRequest for CheckedRequest {
CheckedRequest::Account(_, req) => req.complete().map(CompleteRequest::Account),
CheckedRequest::Code(_, req) => req.complete().map(CompleteRequest::Code),
CheckedRequest::Execution(_, req) => req.complete().map(CompleteRequest::Execution),
CheckedRequest::Signal(_, req) => req.complete().map(CompleteRequest::Signal),
}
}
@@ -544,6 +558,9 @@ impl net_request::CheckedRequest for CheckedRequest {
CheckedRequest::Execution(ref prover, _) =>
expect!((&NetResponse::Execution(ref res), _) =>
prover.check_response(cache, &res.items).map(Response::Execution)),
CheckedRequest::Signal(ref prover, _) =>
expect!((&NetResponse::Signal(ref res), _) =>
prover.check_response(cache, &res.signal).map(Response::Signal)),
}
}
}
@@ -567,6 +584,8 @@ pub enum Response {
Code(Vec<u8>),
/// Response to a request for proved execution.
Execution(super::ExecutionResult),
/// Response to a request for epoch change signal.
Signal(Vec<u8>),
}
impl net_request::ResponseLike for Response {
@@ -850,6 +869,27 @@ impl TransactionProof {
}
}
/// Request for epoch signal.
/// Provide engine and state-dependent proof checker.
#[derive(Clone)]
pub struct Signal {
/// Block hash and number to fetch proof for.
pub hash: H256,
/// Consensus engine, used to check the proof.
pub engine: Arc<Engine>,
/// Special checker for the proof.
pub proof_check: Arc<StateDependentProof>,
}
impl Signal {
/// Check the signal, returning the signal or indicate that it's bad.
pub fn check_response(&self, _: &Mutex<::cache::Cache>, signal: &[u8]) -> Result<Vec<u8>, Error> {
self.proof_check.check_proof(&*self.engine, signal)
.map(|_| signal.to_owned())
.map_err(|_| Error::BadProof)
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -127,6 +127,9 @@ pub trait Provider: Send + Sync {
/// Provide a proof-of-execution for the given transaction proof request.
/// Returns a vector of all state items necessary to execute the transaction.
fn transaction_proof(&self, req: request::CompleteExecutionRequest) -> Option<request::ExecutionResponse>;
/// Provide epoch signal data at given block hash. This should be just the
fn epoch_signal(&self, req: request::CompleteSignalRequest) -> Option<request::SignalResponse>;
}
// Implementation of a light client data provider for a client.
@@ -265,6 +268,12 @@ impl<T: ProvingBlockChainClient + ?Sized> Provider for T {
fn ready_transactions(&self) -> Vec<PendingTransaction> {
BlockChainClient::ready_transactions(self)
}
fn epoch_signal(&self, req: request::CompleteSignalRequest) -> Option<request::SignalResponse> {
self.epoch_signal(req.block_hash).map(|signal| request::SignalResponse {
signal: signal,
})
}
}
/// The light client "provider" implementation. This wraps a `LightClient` and
@@ -330,6 +339,10 @@ impl<L: AsLightClient + Send + Sync> Provider for LightProvider<L> {
None
}
fn epoch_signal(&self, _req: request::CompleteSignalRequest) -> Option<request::SignalResponse> {
None
}
fn ready_transactions(&self) -> Vec<PendingTransaction> {
let chain_info = self.chain_info();
self.txqueue.read().ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp)

View File

@@ -67,6 +67,11 @@ pub use self::execution::{
Incomplete as IncompleteExecutionRequest,
Response as ExecutionResponse,
};
pub use self::epoch_signal::{
Complete as CompleteSignalRequest,
Incomplete as IncompleteSignalRequest,
Response as SignalResponse,
};
pub use self::builder::{RequestBuilder, Requests};
@@ -261,6 +266,8 @@ pub enum Request {
Code(IncompleteCodeRequest),
/// A request for proof of execution,
Execution(IncompleteExecutionRequest),
/// A request for an epoch signal.
Signal(IncompleteSignalRequest),
}
/// All request types, in an answerable state.
@@ -284,6 +291,8 @@ pub enum CompleteRequest {
Code(CompleteCodeRequest),
/// A request for proof of execution,
Execution(CompleteExecutionRequest),
/// A request for an epoch signal.
Signal(CompleteSignalRequest),
}
impl CompleteRequest {
@@ -299,6 +308,7 @@ impl CompleteRequest {
CompleteRequest::Storage(_) => Kind::Storage,
CompleteRequest::Code(_) => Kind::Code,
CompleteRequest::Execution(_) => Kind::Execution,
CompleteRequest::Signal(_) => Kind::Signal,
}
}
}
@@ -316,6 +326,7 @@ impl Request {
Request::Storage(_) => Kind::Storage,
Request::Code(_) => Kind::Code,
Request::Execution(_) => Kind::Execution,
Request::Signal(_) => Kind::Signal,
}
}
}
@@ -332,6 +343,7 @@ impl Decodable for Request {
Kind::Storage => Ok(Request::Storage(rlp.val_at(1)?)),
Kind::Code => Ok(Request::Code(rlp.val_at(1)?)),
Kind::Execution => Ok(Request::Execution(rlp.val_at(1)?)),
Kind::Signal => Ok(Request::Signal(rlp.val_at(1)?)),
}
}
}
@@ -353,6 +365,7 @@ impl Encodable for Request {
Request::Storage(ref req) => s.append(req),
Request::Code(ref req) => s.append(req),
Request::Execution(ref req) => s.append(req),
Request::Signal(ref req) => s.append(req),
};
}
}
@@ -374,6 +387,7 @@ impl IncompleteRequest for Request {
Request::Storage(ref req) => req.check_outputs(f),
Request::Code(ref req) => req.check_outputs(f),
Request::Execution(ref req) => req.check_outputs(f),
Request::Signal(ref req) => req.check_outputs(f),
}
}
@@ -388,6 +402,7 @@ impl IncompleteRequest for Request {
Request::Storage(ref req) => req.note_outputs(f),
Request::Code(ref req) => req.note_outputs(f),
Request::Execution(ref req) => req.note_outputs(f),
Request::Signal(ref req) => req.note_outputs(f),
}
}
@@ -402,6 +417,7 @@ impl IncompleteRequest for Request {
Request::Storage(ref mut req) => req.fill(oracle),
Request::Code(ref mut req) => req.fill(oracle),
Request::Execution(ref mut req) => req.fill(oracle),
Request::Signal(ref mut req) => req.fill(oracle),
}
}
@@ -416,6 +432,7 @@ impl IncompleteRequest for Request {
Request::Storage(req) => req.complete().map(CompleteRequest::Storage),
Request::Code(req) => req.complete().map(CompleteRequest::Code),
Request::Execution(req) => req.complete().map(CompleteRequest::Execution),
Request::Signal(req) => req.complete().map(CompleteRequest::Signal),
}
}
@@ -430,6 +447,7 @@ impl IncompleteRequest for Request {
Request::Storage(ref mut req) => req.adjust_refs(mapping),
Request::Code(ref mut req) => req.adjust_refs(mapping),
Request::Execution(ref mut req) => req.adjust_refs(mapping),
Request::Signal(ref mut req) => req.adjust_refs(mapping),
}
}
}
@@ -471,6 +489,8 @@ pub enum Kind {
Code = 7,
/// A request for transaction execution + state proof.
Execution = 8,
/// A request for epoch transition signal.
Signal = 9,
}
impl Decodable for Kind {
@@ -485,6 +505,7 @@ impl Decodable for Kind {
6 => Ok(Kind::Storage),
7 => Ok(Kind::Code),
8 => Ok(Kind::Execution),
9 => Ok(Kind::Signal),
_ => Err(DecoderError::Custom("Unknown PIP request ID.")),
}
}
@@ -517,6 +538,8 @@ pub enum Response {
Code(CodeResponse),
/// A response for proof of execution,
Execution(ExecutionResponse),
/// A response for epoch change signal.
Signal(SignalResponse),
}
impl ResponseLike for Response {
@@ -532,6 +555,7 @@ impl ResponseLike for Response {
Response::Storage(ref res) => res.fill_outputs(f),
Response::Code(ref res) => res.fill_outputs(f),
Response::Execution(ref res) => res.fill_outputs(f),
Response::Signal(ref res) => res.fill_outputs(f),
}
}
}
@@ -549,6 +573,7 @@ impl Response {
Response::Storage(_) => Kind::Storage,
Response::Code(_) => Kind::Code,
Response::Execution(_) => Kind::Execution,
Response::Signal(_) => Kind::Signal,
}
}
}
@@ -565,6 +590,7 @@ impl Decodable for Response {
Kind::Storage => Ok(Response::Storage(rlp.val_at(1)?)),
Kind::Code => Ok(Response::Code(rlp.val_at(1)?)),
Kind::Execution => Ok(Response::Execution(rlp.val_at(1)?)),
Kind::Signal => Ok(Response::Signal(rlp.val_at(1)?)),
}
}
}
@@ -586,6 +612,7 @@ impl Encodable for Response {
Response::Storage(ref res) => s.append(res),
Response::Code(ref res) => s.append(res),
Response::Execution(ref res) => s.append(res),
Response::Signal(ref res) => s.append(res),
};
}
}
@@ -1509,6 +1536,104 @@ pub mod execution {
}
}
/// A request for epoch signal data.
pub mod epoch_signal {
use super::{Field, NoSuchOutput, OutputKind, Output};
use rlp::{Encodable, Decodable, DecoderError, RlpStream, UntrustedRlp};
use util::{Bytes, H256};
/// Potentially incomplete epoch signal request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Incomplete {
/// The block hash to request the signal for.
pub block_hash: Field<H256>,
}
impl Decodable for Incomplete {
fn decode(rlp: &UntrustedRlp) -> Result<Self, DecoderError> {
Ok(Incomplete {
block_hash: rlp.val_at(0)?,
})
}
}
impl Encodable for Incomplete {
fn rlp_append(&self, s: &mut RlpStream) {
s.begin_list(1).append(&self.block_hash);
}
}
impl super::IncompleteRequest for Incomplete {
type Complete = Complete;
type Response = Response;
fn check_outputs<F>(&self, mut f: F) -> Result<(), NoSuchOutput>
where F: FnMut(usize, usize, OutputKind) -> Result<(), NoSuchOutput>
{
if let Field::BackReference(req, idx) = self.block_hash {
f(req, idx, OutputKind::Hash)?;
}
Ok(())
}
fn note_outputs<F>(&self, _: F) where F: FnMut(usize, OutputKind) {}
fn fill<F>(&mut self, oracle: F) where F: Fn(usize, usize) -> Result<Output, NoSuchOutput> {
if let Field::BackReference(req, idx) = self.block_hash {
self.block_hash = match oracle(req, idx) {
Ok(Output::Hash(block_hash)) => Field::Scalar(block_hash.into()),
_ => Field::BackReference(req, idx),
}
}
}
fn complete(self) -> Result<Self::Complete, NoSuchOutput> {
Ok(Complete {
block_hash: self.block_hash.into_scalar()?,
})
}
fn adjust_refs<F>(&mut self, mut mapping: F) where F: FnMut(usize) -> usize {
self.block_hash.adjust_req(&mut mapping);
}
}
/// A complete request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Complete {
/// The block hash to request the epoch signal for.
pub block_hash: H256,
}
/// The output of a request for an epoch signal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Response {
/// The requested epoch signal.
pub signal: Bytes,
}
impl super::ResponseLike for Response {
/// Fill reusable outputs by providing them to the function.
fn fill_outputs<F>(&self, _: F) where F: FnMut(usize, Output) {}
}
impl Decodable for Response {
fn decode(rlp: &UntrustedRlp) -> Result<Self, DecoderError> {
Ok(Response {
signal: rlp.as_val()?,
})
}
}
impl Encodable for Response {
fn rlp_append(&self, s: &mut RlpStream) {
s.append(&self.signal);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -1797,4 +1922,22 @@ mod tests {
let raw = ::rlp::encode_list(&reqs);
assert_eq!(::rlp::decode_list::<Response>(&raw), reqs);
}
#[test]
fn epoch_signal_roundtrip() {
let req = IncompleteSignalRequest {
block_hash: Field::Scalar(Default::default()),
};
let full_req = Request::Signal(req.clone());
let res = SignalResponse {
signal: vec![1, 2, 3, 4, 5, 6, 7, 6, 5, 4],
};
let full_res = Response::Signal(res.clone());
check_roundtrip(req);
check_roundtrip(full_req);
check_roundtrip(res);
check_roundtrip(full_res);
}
}