From 3a5843c40f208ab9715e3507ffcf30c46b5ec694 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 27 Oct 2016 15:45:59 +0200 Subject: [PATCH] skeleton and request traits --- ethcore/src/light/client.rs | 5 ++ ethcore/src/light/sync.rs | 33 +++++++++ sync/src/chain.rs | 12 ++-- sync/src/light/mod.rs | 53 ++++++++------ sync/src/light/request.rs | 138 ++++++++++++++++++++++++++++++++++++ 5 files changed, 213 insertions(+), 28 deletions(-) create mode 100644 ethcore/src/light/sync.rs create mode 100644 sync/src/light/request.rs diff --git a/ethcore/src/light/client.rs b/ethcore/src/light/client.rs index a4ff2531f..7f061e3ab 100644 --- a/ethcore/src/light/client.rs +++ b/ethcore/src/light/client.rs @@ -68,6 +68,11 @@ impl Client { pub fn queue_info(&self) -> QueueInfo { self.header_queue.queue_info() } + + /// Get the chain info. + pub fn chain_info(&self) -> ChainInfo { + + } } impl Provider for Client { diff --git a/ethcore/src/light/sync.rs b/ethcore/src/light/sync.rs new file mode 100644 index 000000000..15ed5673c --- /dev/null +++ b/ethcore/src/light/sync.rs @@ -0,0 +1,33 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! A light sync target. + +use block_import_error::BlockImportError; +use client::BlockChainInfo; + +use util::hash::H256; + +pub trait Sync { + /// Whether syncing is enabled. + fn enabled(&self) -> bool; + + /// Current chain info. + fn chain_info(&self) -> BlockChainInfo; + + /// Import a header. + fn import_header(&self, header_bytes: Vec) -> Result; +} \ No newline at end of file diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 916e7424e..344ed8c41 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -209,8 +209,8 @@ pub struct SyncStatus { impl SyncStatus { /// Indicates if snapshot download is in progress pub fn is_snapshot_syncing(&self) -> bool { - self.state == SyncState::SnapshotManifest - || self.state == SyncState::SnapshotData + self.state == SyncState::SnapshotManifest + || self.state == SyncState::SnapshotData || self.state == SyncState::SnapshotWaiting } @@ -453,7 +453,7 @@ impl ChainSync { self.init_downloaders(io.chain()); self.reset_and_continue(io); } - + /// Restart sync after bad block has been detected. May end up re-downloading up to QUEUE_SIZE blocks fn init_downloaders(&mut self, chain: &BlockChainClient) { // Do not assume that the block queue/chain still has our last_imported_block @@ -1150,9 +1150,9 @@ impl ChainSync { } }, BlockSet::OldBlocks => { - if self.old_blocks.as_mut().map_or(false, |downloader| { downloader.collect_blocks(io, false) == Err(DownloaderImportError::Invalid) }) { - self.restart(io); - } else if self.old_blocks.as_ref().map_or(false, |downloader| { downloader.is_complete() }) { + if self.old_blocks.as_mut().map_or(false, |downloader| { downloader.collect_blocks(io, false) == Err(DownloaderImportError::Invalid) }) { + self.restart(io); + } else if self.old_blocks.as_ref().map_or(false, |downloader| { downloader.is_complete() }) { trace!(target: "sync", "Background block download is complete"); self.old_blocks = None; } diff --git a/sync/src/light/mod.rs b/sync/src/light/mod.rs index 59ffceda6..fe60c30e5 100644 --- a/sync/src/light/mod.rs +++ b/sync/src/light/mod.rs @@ -24,8 +24,10 @@ use io::TimerToken; use network::{NetworkProtocolHandler, NetworkService, NetworkContext, NetworkError, PeerId}; use rlp::{DecoderError, RlpStream, Stream, UntrustedRlp, View}; use util::hash::H256; +use parking_lot::{Mutex, RwLock}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; +use std::sync::atomic::{AtomicUsize, Ordering}; const TIMEOUT: TimerToken = 0; const TIMEOUT_INTERVAL_MS: u64 = 1000; @@ -84,29 +86,18 @@ mod packet { pub const GET_TRANSACTION_PROOFS: u8 = 0x12; pub const TRANSACTION_PROOFS: u8 = 0x13; } -// which direction a request should go in. -enum Direction { - Forwards, - Reverse, -} -// A request made to a peer. -enum Request { - // a request for headers: - // (number, hash), maximum, skip, direction. - Headers((u64, H256), u64, u64, Direction), - // a request for block bodies by hashes. - Bodies(Vec), - // a request for tx receipts by hashes. - Receipts(Vec), - // a request for contract code by (block hash, address hash). - Code(Vec<(H256, H256)>), +struct Request; + +struct Requested { + timestamp: usize, + total: Request, } // data about each peer. struct Peer { - pending_requests: HashMap, // requests pending for peer. buffer: u64, // remaining buffer value. + current_asking: HashSet, // pending request ids. } /// This handles synchronization of the header chain for a light client. @@ -115,15 +106,25 @@ pub struct Chain { genesis_hash: H256, mainnet: bool, peers: RwLock>, + pending_requests: RwLock>, + req_id: AtomicUsize, } impl Chain { + // make a request to a given peer. + fn request_from(&self, peer: &PeerId, req: Request) { + unimplemented!() + } + // called when a peer connects. fn on_connect(&self, peer: &PeerId, io: &NetworkContext) { let peer = *peer; match self.send_status(peer, io) { Ok(()) => { - self.peers.write().insert(peer, Peer); + self.peers.write().insert(peer, Peer { + buffer: 0, + current_asking: HashSet::new(), + }); } Err(e) => { trace!(target: "les", "Error while sending status: {}", e); @@ -133,8 +134,9 @@ impl Chain { } // called when a peer disconnects. - fn on_disconnect(&self, peer: PeerId) { - self.peers.write().remove(peer); + fn on_disconnect(&self, peer: PeerId, io: &NetworkContext) { + // TODO: reassign all requests assigned to this peer. + self.peers.write().remove(&peer); } fn send_status(&self, peer: PeerId, io: &NetworkContext) -> Result<(), NetworkError> { @@ -165,6 +167,11 @@ impl Chain { io.send(peer, packet::STATUS, stream.out()) } + /// Check on the status of all pending requests. + fn check_pending_requests(&self) { + unimplemented!() + } + fn status(&self, peer: &PeerId, io: &NetworkContext) { unimplemented!() } @@ -177,7 +184,9 @@ impl Chain { } // Handle a request for block headers. - fn get_block_headers(&self, peers: &PeerId, io: &NetworkContext, data: UntrustedRlp) { + fn get_block_headers(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) { + const MAX_HEADERS: usize = 512; + unimplemented!() } diff --git a/sync/src/light/request.rs b/sync/src/light/request.rs new file mode 100644 index 000000000..112cbcc42 --- /dev/null +++ b/sync/src/light/request.rs @@ -0,0 +1,138 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! LES request types. + +use util::bigint::prelude::*; +use rlp::*; + +/// An LES request. This defines its data format, and the format of its response type. +pub trait Request: Sized { + /// The response type of this request. + type Response: Response; + + /// The error type when decoding a response. + type Error; + + /// Whether this request is empty. + fn is_empty(&self) -> bool; + + /// The remainder of this request unfulfilled by the response. Required to return + /// an equivalent request when provided with an empty response. + fn remainder(&self, res: &Self::Response) -> Self; + + /// Attempt to parse raw data into a response object + /// or an error. Behavior undefined if the raw data didn't originate from + /// this request. + fn parse_response(&self, raw: &[u8]) -> Result; +} + +/// Request responses. These must have a combination operation used to fill the gaps +/// in one response with data from another. +pub trait Response: Sized { + /// Combine the two responses into one. This can only be relied on to behave correctly + /// if `other` is a response to a sub-request of the request this response was + /// produced from. + fn combine(&mut self, other: Self); +} + +/// A request for block bodies. +pub struct BlockBodies { + hashes: Vec, +} + +/// A response for block bodies. +pub struct BlockBodiesResponse { + bodies: Vec<(H256, Vec)>, +} + +impl Request for BlockBodies { + type Response = BlockBodiesResponse; + type Error = ::rlp::DecoderError; + + fn is_empty(&self) -> bool { self.hashes.is_empty() } + + fn remainder(&self, res: &Self::Response) -> Self { + let mut remaining = Vec::new(); + + let bodies = res.bodies.iter().map(|&(_, ref b) b).chain(::std::iter::repeat(&Vec::new())); + for (hash, body) in self.hashes.iter().zip(bodies) { + if body.is_empty() { + remaining.push(hash); + } + } + + BlockBodies { + hashes: remaining, + } + } + + fn parse_response(&self, raw: &[u8]) -> Result { + use ethcore::transaction::SignedTransaction; + use ethcore::header::Header; + + let rlp = UntrustedRlp::new(raw); + + let mut bodies = Vec::with_capacity(self.hashes.len()); + + let items = rlp.iter(); + for hash in self.hashes.iter().cloned() { + let res_bytes = match items.next() { + Some(rlp) => { + // perform basic block verification. + // TODO: custom error type? + try!(rlp.val_at::>(0) + .and_then(|_| rlp.val_at::>(1))); + + try!(rlp.data()).to_owned() + } + None => Vec::new(), + }; + + bodies.push((hash, res_bytes)); + } + + Ok(BlockBodiesResponse { + bodies: bodies, + }) + } +} + +impl Response for BlockBodiesResponse { + fn identity() -> Self { + BlockBodiesResponse { + bodies: Vec::new(), + } + } + + fn combine(&mut self, other: Self) { + let other_iter = other.bodies.into_iter(); + + 'a: + for &mut (ref my_hash, ref mut my_body) in self.bodies.iter_mut() { + loop { + match other_iter.next() { + Some((hash, body)) if hash == my_hash && !body.is_empty() => { + *my_body = body.to_owned(); + break + } + Some(_) => continue, + None => break 'a, + } + } + } + } +} \ No newline at end of file