diff --git a/Cargo.lock b/Cargo.lock index d50bf5c3e..03823d6e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,6 +27,7 @@ dependencies = [ "ethsync 1.7.0", "evmbin 0.1.0", "fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "isatty 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -533,6 +534,7 @@ name = "ethcore-light" version = "1.7.0" dependencies = [ "ethcore 1.7.0", + "ethcore-devtools 1.7.0", "ethcore-io 1.7.0", "ethcore-ipc 1.7.0", "ethcore-ipc-codegen 1.7.0", diff --git a/Cargo.toml b/Cargo.toml index 1d0e27a71..09af66e09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ toml = "0.2" serde = "0.9" serde_json = "0.9" app_dirs = "1.1.1" +futures = "0.1" fdlimit = "0.1" ws2_32-sys = "0.2" ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" } diff --git a/ethcore/light/Cargo.toml b/ethcore/light/Cargo.toml index 6f95d8a0e..78210904e 100644 --- a/ethcore/light/Cargo.toml +++ b/ethcore/light/Cargo.toml @@ -17,6 +17,7 @@ ethcore-util = { path = "../../util" } ethcore-network = { path = "../../util/network" } ethcore-io = { path = "../../util/io" } ethcore-ipc = { path = "../../ipc/rpc", optional = true } +ethcore-devtools = { path = "../../devtools" } rlp = { path = "../../util/rlp" } time = "0.1" smallvec = "0.3.1" diff --git a/ethcore/light/src/client/header_chain.rs b/ethcore/light/src/client/header_chain.rs index 9dcd25888..d4ea8d107 100644 --- a/ethcore/light/src/client/header_chain.rs +++ b/ethcore/light/src/client/header_chain.rs @@ -23,9 +23,9 @@ //! This is separate from the `BlockChain` for two reasons: //! - It stores only headers (and a pruned subset of them) //! - To allow for flexibility in the database layout once that's incorporated. -// TODO: use DB instead of memory. DB Layout: just the contents of `candidates`/`headers` -use std::collections::{BTreeMap, HashMap}; +use std::collections::BTreeMap; +use std::sync::Arc; use cht; @@ -34,7 +34,10 @@ use ethcore::error::BlockError; use ethcore::encoded; use ethcore::header::Header; use ethcore::ids::BlockId; -use util::{H256, U256, HeapSizeOf, Mutex, RwLock}; + +use rlp::{Encodable, Decodable, DecoderError, RlpStream, Rlp, UntrustedRlp}; +use util::{H256, U256, HeapSizeOf, RwLock}; +use util::kvdb::{DBTransaction, KeyValueDB}; use smallvec::SmallVec; @@ -43,6 +46,9 @@ use smallvec::SmallVec; /// relevant to any blocks we've got in memory. const HISTORY: u64 = 2048; +/// The best block key. Maps to an RLP list: [best_era, last_era] +const CURRENT_KEY: &'static [u8] = &*b"best_and_latest"; + /// Information about a block. #[derive(Debug, Clone)] pub struct BlockDescriptor { @@ -75,42 +81,142 @@ impl HeapSizeOf for Entry { } } +impl Encodable for Entry { + fn rlp_append(&self, s: &mut RlpStream) { + s.begin_list(self.candidates.len()); + + for candidate in &self.candidates { + s.begin_list(3) + .append(&candidate.hash) + .append(&candidate.parent_hash) + .append(&candidate.total_difficulty); + } + } +} + +impl Decodable for Entry { + fn decode(rlp: &UntrustedRlp) -> Result { + + let mut candidates = SmallVec::<[Candidate; 3]>::new(); + + for item in rlp.iter() { + candidates.push(Candidate { + hash: item.val_at(0)?, + parent_hash: item.val_at(1)?, + total_difficulty: item.val_at(2)?, + }) + } + + if candidates.is_empty() { return Err(DecoderError::Custom("Empty candidates vector submitted.")) } + + // rely on the invariant that the canonical entry is always first. + let canon_hash = candidates[0].hash; + Ok(Entry { + candidates: candidates, + canonical_hash: canon_hash, + }) + } +} + +fn cht_key(number: u64) -> String { + format!("{:08x}_canonical", number) +} + +fn era_key(number: u64) -> String { + format!("candidates_{}", number) +} + +/// Pending changes from `insert` to be applied after the database write has finished. +pub struct PendingChanges { + best_block: Option, // new best block. +} + /// Header chain. See module docs for more details. pub struct HeaderChain { genesis_header: encoded::Header, // special-case the genesis. candidates: RwLock>, - headers: RwLock>, best_block: RwLock, - cht_roots: Mutex>, + db: Arc, + col: Option, } impl HeaderChain { - /// Create a new header chain given this genesis block. - pub fn new(genesis: &[u8]) -> Self { + /// Create a new header chain given this genesis block and database to read from. + pub fn new(db: Arc, col: Option, genesis: &[u8]) -> Result { use ethcore::views::HeaderView; - let g_view = HeaderView::new(genesis); + let chain = if let Some(current) = db.get(col, CURRENT_KEY)? { + let (best_number, highest_number) = { + let rlp = Rlp::new(¤t); + (rlp.val_at(0), rlp.val_at(1)) + }; - HeaderChain { - genesis_header: encoded::Header::new(genesis.to_owned()), - best_block: RwLock::new(BlockDescriptor { - hash: g_view.hash(), - number: 0, - total_difficulty: g_view.difficulty(), - }), - candidates: RwLock::new(BTreeMap::new()), - headers: RwLock::new(HashMap::new()), - cht_roots: Mutex::new(Vec::new()), - } + let mut cur_number = highest_number; + let mut candidates = BTreeMap::new(); + + // load all era entries and referenced headers within them. + while let Some(entry) = db.get(col, era_key(cur_number).as_bytes())? { + let entry: Entry = ::rlp::decode(&entry); + trace!(target: "chain", "loaded header chain entry for era {} with {} candidates", + cur_number, entry.candidates.len()); + + candidates.insert(cur_number, entry); + + cur_number -= 1; + } + + // fill best block block descriptor. + let best_block = { + let era = match candidates.get(&best_number) { + Some(era) => era, + None => return Err(format!("Database corrupt: highest block referenced but no data.")), + }; + + let best = &era.candidates[0]; + BlockDescriptor { + hash: best.hash, + number: best_number, + total_difficulty: best.total_difficulty, + } + }; + + HeaderChain { + genesis_header: encoded::Header::new(genesis.to_owned()), + best_block: RwLock::new(best_block), + candidates: RwLock::new(candidates), + db: db, + col: col, + } + } else { + let g_view = HeaderView::new(genesis); + HeaderChain { + genesis_header: encoded::Header::new(genesis.to_owned()), + best_block: RwLock::new(BlockDescriptor { + hash: g_view.hash(), + number: 0, + total_difficulty: g_view.difficulty(), + }), + candidates: RwLock::new(BTreeMap::new()), + db: db, + col: col, + } + }; + + Ok(chain) } /// Insert a pre-verified header. /// /// This blindly trusts that the data given to it is sensible. - pub fn insert(&self, header: Header) -> Result<(), BlockError> { + /// Returns a set of pending changes to be applied with `apply_pending` + /// before the next call to insert and after the transaction has been written. + pub fn insert(&self, transaction: &mut DBTransaction, header: Header) -> Result { let hash = header.hash(); let number = header.number(); let parent_hash = *header.parent_hash(); + let mut pending = PendingChanges { + best_block: None, + }; // hold candidates the whole time to guard import order. let mut candidates = self.candidates.write(); @@ -128,20 +234,41 @@ impl HeaderChain { let total_difficulty = parent_td + *header.difficulty(); - // insert headers and candidates entries. - candidates.entry(number).or_insert_with(|| Entry { candidates: SmallVec::new(), canonical_hash: hash }) - .candidates.push(Candidate { + // insert headers and candidates entries and write era to disk. + { + let cur_era = candidates.entry(number) + .or_insert_with(|| Entry { candidates: SmallVec::new(), canonical_hash: hash }); + cur_era.candidates.push(Candidate { hash: hash, parent_hash: parent_hash, total_difficulty: total_difficulty, - }); + }); - let raw = ::rlp::encode(&header).to_vec(); - self.headers.write().insert(hash, encoded::Header::new(raw)); + // fix ordering of era before writing. + if total_difficulty > cur_era.candidates[0].total_difficulty { + let cur_pos = cur_era.candidates.len() - 1; + cur_era.candidates.swap(cur_pos, 0); + cur_era.canonical_hash = hash; + } + + transaction.put(self.col, era_key(number).as_bytes(), &::rlp::encode(&*cur_era)) + } + + let raw = ::rlp::encode(&header); + transaction.put(self.col, &hash[..], &*raw); + + let (best_num, is_new_best) = { + let cur_best = self.best_block.read(); + if cur_best.total_difficulty < total_difficulty { + (number, true) + } else { + (cur_best.number, false) + } + }; // reorganize ancestors so canonical entries are first in their // respective candidates vectors. - if self.best_block.read().total_difficulty < total_difficulty { + if is_new_best { let mut canon_hash = hash; for (&height, entry) in candidates.iter_mut().rev().skip_while(|&(height, _)| *height > number) { if height != number && entry.canonical_hash == canon_hash { break; } @@ -160,23 +287,26 @@ impl HeaderChain { // what about reorgs > cht::SIZE + HISTORY? // resetting to the last block of a given CHT should be possible. canon_hash = entry.candidates[0].parent_hash; + + // write altered era to disk + if height != number { + let rlp_era = ::rlp::encode(&*entry); + transaction.put(self.col, era_key(height).as_bytes(), &rlp_era); + } } trace!(target: "chain", "New best block: ({}, {}), TD {}", number, hash, total_difficulty); - *self.best_block.write() = BlockDescriptor { + pending.best_block = Some(BlockDescriptor { hash: hash, number: number, total_difficulty: total_difficulty, - }; + }); // produce next CHT root if it's time. let earliest_era = *candidates.keys().next().expect("at least one era just created; qed"); if earliest_era + HISTORY + cht::SIZE <= number { let cht_num = cht::block_to_cht_number(earliest_era) .expect("fails only for number == 0; genesis never imported; qed"); - debug_assert_eq!(cht_num as usize, self.cht_roots.lock().len()); - - let mut headers = self.headers.write(); let cht_root = { let mut i = earliest_era; @@ -186,10 +316,12 @@ impl HeaderChain { let iter = || { let era_entry = candidates.remove(&i) .expect("all eras are sequential with no gaps; qed"); + transaction.delete(self.col, era_key(i).as_bytes()); + i += 1; for ancient in &era_entry.candidates { - headers.remove(&ancient.hash); + transaction.delete(self.col, &ancient.hash); } let canon = &era_entry.candidates[0]; @@ -199,28 +331,56 @@ impl HeaderChain { .expect("fails only when too few items; this is checked; qed") }; + // write the CHT root to the database. debug!(target: "chain", "Produced CHT {} root: {:?}", cht_num, cht_root); - - self.cht_roots.lock().push(cht_root); + transaction.put(self.col, cht_key(cht_num).as_bytes(), &::rlp::encode(&cht_root)); } } - Ok(()) + // write the best and latest eras to the database. + { + let latest_num = *candidates.iter().rev().next().expect("at least one era just inserted; qed").0; + let mut stream = RlpStream::new_list(2); + stream.append(&best_num).append(&latest_num); + transaction.put(self.col, CURRENT_KEY, &stream.out()) + } + Ok(pending) + } + + /// Apply pending changes from a previous `insert` operation. + /// Must be done before the next `insert` call. + pub fn apply_pending(&self, pending: PendingChanges) { + if let Some(best_block) = pending.best_block { + *self.best_block.write() = best_block; + } } /// Get a block header. In the case of query by number, only canonical blocks /// will be returned. pub fn block_header(&self, id: BlockId) -> Option { + let load_from_db = |hash: H256| { + match self.db.get(self.col, &hash) { + Ok(val) => val.map(|x| x.to_vec()).map(encoded::Header::new), + Err(e) => { + warn!(target: "chain", "Failed to read from database: {}", e); + None + } + } + }; + match id { BlockId::Earliest | BlockId::Number(0) => Some(self.genesis_header.clone()), - BlockId::Hash(hash) => self.headers.read().get(&hash).cloned(), + BlockId::Hash(hash) => load_from_db(hash), BlockId::Number(num) => { if self.best_block.read().number < num { return None } self.candidates.read().get(&num).map(|entry| entry.canonical_hash) - .and_then(|hash| self.headers.read().get(&hash).cloned()) + .and_then(load_from_db) } BlockId::Latest | BlockId::Pending => { + // hold candidates hear to prevent deletion of the header + // as we read it. + let _candidates = self.candidates.read(); let hash = { let best = self.best_block.read(); if best.number == 0 { @@ -230,7 +390,7 @@ impl HeaderChain { best.hash }; - self.headers.read().get(&hash).cloned() + load_from_db(hash) } } } @@ -257,7 +417,13 @@ impl HeaderChain { /// This is because it's assumed that the genesis hash is known, /// so including it within a CHT would be redundant. pub fn cht_root(&self, n: usize) -> Option { - self.cht_roots.lock().get(n).map(|h| h.clone()) + match self.db.get(self.col, cht_key(n as u64).as_bytes()) { + Ok(val) => val.map(|x| ::rlp::decode(&x)), + Err(e) => { + warn!(target: "chain", "Error reading from database: {}", e); + None + } + } } /// Get the genesis hash. @@ -287,7 +453,7 @@ impl HeaderChain { /// Get block status. pub fn status(&self, hash: &H256) -> BlockStatus { - match self.headers.read().contains_key(hash) { + match self.db.get(self.col, &*hash).ok().map_or(false, |x| x.is_some()) { true => BlockStatus::InChain, false => BlockStatus::Unknown, } @@ -296,9 +462,7 @@ impl HeaderChain { impl HeapSizeOf for HeaderChain { fn heap_size_of_children(&self) -> usize { - self.candidates.read().heap_size_of_children() + - self.headers.read().heap_size_of_children() + - self.cht_roots.lock().heap_size_of_children() + self.candidates.read().heap_size_of_children() } } @@ -324,16 +488,23 @@ impl<'a> Iterator for AncestryIter<'a> { #[cfg(test)] mod tests { use super::HeaderChain; + use std::sync::Arc; + use ethcore::ids::BlockId; use ethcore::header::Header; use ethcore::spec::Spec; + fn make_db() -> Arc<::util::KeyValueDB> { + Arc::new(::util::kvdb::in_memory(0)) + } + #[test] fn basic_chain() { let spec = Spec::new_test(); let genesis_header = spec.genesis_header(); + let db = make_db(); - let chain = HeaderChain::new(&::rlp::encode(&genesis_header)); + let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap(); let mut parent_hash = genesis_header.hash(); let mut rolling_timestamp = genesis_header.timestamp(); @@ -345,7 +516,10 @@ mod tests { header.set_difficulty(*genesis_header.difficulty() * i.into()); parent_hash = header.hash(); - chain.insert(header).unwrap(); + let mut tx = db.transaction(); + let pending = chain.insert(&mut tx, header).unwrap(); + db.write(tx).unwrap(); + chain.apply_pending(pending); rolling_timestamp += 10; } @@ -361,7 +535,8 @@ mod tests { let spec = Spec::new_test(); let genesis_header = spec.genesis_header(); - let chain = HeaderChain::new(&::rlp::encode(&genesis_header)); + let db = make_db(); + let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap(); let mut parent_hash = genesis_header.hash(); let mut rolling_timestamp = genesis_header.timestamp(); @@ -373,7 +548,10 @@ mod tests { header.set_difficulty(*genesis_header.difficulty() * i.into()); parent_hash = header.hash(); - chain.insert(header).unwrap(); + let mut tx = db.transaction(); + let pending = chain.insert(&mut tx, header).unwrap(); + db.write(tx).unwrap(); + chain.apply_pending(pending); rolling_timestamp += 10; } @@ -389,7 +567,10 @@ mod tests { header.set_difficulty(*genesis_header.difficulty() * i.into()); parent_hash = header.hash(); - chain.insert(header).unwrap(); + let mut tx = db.transaction(); + let pending = chain.insert(&mut tx, header).unwrap(); + db.write(tx).unwrap(); + chain.apply_pending(pending); rolling_timestamp += 10; } @@ -410,7 +591,10 @@ mod tests { header.set_difficulty(*genesis_header.difficulty() * (i * i).into()); parent_hash = header.hash(); - chain.insert(header).unwrap(); + let mut tx = db.transaction(); + let pending = chain.insert(&mut tx, header).unwrap(); + db.write(tx).unwrap(); + chain.apply_pending(pending); rolling_timestamp += 11; } @@ -432,11 +616,101 @@ mod tests { fn earliest_is_latest() { let spec = Spec::new_test(); let genesis_header = spec.genesis_header(); + let db = make_db(); - let chain = HeaderChain::new(&::rlp::encode(&genesis_header)); + let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap(); assert!(chain.block_header(BlockId::Earliest).is_some()); assert!(chain.block_header(BlockId::Latest).is_some()); assert!(chain.block_header(BlockId::Pending).is_some()); } + + #[test] + fn restore_from_db() { + let spec = Spec::new_test(); + let genesis_header = spec.genesis_header(); + let db = make_db(); + + { + let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap(); + let mut parent_hash = genesis_header.hash(); + let mut rolling_timestamp = genesis_header.timestamp(); + for i in 1..10000 { + let mut header = Header::new(); + header.set_parent_hash(parent_hash); + header.set_number(i); + header.set_timestamp(rolling_timestamp); + header.set_difficulty(*genesis_header.difficulty() * i.into()); + parent_hash = header.hash(); + + let mut tx = db.transaction(); + let pending = chain.insert(&mut tx, header).unwrap(); + db.write(tx).unwrap(); + chain.apply_pending(pending); + + rolling_timestamp += 10; + } + } + + let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap(); + assert!(chain.block_header(BlockId::Number(10)).is_none()); + assert!(chain.block_header(BlockId::Number(9000)).is_some()); + assert!(chain.cht_root(2).is_some()); + assert!(chain.cht_root(3).is_none()); + assert_eq!(chain.block_header(BlockId::Latest).unwrap().number(), 9999); + } + + #[test] + fn restore_higher_non_canonical() { + let spec = Spec::new_test(); + let genesis_header = spec.genesis_header(); + let db = make_db(); + + { + let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap(); + let mut parent_hash = genesis_header.hash(); + let mut rolling_timestamp = genesis_header.timestamp(); + + // push 100 low-difficulty blocks. + for i in 1..101 { + let mut header = Header::new(); + header.set_parent_hash(parent_hash); + header.set_number(i); + header.set_timestamp(rolling_timestamp); + header.set_difficulty(*genesis_header.difficulty() * i.into()); + parent_hash = header.hash(); + + let mut tx = db.transaction(); + let pending = chain.insert(&mut tx, header).unwrap(); + db.write(tx).unwrap(); + chain.apply_pending(pending); + + rolling_timestamp += 10; + } + + // push fewer high-difficulty blocks. + for i in 1..11 { + let mut header = Header::new(); + header.set_parent_hash(parent_hash); + header.set_number(i); + header.set_timestamp(rolling_timestamp); + header.set_difficulty(*genesis_header.difficulty() * i.into() * 1000.into()); + parent_hash = header.hash(); + + let mut tx = db.transaction(); + let pending = chain.insert(&mut tx, header).unwrap(); + db.write(tx).unwrap(); + chain.apply_pending(pending); + + rolling_timestamp += 10; + } + + assert_eq!(chain.block_header(BlockId::Latest).unwrap().number(), 10); + } + + // after restoration, non-canonical eras should still be loaded. + let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap(); + assert_eq!(chain.block_header(BlockId::Latest).unwrap().number(), 10); + assert!(chain.candidates.read().get(&100).is_some()) + } } diff --git a/ethcore/light/src/client/mod.rs b/ethcore/light/src/client/mod.rs index c791caed1..d294053e1 100644 --- a/ethcore/light/src/client/mod.rs +++ b/ethcore/light/src/client/mod.rs @@ -32,6 +32,7 @@ use ethcore::encoded; use io::IoChannel; use util::{H256, Mutex, RwLock}; +use util::kvdb::{KeyValueDB, CompactionProfile}; use self::header_chain::{AncestryIter, HeaderChain}; @@ -45,6 +46,14 @@ mod service; pub struct Config { /// Verification queue config. pub queue: queue::Config, + /// Chain column in database. + pub chain_column: Option, + /// Database cache size. `None` => rocksdb default. + pub db_cache_size: Option, + /// State db compaction profile + pub db_compaction: CompactionProfile, + /// Should db have WAL enabled? + pub db_wal: bool, } /// Trait for interacting with the header chain abstractly. @@ -106,18 +115,30 @@ pub struct Client { chain: HeaderChain, report: RwLock, import_lock: Mutex<()>, + db: Arc, } impl Client { /// Create a new `Client`. - pub fn new(config: Config, spec: &Spec, io_channel: IoChannel) -> Self { - Client { + pub fn new(config: Config, db: Arc, chain_col: Option, spec: &Spec, io_channel: IoChannel) -> Result { + let gh = ::rlp::encode(&spec.genesis_header()); + + Ok(Client { queue: HeaderQueue::new(config.queue, spec.engine.clone(), io_channel, true), engine: spec.engine.clone(), - chain: HeaderChain::new(&::rlp::encode(&spec.genesis_header())), + chain: HeaderChain::new(db.clone(), chain_col, &gh)?, report: RwLock::new(ClientReport::default()), import_lock: Mutex::new(()), - } + db: db, + }) + } + + /// Create a new `Client` backed purely in-memory. + /// This will ignore all database options in the configuration. + pub fn in_memory(config: Config, spec: &Spec, io_channel: IoChannel) -> Self { + let db = ::util::kvdb::in_memory(0); + + Client::new(config, Arc::new(db), None, spec, io_channel).expect("New DB creation infallible; qed") } /// Import a header to the queue for additional verification. @@ -201,15 +222,23 @@ impl Client { for verified_header in self.queue.drain(MAX) { let (num, hash) = (verified_header.number(), verified_header.hash()); - match self.chain.insert(verified_header) { - Ok(()) => { + let mut tx = self.db.transaction(); + let pending = match self.chain.insert(&mut tx, verified_header) { + Ok(pending) => { good.push(hash); self.report.write().blocks_imported += 1; + pending } Err(e) => { debug!(target: "client", "Error importing header {:?}: {}", (num, hash), e); bad.push(hash); + break; } + }; + self.db.write_buffered(tx); + self.chain.apply_pending(pending); + if let Err(e) = self.db.flush() { + panic!("Database flush failed: {}. Check disk health and space.", e); } } diff --git a/ethcore/light/src/client/service.rs b/ethcore/light/src/client/service.rs index fe7caee94..55795d870 100644 --- a/ethcore/light/src/client/service.rs +++ b/ethcore/light/src/client/service.rs @@ -17,33 +17,80 @@ //! Minimal IO service for light client. //! Just handles block import messages and passes them to the client. +use std::fmt; +use std::path::Path; use std::sync::Arc; +use ethcore::db; use ethcore::service::ClientIoMessage; use ethcore::spec::Spec; use io::{IoContext, IoError, IoHandler, IoService}; +use util::kvdb::{Database, DatabaseConfig}; use super::{Client, Config as ClientConfig}; +/// Errors on service initialization. +#[derive(Debug)] +pub enum Error { + /// Database error. + Database(String), + /// I/O service error. + Io(IoError), +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Error::Database(ref msg) => write!(f, "Database error: {}", msg), + Error::Io(ref err) => write!(f, "I/O service error: {}", err), + } + } +} + /// Light client service. pub struct Service { client: Arc, - _io_service: IoService, + io_service: IoService, } impl Service { /// Start the service: initialize I/O workers and client itself. - pub fn start(config: ClientConfig, spec: &Spec) -> Result { - let io_service = try!(IoService::::start()); - let client = Arc::new(Client::new(config, spec, io_service.channel())); - try!(io_service.register_handler(Arc::new(ImportBlocks(client.clone())))); + pub fn start(config: ClientConfig, spec: &Spec, path: &Path) -> Result { + // initialize database. + let mut db_config = DatabaseConfig::with_columns(db::NUM_COLUMNS); + // give all rocksdb cache to the header chain column. + if let Some(size) = config.db_cache_size { + db_config.set_cache(db::COL_LIGHT_CHAIN, size); + } + + db_config.compaction = config.db_compaction; + db_config.wal = config.db_wal; + + let db = Arc::new(Database::open( + &db_config, + &path.to_str().expect("DB path could not be converted to string.") + ).map_err(Error::Database)?); + + let io_service = IoService::::start().map_err(Error::Io)?; + let client = Arc::new(Client::new(config, + db, + db::COL_LIGHT_CHAIN, + spec, + io_service.channel(), + ).map_err(Error::Database)?); + io_service.register_handler(Arc::new(ImportBlocks(client.clone()))).map_err(Error::Io)?; Ok(Service { client: client, - _io_service: io_service, + io_service: io_service, }) } + /// Register an I/O handler on the service. + pub fn register_handler(&self, handler: Arc + Send>) -> Result<(), IoError> { + self.io_service.register_handler(handler) + } + /// Get a handle to the client. pub fn client(&self) -> &Arc { &self.client @@ -63,11 +110,13 @@ impl IoHandler for ImportBlocks { #[cfg(test)] mod tests { use super::Service; + use devtools::RandomTempPath; use ethcore::spec::Spec; #[test] fn it_works() { let spec = Spec::new_test(); - Service::start(Default::default(), &spec).unwrap(); + let temp_path = RandomTempPath::new(); + Service::start(Default::default(), &spec, temp_path.as_path()).unwrap(); } } diff --git a/ethcore/light/src/lib.rs b/ethcore/light/src/lib.rs index ada58d8de..82b6ea126 100644 --- a/ethcore/light/src/lib.rs +++ b/ethcore/light/src/lib.rs @@ -55,6 +55,7 @@ pub mod remote { mod types; +pub use self::cache::Cache; pub use self::provider::Provider; pub use self::transaction_queue::TransactionQueue; pub use types::request as request; @@ -76,3 +77,6 @@ extern crate stats; #[cfg(feature = "ipc")] extern crate ethcore_ipc as ipc; + +#[cfg(test)] +extern crate ethcore_devtools as devtools; diff --git a/ethcore/light/src/net/context.rs b/ethcore/light/src/net/context.rs index 9eafead57..64ddd19a3 100644 --- a/ethcore/light/src/net/context.rs +++ b/ethcore/light/src/net/context.rs @@ -61,10 +61,12 @@ impl<'a> IoContext for NetworkContext<'a> { } fn disconnect_peer(&self, peer: PeerId) { + trace!(target: "pip", "Initiating disconnect of peer {}", peer); NetworkContext::disconnect_peer(self, peer); } fn disable_peer(&self, peer: PeerId) { + trace!(target: "pip", "Initiating disable of peer {}", peer); NetworkContext::disable_peer(self, peer); } diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 667e07cb4..e32e92145 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -27,7 +27,7 @@ use util::hash::H256; use util::{DBValue, Mutex, RwLock, U256}; use time::{Duration, SteadyTime}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -61,6 +61,9 @@ const TIMEOUT_INTERVAL_MS: u64 = 1000; const TICK_TIMEOUT: TimerToken = 1; const TICK_TIMEOUT_INTERVAL_MS: u64 = 5000; +const PROPAGATE_TIMEOUT: TimerToken = 2; +const PROPAGATE_TIMEOUT_INTERVAL_MS: u64 = 5000; + // minimum interval between updates. const UPDATE_INTERVAL_MS: i64 = 5000; @@ -132,6 +135,7 @@ pub struct Peer { last_update: SteadyTime, pending_requests: RequestSet, failed_requests: Vec, + propagated_transactions: HashSet, } /// A light protocol event handler. @@ -303,12 +307,18 @@ impl LightProtocol { match peer.remote_flow { None => Err(Error::NotServer), Some((ref mut creds, ref params)) => { - // check that enough credits are available. - let mut temp_creds: Credits = creds.clone(); - for request in requests.requests() { - temp_creds.deduct_cost(params.compute_cost(request))?; + // apply recharge to credits if there's no pending requests. + if peer.pending_requests.is_empty() { + params.recharge(creds); } - *creds = temp_creds; + + // compute and deduct cost. + let pre_creds = creds.current(); + let cost = params.compute_cost_multi(requests.requests()); + creds.deduct_cost(cost)?; + + trace!(target: "pip", "requesting from peer {}. Cost: {}; Available: {}", + peer_id, cost, pre_creds); let req_id = ReqId(self.req_id.fetch_add(1, Ordering::SeqCst)); io.send(*peer_id, packet::REQUEST, { @@ -318,7 +328,7 @@ impl LightProtocol { }); // begin timeout. - peer.pending_requests.insert(req_id, requests, SteadyTime::now()); + peer.pending_requests.insert(req_id, requests, cost, SteadyTime::now()); Ok(req_id) } } @@ -401,20 +411,25 @@ impl LightProtocol { let req_id = ReqId(raw.val_at(0)?); let cur_credits: U256 = raw.val_at(1)?; - trace!(target: "pip", "pre-verifying response from peer {}", peer); + trace!(target: "pip", "pre-verifying response for {} from peer {}", req_id, peer); let peers = self.peers.read(); let res = match peers.get(peer) { Some(peer_info) => { let mut peer_info = peer_info.lock(); let req_info = peer_info.pending_requests.remove(&req_id, SteadyTime::now()); + let cumulative_cost = peer_info.pending_requests.cumulative_cost(); let flow_info = peer_info.remote_flow.as_mut(); match (req_info, flow_info) { (Some(_), Some(flow_info)) => { let &mut (ref mut c, ref mut flow) = flow_info; - let actual_credits = ::std::cmp::min(cur_credits, *flow.limit()); - c.update_to(actual_credits); + + // only update if the cumulative cost of the request set is zero. + if cumulative_cost == 0.into() { + let actual_credits = ::std::cmp::min(cur_credits, *flow.limit()); + c.update_to(actual_credits); + } Ok(()) } @@ -488,6 +503,47 @@ impl LightProtocol { } } + // propagate transactions to relay peers. + // if we aren't on the mainnet, we just propagate to all relay peers + fn propagate_transactions(&self, io: &IoContext) { + if self.capabilities.read().tx_relay { return } + + let ready_transactions = self.provider.ready_transactions(); + if ready_transactions.is_empty() { return } + + trace!(target: "pip", "propagate transactions: {} ready", ready_transactions.len()); + + let all_transaction_hashes: HashSet<_> = ready_transactions.iter().map(|tx| tx.hash()).collect(); + let mut buf = Vec::new(); + + let peers = self.peers.read(); + for (peer_id, peer_info) in peers.iter() { + let mut peer_info = peer_info.lock(); + if !peer_info.capabilities.tx_relay { continue } + + let prop_filter = &mut peer_info.propagated_transactions; + *prop_filter = &*prop_filter & &all_transaction_hashes; + + // fill the buffer with all non-propagated transactions. + let to_propagate = ready_transactions.iter() + .filter(|tx| prop_filter.insert(tx.hash())) + .map(|tx| &tx.transaction); + + buf.extend(to_propagate); + + // propagate to the given peer. + if buf.is_empty() { continue } + io.send(*peer_id, packet::SEND_TRANSACTIONS, { + let mut stream = RlpStream::new_list(buf.len()); + for pending_tx in buf.drain(..) { + stream.append(pending_tx); + } + + stream.out() + }) + } + } + /// called when a peer connects. pub fn on_connect(&self, peer: &PeerId, io: &IoContext) { let proto_version = match io.protocol_version(*peer).ok_or(Error::WrongNetwork) { @@ -520,6 +576,7 @@ impl LightProtocol { last_update: SteadyTime::now(), }); + trace!(target: "pip", "Sending status to peer {}", peer); io.send(*peer, packet::STATUS, status_packet); } @@ -601,6 +658,7 @@ impl LightProtocol { last_update: pending.last_update, pending_requests: RequestSet::default(), failed_requests: Vec::new(), + propagated_transactions: HashSet::new(), })); for handler in &self.handlers { @@ -683,6 +741,8 @@ impl LightProtocol { trace!(target: "pip", "Received requests (id: {}) from peer {}", req_id, peer_id); // deserialize requests, check costs and request validity. + self.flow_params.recharge(&mut peer.local_credits); + peer.local_credits.deduct_cost(self.flow_params.base_cost())?; for request_rlp in raw.at(1)?.iter().take(MAX_REQUESTS) { let request: Request = request_rlp.as_val()?; @@ -709,6 +769,7 @@ impl LightProtocol { }); trace!(target: "pip", "Responded to {}/{} requests in packet {}", responses.len(), num_requests, req_id); + trace!(target: "pip", "Peer {} has {} credits remaining.", peer_id, peer.local_credits.current()); io.respond(packet::RESPONSE, { let mut stream = RlpStream::new_list(3); @@ -782,6 +843,8 @@ impl NetworkProtocolHandler for LightProtocol { .expect("Error registering sync timer."); io.register_timer(TICK_TIMEOUT, TICK_TIMEOUT_INTERVAL_MS) .expect("Error registering sync timer."); + io.register_timer(PROPAGATE_TIMEOUT, PROPAGATE_TIMEOUT_INTERVAL_MS) + .expect("Error registering sync timer."); } fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { @@ -800,6 +863,7 @@ impl NetworkProtocolHandler for LightProtocol { match timer { TIMEOUT => self.timeout_check(io), TICK_TIMEOUT => self.tick_handlers(io), + PROPAGATE_TIMEOUT => self.propagate_transactions(io), _ => warn!(target: "pip", "received timeout on unknown token {}", timer), } } diff --git a/ethcore/light/src/net/request_set.rs b/ethcore/light/src/net/request_set.rs index a2391ef6f..094fa1894 100644 --- a/ethcore/light/src/net/request_set.rs +++ b/ethcore/light/src/net/request_set.rs @@ -27,22 +27,29 @@ use std::iter::FromIterator; use request::Request; use request::Requests; use net::{timeout, ReqId}; +use util::U256; use time::{Duration, SteadyTime}; +// Request set entry: requests + cost. +#[derive(Debug)] +struct Entry(Requests, U256); + /// Request set. #[derive(Debug)] pub struct RequestSet { counter: u64, + cumulative_cost: U256, base: Option, ids: HashMap, - reqs: BTreeMap, + reqs: BTreeMap, } impl Default for RequestSet { fn default() -> Self { RequestSet { counter: 0, + cumulative_cost: 0.into(), base: None, ids: HashMap::new(), reqs: BTreeMap::new(), @@ -52,10 +59,12 @@ impl Default for RequestSet { impl RequestSet { /// Push requests onto the stack. - pub fn insert(&mut self, req_id: ReqId, req: Requests, now: SteadyTime) { + pub fn insert(&mut self, req_id: ReqId, req: Requests, cost: U256, now: SteadyTime) { let counter = self.counter; + self.cumulative_cost = self.cumulative_cost + cost; + self.ids.insert(req_id, counter); - self.reqs.insert(counter, req); + self.reqs.insert(counter, Entry(req, cost)); if self.reqs.keys().next().map_or(true, |x| *x == counter) { self.base = Some(now); @@ -71,7 +80,7 @@ impl RequestSet { None => return None, }; - let req = self.reqs.remove(&id).expect("entry in `ids` implies entry in `reqs`; qed"); + let Entry(req, cost) = self.reqs.remove(&id).expect("entry in `ids` implies entry in `reqs`; qed"); match self.reqs.keys().next() { Some(k) if *k > id => self.base = Some(now), @@ -79,6 +88,7 @@ impl RequestSet { _ => {} } + self.cumulative_cost = self.cumulative_cost - cost; Some(req) } @@ -93,7 +103,7 @@ impl RequestSet { let first_req = self.reqs.values().next() .expect("base existing implies `reqs` non-empty; qed"); - base + compute_timeout(&first_req) <= now + base + compute_timeout(&first_req.0) <= now } /// Collect all pending request ids. @@ -108,6 +118,9 @@ impl RequestSet { /// Whether the set is empty. pub fn is_empty(&self) -> bool { self.len() == 0 } + + /// The cumulative cost of all requests in the set. + pub fn cumulative_cost(&self) -> U256 { self.cumulative_cost } } // helper to calculate timeout for a specific set of requests. @@ -141,8 +154,8 @@ mod tests { let the_req = RequestBuilder::default().build(); let req_time = compute_timeout(&the_req); - req_set.insert(ReqId(0), the_req.clone(), test_begin); - req_set.insert(ReqId(1), the_req, test_begin + Duration::seconds(1)); + req_set.insert(ReqId(0), the_req.clone(), 0.into(), test_begin); + req_set.insert(ReqId(1), the_req, 0.into(), test_begin + Duration::seconds(1)); assert_eq!(req_set.base, Some(test_begin)); @@ -153,4 +166,22 @@ mod tests { assert!(!req_set.check_timeout(test_end)); assert!(req_set.check_timeout(test_end + Duration::seconds(1))); } + + #[test] + fn cumulative_cost() { + let the_req = RequestBuilder::default().build(); + let test_begin = SteadyTime::now(); + let test_end = test_begin + Duration::seconds(1); + let mut req_set = RequestSet::default(); + + for i in 0..5 { + req_set.insert(ReqId(i), the_req.clone(), 1.into(), test_begin); + assert_eq!(req_set.cumulative_cost, (i + 1).into()); + } + + for i in (0..5).rev() { + assert!(req_set.remove(&ReqId(i), test_end).is_some()); + assert_eq!(req_set.cumulative_cost, i.into()); + } + } } diff --git a/ethcore/light/src/net/tests/mod.rs b/ethcore/light/src/net/tests/mod.rs index e2081534c..6dc5fbe7e 100644 --- a/ethcore/light/src/net/tests/mod.rs +++ b/ethcore/light/src/net/tests/mod.rs @@ -600,8 +600,8 @@ fn id_guard() { let mut pending_requests = RequestSet::default(); - pending_requests.insert(req_id_1, req.clone(), ::time::SteadyTime::now()); - pending_requests.insert(req_id_2, req, ::time::SteadyTime::now()); + pending_requests.insert(req_id_1, req.clone(), 0.into(), ::time::SteadyTime::now()); + pending_requests.insert(req_id_2, req, 1.into(), ::time::SteadyTime::now()); proto.peers.write().insert(peer_id, ::util::Mutex::new(Peer { local_credits: flow_params.create_credits(), @@ -612,6 +612,7 @@ fn id_guard() { last_update: ::time::SteadyTime::now(), pending_requests: pending_requests, failed_requests: Vec::new(), + propagated_transactions: Default::default(), })); // first, malformed responses. diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index 8d451c88e..a7c1ba2c4 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -37,7 +37,7 @@ use rlp::RlpStream; use util::{Bytes, RwLock, Mutex, U256, H256}; use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP}; -use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId}; +use net::{self, Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId}; use cache::Cache; use request::{self as basic_request, Request as NetworkRequest, Response as NetworkResponse}; @@ -57,15 +57,15 @@ impl Peer { self.capabilities.serve_headers && self.status.head_num > req.num(), Pending::HeaderByHash(_, _) => self.capabilities.serve_headers, Pending::Block(ref req, _) => - self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= req.header.number()), + self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x <= req.header.number()), Pending::BlockReceipts(ref req, _) => - self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x >= req.0.number()), + self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x <= req.0.number()), Pending::Account(ref req, _) => - self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= req.header.number()), + self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x <= req.header.number()), Pending::Code(ref req, _) => - self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= req.block_id.1), + self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x <= req.block_id.1), Pending::TxProof(ref req, _) => - self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= req.header.number()), + self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x <= req.header.number()), } } } @@ -210,7 +210,7 @@ impl OnDemand { /// it as easily. pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Receiver { let (sender, receiver) = oneshot::channel(); - match self.cache.lock().block_header(&req.0) { + match { self.cache.lock().block_header(&req.0) } { Some(hdr) => sender.send(hdr).expect(RECEIVER_IN_SCOPE), None => self.dispatch(ctx, Pending::HeaderByHash(req, sender)), } @@ -232,11 +232,13 @@ impl OnDemand { sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE); } else { - match self.cache.lock().block_body(&req.hash) { + match { self.cache.lock().block_body(&req.hash) } { Some(body) => { let mut stream = RlpStream::new_list(3); + let body = body.rlp(); stream.append_raw(&req.header.into_inner(), 1); - stream.append_raw(&body.into_inner(), 2); + stream.append_raw(&body.at(0).as_raw(), 1); + stream.append_raw(&body.at(1).as_raw(), 1); sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE); } @@ -255,7 +257,7 @@ impl OnDemand { if req.0.receipts_root() == SHA3_NULL_RLP { sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE); } else { - match self.cache.lock().block_receipts(&req.0.hash()) { + match { self.cache.lock().block_receipts(&req.0.hash()) } { Some(receipts) => sender.send(receipts).expect(RECEIVER_IN_SCOPE), None => self.dispatch(ctx, Pending::BlockReceipts(req, sender)), } @@ -303,23 +305,26 @@ impl OnDemand { let complete = builder.build(); + let kind = complete.requests()[0].kind(); for (id, peer) in self.peers.read().iter() { if !peer.can_handle(&pending) { continue } match ctx.request_from(*id, complete.clone()) { Ok(req_id) => { - trace!(target: "on_demand", "Assigning request to peer {}", id); + trace!(target: "on_demand", "{}: Assigned {:?} to peer {}", + req_id, kind, id); + self.pending_requests.write().insert( req_id, pending, ); return } + Err(net::Error::NoCredits) => {} Err(e) => trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), } } - trace!(target: "on_demand", "No suitable peer for request"); self.orphaned_requests.write().push(pending); } @@ -353,6 +358,7 @@ impl OnDemand { let to_dispatch = ::std::mem::replace(&mut *self.orphaned_requests.write(), Vec::new()); + trace!(target: "on_demand", "Attempting to dispatch {} orphaned requests.", to_dispatch.len()); for mut orphaned in to_dispatch { let hung_up = match orphaned { Pending::HeaderProof(_, ref mut sender) => match *sender { @@ -397,10 +403,12 @@ impl Handler for OnDemand { } fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) { - let mut peers = self.peers.write(); - if let Some(ref mut peer) = peers.get_mut(&ctx.peer()) { - peer.status.update_from(&announcement); - peer.capabilities.update_from(&announcement); + { + let mut peers = self.peers.write(); + if let Some(ref mut peer) = peers.get_mut(&ctx.peer()) { + peer.status.update_from(&announcement); + peer.capabilities.update_from(&announcement); + } } self.dispatch_orphaned(ctx.as_basic()); @@ -422,6 +430,8 @@ impl Handler for OnDemand { } }; + trace!(target: "on_demand", "Handling response for request {}, kind={:?}", req_id, response.kind()); + // handle the response appropriately for the request. // all branches which do not return early lead to disabling of the peer // due to misbehavior. @@ -441,7 +451,7 @@ impl Handler for OnDemand { } return } - Err(e) => warn!("Error handling response for header request: {:?}", e), + Err(e) => warn!(target: "on_demand", "Error handling response for header request: {:?}", e), } } } @@ -454,7 +464,7 @@ impl Handler for OnDemand { let _ = sender.send(header); return } - Err(e) => warn!("Error handling response for header request: {:?}", e), + Err(e) => warn!(target: "on_demand", "Error handling response for header request: {:?}", e), } } } @@ -467,7 +477,7 @@ impl Handler for OnDemand { let _ = sender.send(block); return } - Err(e) => warn!("Error handling response for block request: {:?}", e), + Err(e) => warn!(target: "on_demand", "Error handling response for block request: {:?}", e), } } } @@ -480,7 +490,7 @@ impl Handler for OnDemand { let _ = sender.send(receipts); return } - Err(e) => warn!("Error handling response for receipts request: {:?}", e), + Err(e) => warn!(target: "on_demand", "Error handling response for receipts request: {:?}", e), } } } @@ -493,7 +503,7 @@ impl Handler for OnDemand { let _ = sender.send(maybe_account); return } - Err(e) => warn!("Error handling response for state request: {:?}", e), + Err(e) => warn!(target: "on_demand", "Error handling response for state request: {:?}", e), } } } @@ -504,7 +514,7 @@ impl Handler for OnDemand { let _ = sender.send(response.code.clone()); return } - Err(e) => warn!("Error handling response for code request: {:?}", e), + Err(e) => warn!(target: "on_demand", "Error handling response for code request: {:?}", e), } } } @@ -519,7 +529,7 @@ impl Handler for OnDemand { let _ = sender.send(Err(err)); return } - ProvedExecution::BadProof => warn!("Error handling response for transaction proof request"), + ProvedExecution::BadProof => warn!(target: "on_demand", "Error handling response for transaction proof request"), } } } diff --git a/ethcore/light/src/on_demand/request.rs b/ethcore/light/src/on_demand/request.rs index d3bb06888..8a37ddf7b 100644 --- a/ethcore/light/src/on_demand/request.rs +++ b/ethcore/light/src/on_demand/request.rs @@ -151,7 +151,8 @@ impl Body { // concatenate the header and the body. let mut stream = RlpStream::new_list(3); stream.append_raw(self.header.rlp().as_raw(), 1); - stream.append_raw(&body.rlp().as_raw(), 2); + stream.append_raw(body.rlp().at(0).as_raw(), 1); + stream.append_raw(body.rlp().at(1).as_raw(), 1); Ok(encoded::Block::new(stream.out())) } diff --git a/ethcore/light/src/types/request/mod.rs b/ethcore/light/src/types/request/mod.rs index f7d0b7df6..3099f8fed 100644 --- a/ethcore/light/src/types/request/mod.rs +++ b/ethcore/light/src/types/request/mod.rs @@ -244,7 +244,8 @@ pub enum CompleteRequest { } impl Request { - fn kind(&self) -> Kind { + /// Get the request kind. + pub fn kind(&self) -> Kind { match *self { Request::Headers(_) => Kind::Headers, Request::HeaderProof(_) => Kind::HeaderProof, @@ -435,7 +436,8 @@ impl Response { } } - fn kind(&self) -> Kind { + /// Inspect the kind of this response. + pub fn kind(&self) -> Kind { match *self { Response::Headers(_) => Kind::Headers, Response::HeaderProof(_) => Kind::HeaderProof, @@ -726,7 +728,6 @@ pub mod header_proof { impl Decodable for Response { fn decode(rlp: &UntrustedRlp) -> Result { - Ok(Response { proof: rlp.list_at(0)?, hash: rlp.val_at(1)?, @@ -737,12 +738,10 @@ pub mod header_proof { impl Encodable for Response { fn rlp_append(&self, s: &mut RlpStream) { - s.begin_list(3).begin_list(self.proof.len()); - for item in &self.proof { - s.append_list(&item); - } - - s.append(&self.hash).append(&self.td); + s.begin_list(3) + .append_list::,_>(&self.proof[..]) + .append(&self.hash) + .append(&self.td); } } } @@ -826,7 +825,6 @@ pub mod block_receipts { impl Decodable for Response { fn decode(rlp: &UntrustedRlp) -> Result { - Ok(Response { receipts: rlp.as_list()?, }) @@ -923,8 +921,8 @@ pub mod block_body { use ethcore::transaction::UnverifiedTransaction; // check body validity. - let _: Vec = rlp.list_at(0)?; - let _: Vec = rlp.list_at(1)?; + let _: Vec = rlp.list_at(0)?; + let _: Vec = rlp.list_at(1)?; Ok(Response { body: encoded::Body::new(rlp.as_raw().to_owned()), @@ -1063,12 +1061,9 @@ pub mod account { impl Encodable for Response { fn rlp_append(&self, s: &mut RlpStream) { - s.begin_list(5).begin_list(self.proof.len()); - for item in &self.proof { - s.append_list(&item); - } - - s.append(&self.nonce) + s.begin_list(5) + .append_list::,_>(&self.proof[..]) + .append(&self.nonce) .append(&self.balance) .append(&self.code_hash) .append(&self.storage_root); @@ -1207,11 +1202,9 @@ pub mod storage { impl Encodable for Response { fn rlp_append(&self, s: &mut RlpStream) { - s.begin_list(2).begin_list(self.proof.len()); - for item in &self.proof { - s.append_list(&item); - } - s.append(&self.value); + s.begin_list(2) + .append_list::,_>(&self.proof[..]) + .append(&self.value); } } } @@ -1486,9 +1479,16 @@ mod tests { fn check_roundtrip(val: T) where T: ::rlp::Encodable + ::rlp::Decodable + PartialEq + ::std::fmt::Debug { + // check as single value. let bytes = ::rlp::encode(&val); let new_val: T = ::rlp::decode(&bytes); assert_eq!(val, new_val); + + // check as list containing single value. + let list = [val]; + let bytes = ::rlp::encode_list(&list); + let new_list: Vec = ::rlp::decode_list(&bytes); + assert_eq!(&list, &new_list[..]); } #[test] @@ -1540,7 +1540,7 @@ mod tests { let full_req = Request::HeaderProof(req.clone()); let res = HeaderProofResponse { - proof: Vec::new(), + proof: vec![vec![1, 2, 3], vec![4, 5, 6]], hash: Default::default(), td: 100.into(), }; @@ -1572,6 +1572,7 @@ mod tests { #[test] fn body_roundtrip() { + use ethcore::transaction::{Transaction, UnverifiedTransaction}; let req = IncompleteBodyRequest { hash: Field::Scalar(Default::default()), }; @@ -1579,8 +1580,12 @@ mod tests { let full_req = Request::Body(req.clone()); let res = BodyResponse { body: { + let header = ::ethcore::header::Header::default(); + let tx = UnverifiedTransaction::from(Transaction::default().fake_sign(Default::default())); let mut stream = RlpStream::new_list(2); - stream.begin_list(0).begin_list(0); + stream.begin_list(2).append(&tx).append(&tx) + .begin_list(1).append(&header); + ::ethcore::encoded::Body::new(stream.out()) }, }; @@ -1601,7 +1606,7 @@ mod tests { let full_req = Request::Account(req.clone()); let res = AccountResponse { - proof: Vec::new(), + proof: vec![vec![1, 2, 3], vec![4, 5, 6]], nonce: 100.into(), balance: 123456.into(), code_hash: Default::default(), @@ -1625,7 +1630,7 @@ mod tests { let full_req = Request::Storage(req.clone()); let res = StorageResponse { - proof: Vec::new(), + proof: vec![vec![1, 2, 3], vec![4, 5, 6]], value: H256::default(), }; let full_res = Response::Storage(res.clone()); @@ -1707,4 +1712,31 @@ mod tests { assert_eq!(rlp.val_at::(0).unwrap(), 100usize); assert_eq!(rlp.list_at::(1).unwrap(), reqs); } + + #[test] + fn responses_vec() { + let mut stream = RlpStream::new_list(2); + stream.begin_list(0).begin_list(0); + + let body = ::ethcore::encoded::Body::new(stream.out()); + let reqs = vec![ + Response::Headers(HeadersResponse { headers: vec![] }), + Response::HeaderProof(HeaderProofResponse { proof: vec![], hash: Default::default(), td: 100.into()}), + Response::Receipts(ReceiptsResponse { receipts: vec![Default::default()] }), + Response::Body(BodyResponse { body: body }), + Response::Account(AccountResponse { + proof: vec![], + nonce: 100.into(), + balance: 123.into(), + code_hash: Default::default(), + storage_root: Default::default() + }), + Response::Storage(StorageResponse { proof: vec![], value: H256::default() }), + Response::Code(CodeResponse { code: vec![1, 2, 3, 4, 5] }), + Response::Execution(ExecutionResponse { items: vec![] }), + ]; + + let raw = ::rlp::encode_list(&reqs); + assert_eq!(::rlp::decode_list::(&raw), reqs); + } } diff --git a/ethcore/src/client/config.rs b/ethcore/src/client/config.rs index 5c7cf9471..b58ae83cb 100644 --- a/ethcore/src/client/config.rs +++ b/ethcore/src/client/config.rs @@ -26,7 +26,7 @@ use verification::{VerifierType, QueueConfig}; use util::{journaldb, CompactionProfile}; /// Client state db compaction profile -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub enum DatabaseCompactionProfile { /// Try to determine compaction profile automatically Auto, diff --git a/ethcore/src/db.rs b/ethcore/src/db.rs index 4e8da714d..bccb8e943 100644 --- a/ethcore/src/db.rs +++ b/ethcore/src/db.rs @@ -38,8 +38,10 @@ pub const COL_TRACE: Option = Some(4); pub const COL_ACCOUNT_BLOOM: Option = Some(5); /// Column for general information from the local node which can persist. pub const COL_NODE_INFO: Option = Some(6); +/// Column for the light client chain. +pub const COL_LIGHT_CHAIN: Option = Some(7); /// Number of columns in DB -pub const NUM_COLUMNS: Option = Some(7); +pub const NUM_COLUMNS: Option = Some(8); /// Modes for updating caches. #[derive(Clone, Copy)] diff --git a/ethcore/src/migrations/mod.rs b/ethcore/src/migrations/mod.rs index 6cc4a13a8..76b10fd19 100644 --- a/ethcore/src/migrations/mod.rs +++ b/ethcore/src/migrations/mod.rs @@ -16,6 +16,8 @@ //! Database migrations. +use util::migration::ChangeColumns; + pub mod state; pub mod blocks; pub mod extras; @@ -27,5 +29,18 @@ pub use self::v9::Extract; mod v10; pub use self::v10::ToV10; -mod v11; -pub use self::v11::TO_V11; +/// The migration from v10 to v11. +/// Adds a column for node info. +pub const TO_V11: ChangeColumns = ChangeColumns { + pre_columns: Some(6), + post_columns: Some(7), + version: 11, +}; + +/// The migration from v11 to v12. +/// Adds a column for light chain storage. +pub const TO_V12: ChangeColumns = ChangeColumns { + pre_columns: Some(7), + post_columns: Some(8), + version: 12, +}; diff --git a/parity/cli/config.full.toml b/parity/cli/config.full.toml index 94c79cb15..56cb60fc5 100644 --- a/parity/cli/config.full.toml +++ b/parity/cli/config.full.toml @@ -39,7 +39,7 @@ warp = true allow_ips = "all" snapshot_peers = 0 max_pending_peers = 64 -serve_light = true +no_serve_light = false reserved_only = false reserved_peers = "./path_to_file" diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs index 7576c063f..02f0838cc 100644 --- a/parity/cli/mod.rs +++ b/parity/cli/mod.rs @@ -94,6 +94,7 @@ usage! { flag_chain: String = "foundation", or |c: &Config| otry!(c.parity).chain.clone(), flag_keys_path: String = "$BASE/keys", or |c: &Config| otry!(c.parity).keys_path.clone(), flag_identity: String = "", or |c: &Config| otry!(c.parity).identity.clone(), + flag_light: bool = false, or |c: &Config| otry!(c.parity).light, // -- Account Options flag_unlock: Option = None, @@ -149,6 +150,8 @@ usage! { flag_reserved_only: bool = false, or |c: &Config| otry!(c.network).reserved_only.clone(), flag_no_ancient_blocks: bool = false, or |_| None, + flag_no_serve_light: bool = false, + or |c: &Config| otry!(c.network).no_serve_light.clone(), // -- API and Console Options // RPC @@ -379,6 +382,7 @@ struct Operating { db_path: Option, keys_path: Option, identity: Option, + light: Option, } #[derive(Default, Debug, PartialEq, RustcDecodable)] @@ -414,6 +418,7 @@ struct Network { node_key: Option, reserved_peers: Option, reserved_only: Option, + no_serve_light: Option, } #[derive(Default, Debug, PartialEq, RustcDecodable)] @@ -639,6 +644,7 @@ mod tests { flag_db_path: Some("$HOME/.parity/chains".into()), flag_keys_path: "$HOME/.parity/keys".into(), flag_identity: "".into(), + flag_light: false, // -- Account Options flag_unlock: Some("0xdeadbeefcafe0000000000000000000000000000".into()), @@ -669,6 +675,7 @@ mod tests { flag_reserved_peers: Some("./path_to_file".into()), flag_reserved_only: false, flag_no_ancient_blocks: false, + flag_no_serve_light: false, // -- API and Console Options // RPC @@ -844,6 +851,7 @@ mod tests { db_path: None, keys_path: None, identity: None, + light: None, }), account: Some(Account { unlock: Some(vec!["0x1".into(), "0x2".into(), "0x3".into()]), @@ -873,6 +881,7 @@ mod tests { node_key: None, reserved_peers: Some("./path/to/reserved_peers".into()), reserved_only: Some(true), + no_serve_light: None, }), rpc: Some(Rpc { disable: Some(true), diff --git a/parity/cli/usage.txt b/parity/cli/usage.txt index 1ebeffef9..1e5f3c0fb 100644 --- a/parity/cli/usage.txt +++ b/parity/cli/usage.txt @@ -70,6 +70,11 @@ Operating Options: --keys-path PATH Specify the path for JSON key files to be found (default: {flag_keys_path}). --identity NAME Specify your node's name. (default: {flag_identity}) + --light Experimental: run in light client mode. Light clients + synchronize a bare minimum of data and fetch necessary + data on-demand from the network. Much lower in storage, + potentially higher in bandwidth. Has no effect with + subcommands (default: {flag_light}). Account Options: --unlock ACCOUNTS Unlock ACCOUNTS for the duration of the execution. @@ -129,6 +134,7 @@ Networking Options: --max-pending-peers NUM Allow up to NUM pending connections. (default: {flag_max_pending_peers}) --no-ancient-blocks Disable downloading old blocks after snapshot restoration or warp sync. (default: {flag_no_ancient_blocks}) + --no-serve-light Disable serving of light peers. (default: {flag_no_serve_light}) API and Console Options: --no-jsonrpc Disable the JSON-RPC API server. (default: {flag_no_jsonrpc}) diff --git a/parity/configuration.rs b/parity/configuration.rs index 1eb4c1848..f585dc22e 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -382,6 +382,8 @@ impl Configuration { check_seal: !self.args.flag_no_seal_check, download_old_blocks: !self.args.flag_no_ancient_blocks, verifier_settings: verifier_settings, + serve_light: !self.args.flag_no_serve_light, + light: self.args.flag_light, }; Cmd::Run(run_cmd) }; @@ -1201,6 +1203,8 @@ mod tests { check_seal: true, download_old_blocks: true, verifier_settings: Default::default(), + serve_light: true, + light: false, }; expected.secretstore_conf.enabled = cfg!(feature = "secretstore"); assert_eq!(conf.into_command().unwrap().cmd, Cmd::Run(expected)); diff --git a/parity/dapps.rs b/parity/dapps.rs index e0e97c08f..4cdd1e550 100644 --- a/parity/dapps.rs +++ b/parity/dapps.rs @@ -18,12 +18,14 @@ use std::path::PathBuf; use std::sync::Arc; use dir::default_data_path; -use ethcore::client::Client; -use ethsync::SyncProvider; +use ethcore::client::{Client, BlockChainClient, BlockId}; +use ethcore::transaction::{Transaction, Action}; use hash_fetch::fetch::Client as FetchClient; +use hash_fetch::urlhint::ContractClient; use helpers::replace_home; use rpc_apis::SignerService; use parity_reactor; +use util::{Bytes, Address, U256}; #[derive(Debug, PartialEq, Clone)] pub struct Configuration { @@ -43,15 +45,53 @@ impl Default for Configuration { } } -pub struct Dependencies { +/// Registrar implementation of the full client. +pub struct FullRegistrar { + /// Handle to the full client. pub client: Arc, - pub sync: Arc, +} + +impl ContractClient for FullRegistrar { + fn registrar(&self) -> Result { + self.client.additional_params().get("registrar") + .ok_or_else(|| "Registrar not defined.".into()) + .and_then(|registrar| { + registrar.parse().map_err(|e| format!("Invalid registrar address: {:?}", e)) + }) + } + + fn call(&self, address: Address, data: Bytes) -> Result { + let from = Address::default(); + let transaction = Transaction { + nonce: self.client.latest_nonce(&from), + action: Action::Call(address), + gas: U256::from(50_000_000), + gas_price: U256::default(), + value: U256::default(), + data: data, + }.fake_sign(from); + + self.client.call(&transaction, BlockId::Latest, Default::default()) + .map_err(|e| format!("{:?}", e)) + .map(|executed| { + executed.output + }) + } +} + +// TODO: light client implementation forwarding to OnDemand and waiting for future +// to resolve. +pub struct Dependencies { + pub sync_status: Arc<::parity_dapps::SyncStatus>, + pub contract_client: Arc, pub remote: parity_reactor::TokioRemote, pub fetch: FetchClient, pub signer: Arc, } -pub fn new(configuration: Configuration, deps: Dependencies) -> Result, String> { +pub fn new(configuration: Configuration, deps: Dependencies) + -> Result, String> +{ if !configuration.enabled { return Ok(None); } @@ -96,13 +136,8 @@ mod server { use super::Dependencies; use std::path::PathBuf; use std::sync::Arc; - use util::{Bytes, Address, U256}; - use ethcore::transaction::{Transaction, Action}; - use ethcore::client::{Client, BlockChainClient, BlockId}; - use ethcore_rpc::is_major_importing; use hash_fetch::fetch::Client as FetchClient; - use hash_fetch::urlhint::ContractClient; use parity_dapps; use parity_reactor; @@ -113,12 +148,8 @@ mod server { dapps_path: PathBuf, extra_dapps: Vec, ) -> Result { - let sync = deps.sync; let signer = deps.signer.clone(); - let client = deps.client; let parity_remote = parity_reactor::Remote::new(deps.remote.clone()); - let registrar = Arc::new(Registrar { client: client.clone() }); - let sync_status = Arc::new(move || is_major_importing(Some(sync.status().state), client.queue_info())); let web_proxy_tokens = Arc::new(move |token| signer.is_valid_web_proxy_access_token(&token)); Ok(parity_dapps::Middleware::new( @@ -126,42 +157,10 @@ mod server { deps.signer.address(), dapps_path, extra_dapps, - registrar, - sync_status, + deps.contract_client, + deps.sync_status, web_proxy_tokens, deps.fetch.clone(), )) } - - struct Registrar { - client: Arc, - } - - impl ContractClient for Registrar { - fn registrar(&self) -> Result { - self.client.additional_params().get("registrar") - .ok_or_else(|| "Registrar not defined.".into()) - .and_then(|registrar| { - registrar.parse().map_err(|e| format!("Invalid registrar address: {:?}", e)) - }) - } - - fn call(&self, address: Address, data: Bytes) -> Result { - let from = Address::default(); - let transaction = Transaction { - nonce: self.client.latest_nonce(&from), - action: Action::Call(address), - gas: U256::from(50_000_000), - gas_price: U256::default(), - value: U256::default(), - data: data, - }.fake_sign(from); - - self.client.call(&transaction, BlockId::Latest, Default::default()) - .map_err(|e| format!("{:?}", e)) - .map(|executed| { - executed.output - }) - } - } } diff --git a/ethcore/src/migrations/v11.rs b/parity/light_helpers/mod.rs similarity index 75% rename from ethcore/src/migrations/v11.rs rename to parity/light_helpers/mod.rs index e33de6170..488f970c2 100644 --- a/ethcore/src/migrations/v11.rs +++ b/parity/light_helpers/mod.rs @@ -14,13 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -//! Adds a seventh column for node information. +//! Utilities and helpers for the light client. -use util::migration::ChangeColumns; +mod queue_cull; -/// The migration from v10 to v11. -pub const TO_V11: ChangeColumns = ChangeColumns { - pre_columns: Some(6), - post_columns: Some(7), - version: 11, -}; +pub use self::queue_cull::QueueCull; diff --git a/parity/light_helpers/queue_cull.rs b/parity/light_helpers/queue_cull.rs new file mode 100644 index 000000000..10865d485 --- /dev/null +++ b/parity/light_helpers/queue_cull.rs @@ -0,0 +1,99 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Service for culling the light client's transaction queue. + +use std::sync::Arc; +use std::time::Duration; + +use ethcore::service::ClientIoMessage; +use ethsync::LightSync; +use io::{IoContext, IoHandler, TimerToken}; + +use light::client::Client; +use light::on_demand::{request, OnDemand}; +use light::TransactionQueue; + +use futures::{future, stream, Future, Stream}; + +use parity_reactor::Remote; + +use util::RwLock; + +// Attepmt to cull once every 10 minutes. +const TOKEN: TimerToken = 1; +const TIMEOUT_MS: u64 = 1000 * 60 * 10; + +// But make each attempt last only 9 minutes +const PURGE_TIMEOUT_MS: u64 = 1000 * 60 * 9; + +/// Periodically culls the transaction queue of mined transactions. +pub struct QueueCull { + /// A handle to the client, for getting the latest block header. + pub client: Arc, + /// A handle to the sync service. + pub sync: Arc, + /// The on-demand request service. + pub on_demand: Arc, + /// The transaction queue. + pub txq: Arc>, + /// Event loop remote. + pub remote: Remote, +} + +impl IoHandler for QueueCull { + fn initialize(&self, io: &IoContext) { + io.register_timer(TOKEN, TIMEOUT_MS).expect("Error registering timer"); + } + + fn timeout(&self, _io: &IoContext, timer: TimerToken) { + if timer != TOKEN { return } + + let senders = self.txq.read().queued_senders(); + if senders.is_empty() { return } + + let (sync, on_demand, txq) = (self.sync.clone(), self.on_demand.clone(), self.txq.clone()); + let best_header = self.client.best_block_header(); + let start_nonce = self.client.engine().account_start_nonce(); + + info!(target: "cull", "Attempting to cull queued transactions from {} senders.", senders.len()); + self.remote.spawn_with_timeout(move || { + let maybe_fetching = sync.with_context(move |ctx| { + // fetch the nonce of each sender in the queue. + let nonce_futures = senders.iter() + .map(|&address| request::Account { header: best_header.clone(), address: address }) + .map(|request| on_demand.account(ctx, request)) + .map(move |fut| fut.map(move |x| x.map(|acc| acc.nonce).unwrap_or(start_nonce))) + .zip(senders.iter()) + .map(|(fut, &addr)| fut.map(move |nonce| (addr, nonce))); + + // as they come in, update each sender to the new nonce. + stream::futures_unordered(nonce_futures) + .fold(txq, |txq, (address, nonce)| { + txq.write().cull(address, nonce); + future::ok(txq) + }) + .map(|_| ()) // finally, discard the txq handle and log errors. + .map_err(|_| debug!(target: "cull", "OnDemand prematurely closed channel.")) + }); + + match maybe_fetching { + Some(fut) => fut.boxed(), + None => future::ok(()).boxed(), + } + }, Duration::from_millis(PURGE_TIMEOUT_MS), || {}) + } +} diff --git a/parity/main.rs b/parity/main.rs index 1063571a9..0d55055da 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -28,6 +28,7 @@ extern crate ctrlc; extern crate docopt; extern crate env_logger; extern crate fdlimit; +extern crate futures; extern crate isatty; extern crate jsonrpc_core; extern crate num_cpus; @@ -105,6 +106,7 @@ mod deprecated; mod dir; mod helpers; mod informant; +mod light_helpers; mod migration; mod modules; mod params; diff --git a/parity/migration.rs b/parity/migration.rs index 445724325..c4e5f5ac6 100644 --- a/parity/migration.rs +++ b/parity/migration.rs @@ -30,7 +30,7 @@ use ethcore::migrations::Extract; /// Database is assumed to be at default version, when no version file is found. const DEFAULT_VERSION: u32 = 5; /// Current version of database models. -const CURRENT_VERSION: u32 = 11; +const CURRENT_VERSION: u32 = 12; /// First version of the consolidated database. const CONSOLIDATION_VERSION: u32 = 9; /// Defines how many items are migrated to the new version of database at once. @@ -147,6 +147,7 @@ fn consolidated_database_migrations(compaction_profile: &CompactionProfile) -> R let mut manager = MigrationManager::new(default_migration_settings(compaction_profile)); manager.add_migration(migrations::ToV10::new()).map_err(|_| Error::MigrationImpossible)?; manager.add_migration(migrations::TO_V11).map_err(|_| Error::MigrationImpossible)?; + manager.add_migration(migrations::TO_V12).map_err(|_| Error::MigrationImpossible)?; Ok(manager) } diff --git a/parity/rpc.rs b/parity/rpc.rs index 254bb782e..70a91c851 100644 --- a/parity/rpc.rs +++ b/parity/rpc.rs @@ -81,8 +81,8 @@ impl fmt::Display for IpcConfiguration { } } -pub struct Dependencies { - pub apis: Arc, +pub struct Dependencies { + pub apis: Arc, pub remote: TokioRemote, pub stats: Arc, } @@ -112,11 +112,17 @@ impl rpc::IpcMetaExtractor for RpcExtractor { } } -fn setup_apis(apis: ApiSet, deps: &Dependencies) -> MetaIoHandler { - rpc_apis::setup_rpc(deps.stats.clone(), deps.apis.clone(), apis) +fn setup_apis(apis: ApiSet, deps: &Dependencies) -> MetaIoHandler> + where D: rpc_apis::Dependencies +{ + rpc_apis::setup_rpc(deps.stats.clone(), &*deps.apis, apis) } -pub fn new_http(conf: HttpConfiguration, deps: &Dependencies, middleware: Option) -> Result, String> { +pub fn new_http( + conf: HttpConfiguration, + deps: &Dependencies, + middleware: Option +) -> Result, String> { if !conf.enabled { return Ok(None); } @@ -157,7 +163,10 @@ pub fn new_http(conf: HttpConfiguration, deps: &Dependencies, middleware: Option } } -pub fn new_ipc(conf: IpcConfiguration, dependencies: &Dependencies) -> Result, String> { +pub fn new_ipc( + conf: IpcConfiguration, + dependencies: &Dependencies +) -> Result, String> { if !conf.enabled { return Ok(None); } diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index dbeeea962..ea1eabc61 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -27,12 +27,14 @@ use ethcore::client::Client; use ethcore::miner::{Miner, ExternalMiner}; use ethcore::snapshot::SnapshotService; use ethcore_rpc::{Metadata, NetworkSettings}; -use ethcore_rpc::informant::{Middleware, RpcStats, ClientNotifier}; -use ethcore_rpc::dispatch::FullDispatcher; -use ethsync::{ManageNetwork, SyncProvider}; +use ethcore_rpc::informant::{ActivityNotifier, Middleware, RpcStats, ClientNotifier}; +use ethcore_rpc::dispatch::{FullDispatcher, LightDispatcher}; +use ethsync::{ManageNetwork, SyncProvider, LightSync}; use hash_fetch::fetch::Client as FetchClient; use jsonrpc_core::{MetaIoHandler}; +use light::{TransactionQueue as LightTransactionQueue, Cache as LightDataCache}; use updater::Updater; +use util::{Mutex, RwLock}; use ethcore_logger::RotatingLogger; #[derive(Debug, PartialEq, Clone, Eq, Hash)] @@ -112,25 +114,6 @@ impl FromStr for ApiSet { } } -pub struct Dependencies { - pub signer_service: Arc, - pub client: Arc, - pub snapshot: Arc, - pub sync: Arc, - pub net: Arc, - pub secret_store: Option>, - pub miner: Arc, - pub external_miner: Arc, - pub logger: Arc, - pub settings: Arc, - pub net_service: Arc, - pub updater: Arc, - pub geth_compatibility: bool, - pub dapps_interface: Option, - pub dapps_port: Option, - pub fetch: FetchClient, -} - fn to_modules(apis: &[Api]) -> BTreeMap { let mut modules = BTreeMap::new(); for api in apis { @@ -151,6 +134,274 @@ fn to_modules(apis: &[Api]) -> BTreeMap { modules } +/// RPC dependencies can be used to initialize RPC endpoints from APIs. +pub trait Dependencies { + type Notifier: ActivityNotifier; + + /// Create the activity notifier. + fn activity_notifier(&self) -> Self::Notifier; + + /// Extend the given I/O handler with endpoints for each API. + fn extend_with_set(&self, handler: &mut MetaIoHandler>, apis: &[Api]); +} + +/// RPC dependencies for a full node. +pub struct FullDependencies { + pub signer_service: Arc, + pub client: Arc, + pub snapshot: Arc, + pub sync: Arc, + pub net: Arc, + pub secret_store: Option>, + pub miner: Arc, + pub external_miner: Arc, + pub logger: Arc, + pub settings: Arc, + pub net_service: Arc, + pub updater: Arc, + pub geth_compatibility: bool, + pub dapps_interface: Option, + pub dapps_port: Option, + pub fetch: FetchClient, +} + +impl Dependencies for FullDependencies { + type Notifier = ClientNotifier; + + fn activity_notifier(&self) -> ClientNotifier { + ClientNotifier { + client: self.client.clone(), + } + } + + fn extend_with_set(&self, handler: &mut MetaIoHandler, apis: &[Api]) { + use ethcore_rpc::v1::*; + + macro_rules! add_signing_methods { + ($namespace:ident, $handler:expr, $deps:expr) => { + { + let deps = &$deps; + let dispatcher = FullDispatcher::new(Arc::downgrade(&deps.client), Arc::downgrade(&deps.miner)); + if deps.signer_service.is_enabled() { + $handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, dispatcher, &deps.secret_store))) + } else { + $handler.extend_with($namespace::to_delegate(SigningUnsafeClient::new(&deps.secret_store, dispatcher))) + } + } + } + } + + let dispatcher = FullDispatcher::new(Arc::downgrade(&self.client), Arc::downgrade(&self.miner)); + for api in apis { + match *api { + Api::Web3 => { + handler.extend_with(Web3Client::new().to_delegate()); + }, + Api::Net => { + handler.extend_with(NetClient::new(&self.sync).to_delegate()); + }, + Api::Eth => { + let client = EthClient::new( + &self.client, + &self.snapshot, + &self.sync, + &self.secret_store, + &self.miner, + &self.external_miner, + EthClientOptions { + pending_nonce_from_queue: self.geth_compatibility, + allow_pending_receipt_query: !self.geth_compatibility, + send_block_number_in_get_work: !self.geth_compatibility, + } + ); + handler.extend_with(client.to_delegate()); + + let filter_client = EthFilterClient::new(&self.client, &self.miner); + handler.extend_with(filter_client.to_delegate()); + + add_signing_methods!(EthSigning, handler, self); + }, + Api::Personal => { + handler.extend_with(PersonalClient::new(&self.secret_store, dispatcher.clone(), self.geth_compatibility).to_delegate()); + }, + Api::Signer => { + handler.extend_with(SignerClient::new(&self.secret_store, dispatcher.clone(), &self.signer_service).to_delegate()); + }, + Api::Parity => { + let signer = match self.signer_service.is_enabled() { + true => Some(self.signer_service.clone()), + false => None, + }; + handler.extend_with(ParityClient::new( + &self.client, + &self.miner, + &self.sync, + &self.updater, + &self.net_service, + &self.secret_store, + self.logger.clone(), + self.settings.clone(), + signer, + self.dapps_interface.clone(), + self.dapps_port, + ).to_delegate()); + + add_signing_methods!(EthSigning, handler, self); + add_signing_methods!(ParitySigning, handler, self); + }, + Api::ParityAccounts => { + handler.extend_with(ParityAccountsClient::new(&self.secret_store).to_delegate()); + }, + Api::ParitySet => { + handler.extend_with(ParitySetClient::new( + &self.client, + &self.miner, + &self.updater, + &self.net_service, + self.fetch.clone(), + ).to_delegate()) + }, + Api::Traces => { + handler.extend_with(TracesClient::new(&self.client, &self.miner).to_delegate()) + }, + Api::Rpc => { + let modules = to_modules(&apis); + handler.extend_with(RpcClient::new(modules).to_delegate()); + } + } + } + } +} + +/// Light client notifier. Doesn't do anything yet, but might in the future. +pub struct LightClientNotifier; + +impl ActivityNotifier for LightClientNotifier { + fn active(&self) {} +} + +/// RPC dependencies for a light client. +pub struct LightDependencies { + pub signer_service: Arc, + pub client: Arc<::light::client::Client>, + pub sync: Arc, + pub net: Arc, + pub secret_store: Arc, + pub logger: Arc, + pub settings: Arc, + pub on_demand: Arc<::light::on_demand::OnDemand>, + pub cache: Arc>, + pub transaction_queue: Arc>, + pub dapps_interface: Option, + pub dapps_port: Option, + pub fetch: FetchClient, + pub geth_compatibility: bool, +} + +impl Dependencies for LightDependencies { + type Notifier = LightClientNotifier; + + fn activity_notifier(&self) -> Self::Notifier { LightClientNotifier } + fn extend_with_set(&self, handler: &mut MetaIoHandler>, apis: &[Api]) { + use ethcore_rpc::v1::*; + + let dispatcher = LightDispatcher::new( + self.sync.clone(), + self.client.clone(), + self.on_demand.clone(), + self.cache.clone(), + self.transaction_queue.clone(), + ); + + macro_rules! add_signing_methods { + ($namespace:ident, $handler:expr, $deps:expr) => { + { + let deps = &$deps; + let dispatcher = dispatcher.clone(); + let secret_store = Some(deps.secret_store.clone()); + if deps.signer_service.is_enabled() { + $handler.extend_with($namespace::to_delegate( + SigningQueueClient::new(&deps.signer_service, dispatcher, &secret_store) + )) + } else { + $handler.extend_with( + $namespace::to_delegate(SigningUnsafeClient::new(&secret_store, dispatcher)) + ) + } + } + } + } + + for api in apis { + match *api { + Api::Web3 => { + handler.extend_with(Web3Client::new().to_delegate()); + }, + Api::Net => { + handler.extend_with(light::NetClient::new(self.sync.clone()).to_delegate()); + }, + Api::Eth => { + let client = light::EthClient::new( + self.sync.clone(), + self.client.clone(), + self.on_demand.clone(), + self.transaction_queue.clone(), + self.secret_store.clone(), + self.cache.clone(), + ); + handler.extend_with(client.to_delegate()); + + // TODO: filters. + add_signing_methods!(EthSigning, handler, self); + }, + Api::Personal => { + let secret_store = Some(self.secret_store.clone()); + handler.extend_with(PersonalClient::new(&secret_store, dispatcher.clone(), self.geth_compatibility).to_delegate()); + }, + Api::Signer => { + let secret_store = Some(self.secret_store.clone()); + handler.extend_with(SignerClient::new(&secret_store, dispatcher.clone(), &self.signer_service).to_delegate()); + }, + Api::Parity => { + let signer = match self.signer_service.is_enabled() { + true => Some(self.signer_service.clone()), + false => None, + }; + handler.extend_with(light::ParityClient::new( + Arc::new(dispatcher.clone()), + self.secret_store.clone(), + self.logger.clone(), + self.settings.clone(), + signer, + self.dapps_interface.clone(), + self.dapps_port, + ).to_delegate()); + + add_signing_methods!(EthSigning, handler, self); + add_signing_methods!(ParitySigning, handler, self); + }, + Api::ParityAccounts => { + let secret_store = Some(self.secret_store.clone()); + handler.extend_with(ParityAccountsClient::new(&secret_store).to_delegate()); + }, + Api::ParitySet => { + handler.extend_with(light::ParitySetClient::new( + self.sync.clone(), + self.fetch.clone(), + ).to_delegate()) + }, + Api::Traces => { + handler.extend_with(light::TracesClient.to_delegate()) + }, + Api::Rpc => { + let modules = to_modules(&apis); + handler.extend_with(RpcClient::new(modules).to_delegate()); + } + } + } + } +} + impl ApiSet { pub fn list_apis(&self) -> HashSet { let mut safe_list = vec![Api::Web3, Api::Net, Api::Eth, Api::Parity, Api::Traces, Api::Rpc] @@ -172,110 +423,12 @@ impl ApiSet { } } -macro_rules! add_signing_methods { - ($namespace:ident, $handler:expr, $deps:expr) => { - { - let handler = &mut $handler; - let deps = &$deps; - let dispatcher = FullDispatcher::new(Arc::downgrade(&deps.client), Arc::downgrade(&deps.miner)); - if deps.signer_service.is_enabled() { - handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, dispatcher, &deps.secret_store))) - } else { - handler.extend_with($namespace::to_delegate(SigningUnsafeClient::new(&deps.secret_store, dispatcher))) - } - } - } -} - -pub fn setup_rpc(stats: Arc, deps: Arc, apis: ApiSet) -> MetaIoHandler { - use ethcore_rpc::v1::*; - - let mut handler = MetaIoHandler::with_middleware(Middleware::new(stats, ClientNotifier { - client: deps.client.clone(), - })); - +pub fn setup_rpc(stats: Arc, deps: &D, apis: ApiSet) -> MetaIoHandler> { + let mut handler = MetaIoHandler::with_middleware(Middleware::new(stats, deps.activity_notifier())); // it's turned into vector, cause ont of the cases requires &[] let apis = apis.list_apis().into_iter().collect::>(); - let dispatcher = FullDispatcher::new(Arc::downgrade(&deps.client), Arc::downgrade(&deps.miner)); + deps.extend_with_set(&mut handler, &apis[..]); - for api in &apis { - match *api { - Api::Web3 => { - handler.extend_with(Web3Client::new().to_delegate()); - }, - Api::Net => { - handler.extend_with(NetClient::new(&deps.sync).to_delegate()); - }, - Api::Eth => { - let client = EthClient::new( - &deps.client, - &deps.snapshot, - &deps.sync, - &deps.secret_store, - &deps.miner, - &deps.external_miner, - EthClientOptions { - pending_nonce_from_queue: deps.geth_compatibility, - allow_pending_receipt_query: !deps.geth_compatibility, - send_block_number_in_get_work: !deps.geth_compatibility, - } - ); - handler.extend_with(client.to_delegate()); - - let filter_client = EthFilterClient::new(&deps.client, &deps.miner); - handler.extend_with(filter_client.to_delegate()); - - add_signing_methods!(EthSigning, handler, deps); - }, - Api::Personal => { - handler.extend_with(PersonalClient::new(&deps.secret_store, dispatcher.clone(), deps.geth_compatibility).to_delegate()); - }, - Api::Signer => { - handler.extend_with(SignerClient::new(&deps.secret_store, dispatcher.clone(), &deps.signer_service).to_delegate()); - }, - Api::Parity => { - let signer = match deps.signer_service.is_enabled() { - true => Some(deps.signer_service.clone()), - false => None, - }; - handler.extend_with(ParityClient::new( - &deps.client, - &deps.miner, - &deps.sync, - &deps.updater, - &deps.net_service, - &deps.secret_store, - deps.logger.clone(), - deps.settings.clone(), - signer, - deps.dapps_interface.clone(), - deps.dapps_port, - ).to_delegate()); - - add_signing_methods!(EthSigning, handler, deps); - add_signing_methods!(ParitySigning, handler, deps); - }, - Api::ParityAccounts => { - handler.extend_with(ParityAccountsClient::new(&deps.secret_store).to_delegate()); - }, - Api::ParitySet => { - handler.extend_with(ParitySetClient::new( - &deps.client, - &deps.miner, - &deps.updater, - &deps.net_service, - deps.fetch.clone(), - ).to_delegate()) - }, - Api::Traces => { - handler.extend_with(TracesClient::new(&deps.client, &deps.miner).to_delegate()) - }, - Api::Rpc => { - let modules = to_modules(&apis); - handler.extend_with(RpcClient::new(modules).to_delegate()); - } - } - } handler } diff --git a/parity/run.rs b/parity/run.rs index 8da5130d4..1ad124dbe 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -30,6 +30,7 @@ use ethcore::account_provider::{AccountProvider, AccountProviderSettings}; use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions}; use ethcore::snapshot; use ethcore::verification::queue::VerifierSettings; +use light::Cache as LightDataCache; use ethsync::SyncConfig; use informant::Informant; use updater::{UpdatePolicy, Updater}; @@ -60,6 +61,10 @@ const SNAPSHOT_PERIOD: u64 = 10000; // how many blocks to wait before starting a periodic snapshot. const SNAPSHOT_HISTORY: u64 = 100; +// Number of minutes before a given gas price corpus should expire. +// Light client only. +const GAS_CORPUS_EXPIRATION_MINUTES: i64 = 60 * 6; + // Pops along with error messages when a password is missing or invalid. const VERIFY_PASSWORD_HINT: &'static str = "Make sure valid password is present in files passed using `--password` or in the configuration file."; @@ -107,6 +112,8 @@ pub struct RunCmd { pub check_seal: bool, pub download_old_blocks: bool, pub verifier_settings: VerifierSettings, + pub serve_light: bool, + pub light: bool, } pub fn open_ui(signer_conf: &signer::Configuration) -> Result<(), String> { @@ -148,6 +155,171 @@ impl ::local_store::NodeInfo for FullNodeInfo { } } +// helper for light execution. +fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> Result<(bool, Option), String> { + use light::client as light_client; + use ethsync::{LightSyncParams, LightSync, ManageNetwork}; + use util::RwLock; + + let panic_handler = PanicHandler::new_in_arc(); + + // load spec + let spec = cmd.spec.spec()?; + + // load genesis hash + let genesis_hash = spec.genesis_header().hash(); + + // database paths + let db_dirs = cmd.dirs.database(genesis_hash, cmd.spec.legacy_fork_name(), spec.data_dir.clone()); + + // user defaults path + let user_defaults_path = db_dirs.user_defaults_path(); + + // load user defaults + let user_defaults = UserDefaults::load(&user_defaults_path)?; + + // select pruning algorithm + let algorithm = cmd.pruning.to_algorithm(&user_defaults); + + let compaction = cmd.compaction.compaction_profile(db_dirs.db_root_path().as_path()); + + // execute upgrades + execute_upgrades(&cmd.dirs.base, &db_dirs, algorithm, compaction.clone())?; + + // create dirs used by parity + cmd.dirs.create_dirs(cmd.dapps_conf.enabled, cmd.signer_conf.enabled, cmd.secretstore_conf.enabled)?; + + info!("Starting {}", Colour::White.bold().paint(version())); + info!("Running in experimental {} mode.", Colour::Blue.bold().paint("Light Client")); + + // start client and create transaction queue. + let mut config = light_client::Config { + queue: Default::default(), + chain_column: ::ethcore::db::COL_LIGHT_CHAIN, + db_cache_size: Some(cmd.cache_config.blockchain() as usize * 1024 * 1024), + db_compaction: compaction, + db_wal: cmd.wal, + }; + + config.queue.max_mem_use = cmd.cache_config.queue() as usize * 1024 * 1024; + config.queue.verifier_settings = cmd.verifier_settings; + + let service = light_client::Service::start(config, &spec, &db_dirs.client_path(algorithm)) + .map_err(|e| format!("Error starting light client: {}", e))?; + let txq = Arc::new(RwLock::new(::light::transaction_queue::TransactionQueue::default())); + let provider = ::light::provider::LightProvider::new(service.client().clone(), txq.clone()); + + // start network. + // set up bootnodes + let mut net_conf = cmd.net_conf; + if !cmd.custom_bootnodes { + net_conf.boot_nodes = spec.nodes.clone(); + } + + // TODO: configurable cache size. + let cache = LightDataCache::new(Default::default(), ::time::Duration::minutes(GAS_CORPUS_EXPIRATION_MINUTES)); + let cache = Arc::new(::util::Mutex::new(cache)); + + // start on_demand service. + let on_demand = Arc::new(::light::on_demand::OnDemand::new(cache.clone())); + + // set network path. + net_conf.net_config_path = Some(db_dirs.network_path().to_string_lossy().into_owned()); + let sync_params = LightSyncParams { + network_config: net_conf.into_basic().map_err(|e| format!("Failed to produce network config: {}", e))?, + client: Arc::new(provider), + network_id: cmd.network_id.unwrap_or(spec.network_id()), + subprotocol_name: ::ethsync::LIGHT_PROTOCOL, + handlers: vec![on_demand.clone()], + }; + let light_sync = LightSync::new(sync_params).map_err(|e| format!("Error starting network: {}", e))?; + let light_sync = Arc::new(light_sync); + + // spin up event loop + let event_loop = EventLoop::spawn(); + + // queue cull service. + let queue_cull = Arc::new(::light_helpers::QueueCull { + client: service.client().clone(), + sync: light_sync.clone(), + on_demand: on_demand.clone(), + txq: txq.clone(), + remote: event_loop.remote(), + }); + + service.register_handler(queue_cull).map_err(|e| format!("Error attaching service: {:?}", e))?; + + // start the network. + light_sync.start_network(); + + // fetch service + let fetch = FetchClient::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?; + let passwords = passwords_from_files(&cmd.acc_conf.password_files)?; + + // prepare account provider + let account_provider = Arc::new(prepare_account_provider(&cmd.spec, &cmd.dirs, &spec.data_dir, cmd.acc_conf, &passwords)?); + let rpc_stats = Arc::new(informant::RpcStats::default()); + let signer_path = cmd.signer_conf.signer_path.clone(); + + // start RPCs + let deps_for_rpc_apis = Arc::new(rpc_apis::LightDependencies { + signer_service: Arc::new(rpc_apis::SignerService::new(move || { + signer::generate_new_token(signer_path.clone()).map_err(|e| format!("{:?}", e)) + }, cmd.ui_address)), + client: service.client().clone(), + sync: light_sync.clone(), + net: light_sync.clone(), + secret_store: account_provider, + logger: logger, + settings: Arc::new(cmd.net_settings), + on_demand: on_demand, + cache: cache, + transaction_queue: txq, + dapps_interface: match cmd.dapps_conf.enabled { + true => Some(cmd.http_conf.interface.clone()), + false => None, + }, + dapps_port: match cmd.dapps_conf.enabled { + true => Some(cmd.http_conf.port), + false => None, + }, + fetch: fetch, + geth_compatibility: cmd.geth_compatibility, + }); + + let dependencies = rpc::Dependencies { + apis: deps_for_rpc_apis.clone(), + remote: event_loop.raw_remote(), + stats: rpc_stats.clone(), + }; + + // start rpc servers + let _http_server = rpc::new_http(cmd.http_conf, &dependencies, None)?; + let _ipc_server = rpc::new_ipc(cmd.ipc_conf, &dependencies)?; + + // the signer server + let signer_deps = signer::Dependencies { + apis: deps_for_rpc_apis.clone(), + remote: event_loop.raw_remote(), + rpc_stats: rpc_stats.clone(), + }; + let signing_queue = deps_for_rpc_apis.signer_service.queue(); + let _signer_server = signer::start(cmd.signer_conf.clone(), signing_queue, signer_deps)?; + + // TODO: Dapps + + // minimal informant thread. Just prints block number every 5 seconds. + // TODO: integrate with informant.rs + let informant_client = service.client().clone(); + ::std::thread::spawn(move || loop { + info!("#{}", informant_client.best_block_header().number()); + ::std::thread::sleep(::std::time::Duration::from_secs(5)); + }); + + // wait for ctrl-c. + Ok(wait_for_exit(panic_handler, None, None, can_restart)) +} + pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> Result<(bool, Option), String> { if cmd.ui && cmd.dapps_conf.enabled { // Check if Parity is already running @@ -157,12 +329,17 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R } } - // set up panic handler - let panic_handler = PanicHandler::new_in_arc(); - // increase max number of open files raise_fd_limit(); + // run as light client. + if cmd.light { + return execute_light(cmd, can_restart, logger); + } + + // set up panic handler + let panic_handler = PanicHandler::new_in_arc(); + // load spec let spec = cmd.spec.spec()?; @@ -244,6 +421,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R sync_config.fork_block = spec.fork_block(); sync_config.warp_sync = cmd.warp_sync; sync_config.download_old_blocks = cmd.download_old_blocks; + sync_config.serve_light = cmd.serve_light; let passwords = passwords_from_files(&cmd.acc_conf.password_files)?; @@ -407,7 +585,8 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R true => None, false => Some(account_provider.clone()) }; - let deps_for_rpc_apis = Arc::new(rpc_apis::Dependencies { + + let deps_for_rpc_apis = Arc::new(rpc_apis::FullDependencies { signer_service: Arc::new(rpc_apis::SignerService::new(move || { signer::generate_new_token(signer_path.clone()).map_err(|e| format!("{:?}", e)) }, cmd.ui_address)), @@ -440,13 +619,18 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R stats: rpc_stats.clone(), }; - // the dapps middleware - let dapps_deps = dapps::Dependencies { - client: client.clone(), - sync: sync_provider.clone(), - remote: event_loop.raw_remote(), - fetch: fetch.clone(), - signer: deps_for_rpc_apis.signer_service.clone(), + // the dapps server + let dapps_deps = { + let (sync, client) = (sync_provider.clone(), client.clone()); + let contract_client = Arc::new(::dapps::FullRegistrar { client: client.clone() }); + + dapps::Dependencies { + sync_status: Arc::new(move || is_major_importing(Some(sync.status().state), client.queue_info())), + contract_client: contract_client, + remote: event_loop.raw_remote(), + fetch: fetch.clone(), + signer: deps_for_rpc_apis.signer_service.clone(), + } }; let dapps_middleware = dapps::new(cmd.dapps_conf.clone(), dapps_deps)?; @@ -460,7 +644,8 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R remote: event_loop.raw_remote(), rpc_stats: rpc_stats.clone(), }; - let signer_server = signer::start(cmd.signer_conf.clone(), signer_deps)?; + let signing_queue = deps_for_rpc_apis.signer_service.queue(); + let signer_server = signer::start(cmd.signer_conf.clone(), signing_queue, signer_deps)?; // secret store key server let secretstore_deps = secretstore::Dependencies { diff --git a/parity/signer.rs b/parity/signer.rs index 664a0e6e4..29429311e 100644 --- a/parity/signer.rs +++ b/parity/signer.rs @@ -23,7 +23,7 @@ pub use ethcore_signer::Server as SignerServer; use ansi_term::Colour; use dir::default_data_path; use ethcore_rpc::informant::RpcStats; -use ethcore_rpc; +use ethcore_rpc::{self, ConfirmationsQueue}; use ethcore_signer as signer; use helpers::replace_home; use parity_reactor::TokioRemote; @@ -55,8 +55,8 @@ impl Default for Configuration { } } -pub struct Dependencies { - pub apis: Arc, +pub struct Dependencies { + pub apis: Arc, pub remote: TokioRemote, pub rpc_stats: Arc, } @@ -77,11 +77,15 @@ impl signer::MetaExtractor for StandardExtractor { } } -pub fn start(conf: Configuration, deps: Dependencies) -> Result, String> { +pub fn start( + conf: Configuration, + queue: Arc, + deps: Dependencies, +) -> Result, String> { if !conf.enabled { Ok(None) } else { - Ok(Some(do_start(conf, deps)?)) + Ok(Some(do_start(conf, queue, deps)?)) } } @@ -125,14 +129,18 @@ pub fn generate_new_token(path: String) -> io::Result { Ok(code) } -fn do_start(conf: Configuration, deps: Dependencies) -> Result { +fn do_start( + conf: Configuration, + queue: Arc, + deps: Dependencies +) -> Result { let addr = format!("{}:{}", conf.interface, conf.port) .parse() .map_err(|_| format!("Invalid port specified: {}", conf.port))?; let start_result = { let server = signer::ServerBuilder::new( - deps.apis.signer_service.queue(), + queue, codes_path(conf.signer_path), ); if conf.skip_origin_validation { @@ -141,7 +149,7 @@ fn do_start(conf: Configuration, deps: Dependencies) -> Result) -> BoxFuture { const DEFAULT_GAS_PRICE: U256 = U256([0, 0, 0, 21_000_000]); - let (sync, on_demand, client) = (self.sync.clone(), self.on_demand.clone(), self.client.clone()); let req: CRequest = req.into(); let id = num.0.into(); @@ -245,7 +244,22 @@ impl Eth for EthClient { } fn syncing(&self) -> Result { - rpc_unimplemented!() + if self.sync.is_major_importing() { + let chain_info = self.client.chain_info(); + let current_block = U256::from(chain_info.best_block_number); + let highest_block = self.sync.highest_block().map(U256::from) + .unwrap_or_else(|| current_block.clone()); + + Ok(SyncStatus::Info(SyncInfo { + starting_block: U256::from(self.sync.start_block()).into(), + current_block: current_block.into(), + highest_block: highest_block.into(), + warp_chunks_amount: None, + warp_chunks_processed: None, + })) + } else { + Ok(SyncStatus::None) + } } fn author(&self, _meta: Self::Metadata) -> BoxFuture { diff --git a/rpc/src/v1/impls/light/mod.rs b/rpc/src/v1/impls/light/mod.rs index 8c2e6d240..38ba2438e 100644 --- a/rpc/src/v1/impls/light/mod.rs +++ b/rpc/src/v1/impls/light/mod.rs @@ -23,7 +23,10 @@ pub mod eth; pub mod parity; pub mod parity_set; pub mod trace; +pub mod net; pub use self::eth::EthClient; pub use self::parity::ParityClient; pub use self::parity_set::ParitySetClient; +pub use self::net::NetClient; +pub use self::trace::TracesClient; diff --git a/rpc/src/v1/impls/light/net.rs b/rpc/src/v1/impls/light/net.rs new file mode 100644 index 000000000..4f0ede48f --- /dev/null +++ b/rpc/src/v1/impls/light/net.rs @@ -0,0 +1,49 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Net rpc implementation. +use std::sync::Arc; +use jsonrpc_core::Error; +use ethsync::LightSyncProvider; +use v1::traits::Net; + +/// Net rpc implementation. +pub struct NetClient { + sync: Arc +} + +impl NetClient where S: LightSyncProvider { + /// Creates new NetClient. + pub fn new(sync: Arc) -> Self { + NetClient { + sync: sync, + } + } +} + +impl Net for NetClient where S: LightSyncProvider { + fn version(&self) -> Result { + Ok(format!("{}", self.sync.network_id()).to_owned()) + } + + fn peer_count(&self) -> Result { + Ok(format!("0x{:x}", self.sync.peer_numbers().connected as u64).to_owned()) + } + + fn is_listening(&self) -> Result { + Ok(true) + } +} diff --git a/rpc/src/v1/impls/net.rs b/rpc/src/v1/impls/net.rs index 5588805ab..399b2201a 100644 --- a/rpc/src/v1/impls/net.rs +++ b/rpc/src/v1/impls/net.rs @@ -21,7 +21,7 @@ use ethsync::SyncProvider; use v1::traits::Net; /// Net rpc implementation. -pub struct NetClient where S: SyncProvider { +pub struct NetClient { sync: Weak } diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index fe2ae3f59..83c7db015 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -83,7 +83,7 @@ impl SyncProvider for TestSyncProvider { difficulty: Some(40.into()), head: 50.into(), }), - les_info: None, + pip_info: None, }, PeerInfo { id: None, @@ -96,7 +96,7 @@ impl SyncProvider for TestSyncProvider { difficulty: None, head: 60.into() }), - les_info: None, + pip_info: None, } ] } diff --git a/rpc/src/v1/tests/mocked/parity.rs b/rpc/src/v1/tests/mocked/parity.rs index d47c810f1..8de64c25c 100644 --- a/rpc/src/v1/tests/mocked/parity.rs +++ b/rpc/src/v1/tests/mocked/parity.rs @@ -304,7 +304,7 @@ fn rpc_parity_net_peers() { let io = deps.default_client(); let request = r#"{"jsonrpc": "2.0", "method": "parity_netPeers", "params":[], "id": 1}"#; - let response = r#"{"jsonrpc":"2.0","result":{"active":0,"connected":120,"max":50,"peers":[{"caps":["eth/62","eth/63"],"id":"node1","name":"Parity/1","network":{"localAddress":"127.0.0.1:8888","remoteAddress":"127.0.0.1:7777"},"protocols":{"eth":{"difficulty":"0x28","head":"0000000000000000000000000000000000000000000000000000000000000032","version":62},"les":null}},{"caps":["eth/63","eth/64"],"id":null,"name":"Parity/2","network":{"localAddress":"127.0.0.1:3333","remoteAddress":"Handshake"},"protocols":{"eth":{"difficulty":null,"head":"000000000000000000000000000000000000000000000000000000000000003c","version":64},"les":null}}]},"id":1}"#; + let response = r#"{"jsonrpc":"2.0","result":{"active":0,"connected":120,"max":50,"peers":[{"caps":["eth/62","eth/63"],"id":"node1","name":"Parity/1","network":{"localAddress":"127.0.0.1:8888","remoteAddress":"127.0.0.1:7777"},"protocols":{"eth":{"difficulty":"0x28","head":"0000000000000000000000000000000000000000000000000000000000000032","version":62},"pip":null}},{"caps":["eth/63","eth/64"],"id":null,"name":"Parity/2","network":{"localAddress":"127.0.0.1:3333","remoteAddress":"Handshake"},"protocols":{"eth":{"difficulty":null,"head":"000000000000000000000000000000000000000000000000000000000000003c","version":64},"pip":null}}]},"id":1}"#; assert_eq!(io.handle_request_sync(request), Some(response.to_owned())); } diff --git a/rpc/src/v1/types/mod.rs b/rpc/src/v1/types/mod.rs index 0ec60a74f..c9923fdce 100644 --- a/rpc/src/v1/types/mod.rs +++ b/rpc/src/v1/types/mod.rs @@ -65,7 +65,7 @@ pub use self::receipt::Receipt; pub use self::rpc_settings::RpcSettings; pub use self::sync::{ SyncStatus, SyncInfo, Peers, PeerInfo, PeerNetworkInfo, PeerProtocolsInfo, - TransactionStats, ChainStatus, EthProtocolInfo, LesProtocolInfo, + TransactionStats, ChainStatus, EthProtocolInfo, PipProtocolInfo, }; pub use self::trace::{LocalizedTrace, TraceResults}; pub use self::trace_filter::TraceFilter; diff --git a/rpc/src/v1/types/sync.rs b/rpc/src/v1/types/sync.rs index d83a3a64c..813fe8cb3 100644 --- a/rpc/src/v1/types/sync.rs +++ b/rpc/src/v1/types/sync.rs @@ -83,8 +83,8 @@ pub struct PeerNetworkInfo { pub struct PeerProtocolsInfo { /// Ethereum protocol information pub eth: Option, - /// LES protocol information. - pub les: Option, + /// PIP protocol information. + pub pip: Option, } /// Peer Ethereum protocol information @@ -108,10 +108,10 @@ impl From for EthProtocolInfo { } } -/// Peer LES protocol information +/// Peer PIP protocol information #[derive(Default, Debug, Serialize)] -pub struct LesProtocolInfo { - /// Negotiated LES protocol version +pub struct PipProtocolInfo { + /// Negotiated PIP protocol version pub version: u32, /// Peer total difficulty pub difficulty: U256, @@ -119,9 +119,9 @@ pub struct LesProtocolInfo { pub head: String, } -impl From for LesProtocolInfo { - fn from(info: ethsync::LesProtocolInfo) -> Self { - LesProtocolInfo { +impl From for PipProtocolInfo { + fn from(info: ethsync::PipProtocolInfo) -> Self { + PipProtocolInfo { version: info.version, difficulty: info.difficulty.into(), head: info.head.hex(), @@ -171,7 +171,7 @@ impl From for PeerInfo { }, protocols: PeerProtocolsInfo { eth: p.eth_info.map(Into::into), - les: p.les_info.map(Into::into), + pip: p.pip_info.map(Into::into), }, } } diff --git a/sync/src/api.rs b/sync/src/api.rs index 4cdc9d37a..3e3234d84 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -43,7 +43,7 @@ pub const WARP_SYNC_PROTOCOL_ID: ProtocolId = *b"par"; /// Ethereum sync protocol pub const ETH_PROTOCOL: ProtocolId = *b"eth"; /// Ethereum light protocol -pub const LIGHT_PROTOCOL: ProtocolId = *b"plp"; +pub const LIGHT_PROTOCOL: ProtocolId = *b"pip"; /// Sync configuration #[derive(Debug, Clone, Copy)] @@ -126,7 +126,7 @@ pub struct PeerInfo { /// Eth protocol info. pub eth_info: Option, /// Light protocol info. - pub les_info: Option, + pub pip_info: Option, } /// Ethereum protocol info. @@ -141,10 +141,10 @@ pub struct EthProtocolInfo { pub difficulty: Option, } -/// LES protocol info. +/// PIP protocol info. #[derive(Debug)] #[cfg_attr(feature = "ipc", derive(Binary))] -pub struct LesProtocolInfo { +pub struct PipProtocolInfo { /// Protocol version pub version: u32, /// SHA3 of peer best block hash @@ -153,9 +153,9 @@ pub struct LesProtocolInfo { pub difficulty: U256, } -impl From for LesProtocolInfo { +impl From for PipProtocolInfo { fn from(status: light_net::Status) -> Self { - LesProtocolInfo { + PipProtocolInfo { version: status.protocol_version, head: status.head_hash, difficulty: status.head_td, @@ -184,7 +184,7 @@ pub struct EthSync { network: NetworkService, /// Main (eth/par) protocol handler eth_handler: Arc, - /// Light (les) protocol handler + /// Light (pip) protocol handler light_proto: Option>, /// The main subprotocol name subprotocol_name: [u8; 3], @@ -264,7 +264,7 @@ impl SyncProvider for EthSync { remote_address: session_info.remote_address, local_address: session_info.local_address, eth_info: eth_sync.peer_info(&peer_id), - les_info: light_proto.as_ref().and_then(|lp| lp.peer_status(&peer_id)).map(Into::into), + pip_info: light_proto.as_ref().and_then(|lp| lp.peer_status(&peer_id)).map(Into::into), }) }).collect() }).unwrap_or_else(Vec::new) @@ -408,13 +408,13 @@ impl ChainNotify for EthSync { } } -/// LES event handler. +/// PIP event handler. /// Simply queues transactions from light client peers. struct TxRelay(Arc); impl LightHandler for TxRelay { fn on_transactions(&self, ctx: &EventContext, relay: &[::ethcore::transaction::UnverifiedTransaction]) { - trace!(target: "les", "Relaying {} transactions from peer {}", relay.len(), ctx.peer()); + trace!(target: "pip", "Relaying {} transactions from peer {}", relay.len(), ctx.peer()); self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), ctx.peer()) } } @@ -642,6 +642,9 @@ pub trait LightSyncProvider { /// Get peers information fn peers(&self) -> Vec; + /// Get network id. + fn network_id(&self) -> u64; + /// Get the enode if available. fn enode(&self) -> Option; @@ -659,13 +662,17 @@ pub struct LightSyncParams { pub network_id: u64, /// Subprotocol name. pub subprotocol_name: [u8; 3], + /// Other handlers to attach. + pub handlers: Vec>, } /// Service for light synchronization. pub struct LightSync { proto: Arc, + sync: Arc<::light_sync::SyncInfo + Sync + Send>, network: NetworkService, subprotocol_name: [u8; 3], + network_id: u64, } impl LightSync { @@ -676,7 +683,7 @@ impl LightSync { use light_sync::LightSync as SyncHandler; // initialize light protocol handler and attach sync module. - let light_proto = { + let (sync, light_proto) = { let light_params = LightParams { network_id: params.network_id, flow_params: Default::default(), // or `None`? @@ -689,18 +696,24 @@ impl LightSync { }; let mut light_proto = LightProtocol::new(params.client.clone(), light_params); - let sync_handler = try!(SyncHandler::new(params.client.clone())); - light_proto.add_handler(Arc::new(sync_handler)); + let sync_handler = Arc::new(try!(SyncHandler::new(params.client.clone()))); + light_proto.add_handler(sync_handler.clone()); - Arc::new(light_proto) + for handler in params.handlers { + light_proto.add_handler(handler); + } + + (sync_handler, Arc::new(light_proto)) }; let service = try!(NetworkService::new(params.network_config)); Ok(LightSync { proto: light_proto, + sync: sync, network: service, subprotocol_name: params.subprotocol_name, + network_id: params.network_id, }) } @@ -715,6 +728,12 @@ impl LightSync { } } +impl ::std::ops::Deref for LightSync { + type Target = ::light_sync::SyncInfo; + + fn deref(&self) -> &Self::Target { &*self.sync } +} + impl ManageNetwork for LightSync { fn accept_unreserved_peers(&self) { self.network.set_non_reserved_mode(NonReservedPeerMode::Accept); @@ -786,7 +805,7 @@ impl LightSyncProvider for LightSync { remote_address: session_info.remote_address, local_address: session_info.local_address, eth_info: None, - les_info: self.proto.peer_status(&peer_id).map(Into::into), + pip_info: self.proto.peer_status(&peer_id).map(Into::into), }) }).collect() }).unwrap_or_else(Vec::new) @@ -796,6 +815,10 @@ impl LightSyncProvider for LightSync { self.network.external_url() } + fn network_id(&self) -> u64 { + self.network_id + } + fn transactions_stats(&self) -> BTreeMap { Default::default() // TODO } diff --git a/sync/src/light_sync/mod.rs b/sync/src/light_sync/mod.rs index 4590103e7..2bc179a21 100644 --- a/sync/src/light_sync/mod.rs +++ b/sync/src/light_sync/mod.rs @@ -32,7 +32,7 @@ //! announced blocks. //! - On bad block/response, punish peer and reset. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::mem; use std::sync::Arc; @@ -150,6 +150,19 @@ impl AncestorSearch { } } + fn requests_abandoned(self, req_ids: &[ReqId]) -> AncestorSearch { + match self { + AncestorSearch::Awaiting(id, start, req) => { + if req_ids.iter().find(|&x| x == &id).is_some() { + AncestorSearch::Queued(start) + } else { + AncestorSearch::Awaiting(id, start, req) + } + } + other => other, + } + } + fn dispatch_request(self, mut dispatcher: F) -> AncestorSearch where F: FnMut(HeadersRequest) -> Option { @@ -206,8 +219,10 @@ impl<'a> ResponseContext for ResponseCtx<'a> { /// Light client synchronization manager. See module docs for more details. pub struct LightSync { + start_block_number: u64, best_seen: Mutex>, // best seen block on the network. peers: RwLock>>, // peers which are relevant to synchronization. + pending_reqs: Mutex>, // requests from this handler. client: Arc, rng: Mutex, state: Mutex, @@ -270,7 +285,8 @@ impl Handler for LightSync { *state = match mem::replace(&mut *state, SyncState::Idle) { SyncState::Idle => SyncState::Idle, - SyncState::AncestorSearch(search) => SyncState::AncestorSearch(search), + SyncState::AncestorSearch(search) => + SyncState::AncestorSearch(search.requests_abandoned(unfulfilled)), SyncState::Rounds(round) => SyncState::Rounds(round.requests_abandoned(unfulfilled)), }; } @@ -320,6 +336,10 @@ impl Handler for LightSync { return } + if !self.pending_reqs.lock().remove(&req_id) { + return + } + let headers = match responses.get(0) { Some(&request::Response::Headers(ref response)) => &response.headers[..], Some(_) => { @@ -418,8 +438,10 @@ impl LightSync { let best_td = chain_info.pending_total_difficulty; let sync_target = match *self.best_seen.lock() { Some(ref target) if target.head_td > best_td => (target.head_num, target.head_hash), - _ => { - trace!(target: "sync", "No target to sync to."); + ref other => { + let network_score = other.as_ref().map(|target| target.head_td); + trace!(target: "sync", "No target to sync to. Network score: {:?}, Local score: {:?}", + network_score, best_td); *state = SyncState::Idle; return; } @@ -493,6 +515,7 @@ impl LightSync { for peer in &peer_ids { match ctx.request_from(*peer, request.clone()) { Ok(id) => { + self.pending_reqs.lock().insert(id.clone()); return Some(id) } Err(NetError::NoCredits) => {} @@ -523,11 +546,48 @@ impl LightSync { /// so it can act on events. pub fn new(client: Arc) -> Result { Ok(LightSync { + start_block_number: client.as_light_client().chain_info().best_block_number, best_seen: Mutex::new(None), peers: RwLock::new(HashMap::new()), + pending_reqs: Mutex::new(HashSet::new()), client: client, rng: Mutex::new(try!(OsRng::new())), state: Mutex::new(SyncState::Idle), }) } } + +/// Trait for erasing the type of a light sync object and exposing read-only methods. +pub trait SyncInfo { + /// Get the highest block advertised on the network. + fn highest_block(&self) -> Option; + + /// Get the block number at the time of sync start. + fn start_block(&self) -> u64; + + /// Whether major sync is underway. + fn is_major_importing(&self) -> bool; +} + +impl SyncInfo for LightSync { + fn highest_block(&self) -> Option { + self.best_seen.lock().as_ref().map(|x| x.head_num) + } + + fn start_block(&self) -> u64 { + self.start_block_number + } + + fn is_major_importing(&self) -> bool { + const EMPTY_QUEUE: usize = 3; + + if self.client.as_light_client().queue_info().unverified_queue_size > EMPTY_QUEUE { + return true; + } + + match *self.state.lock() { + SyncState::Idle => false, + _ => true, + } + } +} diff --git a/sync/src/light_sync/tests/test_net.rs b/sync/src/light_sync/tests/test_net.rs index 898f8766d..2319e8d35 100644 --- a/sync/src/light_sync/tests/test_net.rs +++ b/sync/src/light_sync/tests/test_net.rs @@ -207,7 +207,7 @@ impl TestNet { pub fn light(n_light: usize, n_full: usize) -> Self { let mut peers = Vec::with_capacity(n_light + n_full); for _ in 0..n_light { - let client = LightClient::new(Default::default(), &Spec::new_test(), IoChannel::disconnected()); + let client = LightClient::in_memory(Default::default(), &Spec::new_test(), IoChannel::disconnected()); peers.push(Arc::new(Peer::new_light(Arc::new(client)))) }