From 1743e480e314122ceaf462b6a20b9ab420929888 Mon Sep 17 00:00:00 2001 From: debris Date: Sat, 5 Mar 2016 11:35:44 +0100 Subject: [PATCH 01/22] rust_stable --- .travis.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 8d2349dae..7213b8f09 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,6 +13,8 @@ matrix: allow_failures: - rust: nightly include: + - rust: stable + env: FEATURES="--features travis-beta" KCOV_FEATURES="" TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity" ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}" - rust: beta env: FEATURES="--features travis-beta" KCOV_FEATURES="" TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity" ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}" - rust: nightly @@ -52,7 +54,7 @@ after_success: | ./kcov-master/tmp/usr/local/bin/kcov --coveralls-id=${TRAVIS_JOB_ID} --exclude-pattern /usr/,/.cargo,/root/.multirust target/kcov target/debug/parity-* && [ $TRAVIS_BRANCH = master ] && [ $TRAVIS_PULL_REQUEST = false ] && - [ $TRAVIS_RUST_VERSION = beta ] && + [ $TRAVIS_RUST_VERSION = stable ] && cargo doc --no-deps --verbose ${KCOV_FEATURES} ${TARGETS} && echo '' > target/doc/index.html && pip install --user ghp-import && From 003d1fd0cc1c1f815c8f0b772baefd374dab67aa Mon Sep 17 00:00:00 2001 From: arkpar Date: Sat, 5 Mar 2016 23:09:51 +0100 Subject: [PATCH 02/22] Network tracing improvements --- sync/src/chain.rs | 2 +- util/src/network/connection.rs | 10 ++++---- util/src/network/handshake.rs | 14 +++++------ util/src/network/host.rs | 43 +++++++++++++++++++--------------- util/src/network/session.rs | 9 ++++--- 5 files changed, 43 insertions(+), 35 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 530cfa424..63640f87f 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -575,7 +575,7 @@ impl ChainSync { pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) { trace!(target: "sync", "== Connected {}", peer); if let Err(e) = self.send_status(io) { - warn!(target:"sync", "Error sending status request: {:?}", e); + trace!(target:"sync", "Error sending status request: {:?}", e); io.disable_peer(peer); } } diff --git a/util/src/network/connection.rs b/util/src/network/connection.rs index 55e688c91..fe65be6d1 100644 --- a/util/src/network/connection.rs +++ b/util/src/network/connection.rs @@ -190,25 +190,25 @@ impl Connection { /// Register this connection with the IO event loop. pub fn register_socket(&self, reg: Token, event_loop: &mut EventLoop) -> io::Result<()> { - trace!(target: "net", "connection register; token={:?}", reg); + trace!(target: "network", "connection register; token={:?}", reg); if let Err(e) = event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()) { - debug!("Failed to register {:?}, {:?}", reg, e); + trace!(target: "network", "Failed to register {:?}, {:?}", reg, e); } Ok(()) } /// Update connection registration. Should be called at the end of the IO handler. 
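For context on the retarget from "net" to "network" in the hunks that follow: log targets are matched by name at runtime, so the rename changes the filter string operators use. A minimal sketch, assuming an env_logger-style setup (the patch itself does not pin a particular logger):

use log::LevelFilter;

fn init_network_tracing() {
    // Hypothetical initialisation: `trace!(target: "network", ...)` statements are
    // selected by target name, so after this patch the filter is "network", not "net".
    // The same effect via the environment: RUST_LOG=network=trace,sync=debug
    env_logger::Builder::from_default_env()
        .filter_module("network", LevelFilter::Trace)
        .filter_module("sync", LevelFilter::Debug)
        .init();
}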
pub fn update_socket(&self, reg: Token, event_loop: &mut EventLoop) -> io::Result<()> { - trace!(target: "net", "connection reregister; token={:?}", reg); + trace!(target: "network", "connection reregister; token={:?}", reg); event_loop.reregister( &self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { - debug!("Failed to reregister {:?}, {:?}", reg, e); + trace!(target: "network", "Failed to reregister {:?}, {:?}", reg, e); Ok(()) }) } /// Delete connection registration. Should be called at the end of the IO handler. pub fn deregister_socket(&self, event_loop: &mut EventLoop) -> io::Result<()> { - trace!(target: "net", "connection deregister; token={:?}", self.token); + trace!(target: "network", "connection deregister; token={:?}", self.token); event_loop.deregister(&self.socket).ok(); // ignore errors here Ok(()) } diff --git a/util/src/network/handshake.rs b/util/src/network/handshake.rs index cca133ba9..a72cc28ad 100644 --- a/util/src/network/handshake.rs +++ b/util/src/network/handshake.rs @@ -222,7 +222,7 @@ impl Handshake { /// Parse, validate and confirm auth message fn read_auth(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> { - trace!(target:"net", "Received handshake auth from {:?}", self.connection.socket.peer_addr()); + trace!(target:"network", "Received handshake auth from {:?}", self.connection.socket.peer_addr()); if data.len() != V4_AUTH_PACKET_SIZE { debug!(target:"net", "Wrong auth packet size"); return Err(From::from(NetworkError::BadProtocol)); @@ -253,7 +253,7 @@ impl Handshake { } fn read_auth_eip8(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> { - trace!(target:"net", "Received EIP8 handshake auth from {:?}", self.connection.socket.peer_addr()); + trace!(target:"network", "Received EIP8 handshake auth from {:?}", self.connection.socket.peer_addr()); self.auth_cipher.extend_from_slice(data); let auth = try!(ecies::decrypt(secret, &self.auth_cipher[0..2], &self.auth_cipher[2..])); let rlp = UntrustedRlp::new(&auth); @@ -268,7 +268,7 @@ impl Handshake { /// Parse and validate ack message fn read_ack(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> { - trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); + trace!(target:"network", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); if data.len() != V4_ACK_PACKET_SIZE { debug!(target:"net", "Wrong ack packet size"); return Err(From::from(NetworkError::BadProtocol)); @@ -296,7 +296,7 @@ impl Handshake { } fn read_ack_eip8(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> { - trace!(target:"net", "Received EIP8 handshake auth from {:?}", self.connection.socket.peer_addr()); + trace!(target:"network", "Received EIP8 handshake auth from {:?}", self.connection.socket.peer_addr()); self.ack_cipher.extend_from_slice(data); let ack = try!(ecies::decrypt(secret, &self.ack_cipher[0..2], &self.ack_cipher[2..])); let rlp = UntrustedRlp::new(&ack); @@ -309,7 +309,7 @@ impl Handshake { /// Sends auth message fn write_auth(&mut self, secret: &Secret, public: &Public) -> Result<(), UtilError> { - trace!(target:"net", "Sending handshake auth to {:?}", self.connection.socket.peer_addr()); + trace!(target:"network", "Sending handshake auth to {:?}", self.connection.socket.peer_addr()); let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants let len = data.len(); { @@ -336,7 +336,7 @@ impl 
Handshake { /// Sends ack message fn write_ack(&mut self) -> Result<(), UtilError> { - trace!(target:"net", "Sending handshake ack to {:?}", self.connection.socket.peer_addr()); + trace!(target:"network", "Sending handshake ack to {:?}", self.connection.socket.peer_addr()); let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants let len = data.len(); { @@ -355,7 +355,7 @@ impl Handshake { /// Sends EIP8 ack message fn write_ack_eip8(&mut self) -> Result<(), UtilError> { - trace!(target:"net", "Sending EIP8 handshake ack to {:?}", self.connection.socket.peer_addr()); + trace!(target:"network", "Sending EIP8 handshake ack to {:?}", self.connection.socket.peer_addr()); let mut rlp = RlpStream::new_list(3); rlp.append(self.ecdhe.public()); rlp.append(&self.nonce); diff --git a/util/src/network/host.rs b/util/src/network/host.rs index f2cc9fe48..ece24a1d1 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -170,29 +170,37 @@ pub struct NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'sta io: &'s IoContext>, protocol: ProtocolId, sessions: Arc>>, - session: Option, + session: Option, + session_id: Option, } impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, { /// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler. fn new(io: &'s IoContext>, protocol: ProtocolId, - session: Option, sessions: Arc>>) -> NetworkContext<'s, Message> { + session: Option, sessions: Arc>>) -> NetworkContext<'s, Message> { + let id = session.as_ref().map(|s| s.lock().unwrap().token()); NetworkContext { io: io, protocol: protocol, + session_id: id, session: session, sessions: sessions, } } + fn resolve_session(&self, peer: PeerId) -> Option { + match self.session_id { + Some(id) if id == peer => self.session.clone(), + _ => self.sessions.read().unwrap().get(peer).cloned(), + } + } + /// Send a packet over the network to another peer. pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { - let session = { self.sessions.read().unwrap().get(peer).cloned() }; + let session = self.resolve_session(peer); if let Some(session) = session { - session.lock().unwrap().deref_mut().send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| { - warn!(target: "network", "Send error: {:?}", e); - }); //TODO: don't copy vector data + try!(session.lock().unwrap().deref_mut().send_packet(self.protocol, packet_id as u8, &data)); try!(self.io.update_registration(peer)); } else { trace!(target: "network", "Send: Peer no longer exists") @@ -200,14 +208,10 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone Ok(()) } - /// Respond to a current network message. Panics if no there is no packet in the context. + /// Respond to a current network message. Panics if there is no packet in the context. Returns nothing if the session has expired.
pub fn respond(&self, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { - match self.session { - Some(session) => self.send(session, packet_id, data), - None => { - panic!("Respond: Session does not exist") - } - } + assert!(self.session.is_some(), "Respond called without network context"); + self.send(self.session_id.unwrap(), packet_id, data) } /// Send an IO message @@ -215,7 +219,6 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone self.io.message(NetworkIoMessage::User(msg)); } - /// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected. pub fn disable_peer(&self, peer: PeerId) { //TODO: remove capability, disconnect if no capabilities left @@ -239,7 +242,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone /// Returns peer identification string pub fn peer_info(&self, peer: PeerId) -> String { - let session = { self.sessions.read().unwrap().get(peer).cloned() }; + let session = self.resolve_session(peer); if let Some(session) = session { return session.lock().unwrap().info.client_version.clone() } @@ -624,7 +627,7 @@ impl Host where Message: Send + Sync + Clone { let mut packet_data: Option<(ProtocolId, PacketId, Vec)> = None; let mut kill = false; let session = { self.sessions.read().unwrap().get(token).cloned() }; - if let Some(session) = session { + if let Some(session) = session.clone() { let mut s = session.lock().unwrap(); match s.readable(io, &self.info.read().unwrap()) { Err(e) => { @@ -656,11 +659,11 @@ impl Host where Message: Send + Sync + Clone { } for p in ready_data { let h = self.handlers.read().unwrap().get(p).unwrap().clone(); - h.connected(&NetworkContext::new(io, p, Some(token), self.sessions.clone()), &token); + h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token); } if let Some((p, packet_id, data)) = packet_data { let h = self.handlers.read().unwrap().get(p).unwrap().clone(); - h.read(&NetworkContext::new(io, p, Some(token), self.sessions.clone()), &token, packet_id, &data[1..]); + h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token, packet_id, &data[1..]); } io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Token registration error: {:?}", e)); } @@ -718,6 +721,7 @@ impl Host where Message: Send + Sync + Clone { let mut to_disconnect: Vec = Vec::new(); let mut failure_id = None; let mut deregister = false; + let mut expired_session = None; match token { FIRST_HANDSHAKE ... LAST_HANDSHAKE => { let handshakes = self.handshakes.write().unwrap(); @@ -733,6 +737,7 @@ impl Host where Message: Send + Sync + Clone { FIRST_SESSION ... 
LAST_SESSION => { let sessions = self.sessions.write().unwrap(); if let Some(session) = sessions.get(token).cloned() { + expired_session = Some(session.clone()); let mut s = session.lock().unwrap(); if !s.expired() { if s.is_ready() { @@ -757,7 +762,7 @@ impl Host where Message: Send + Sync + Clone { } for p in to_disconnect { let h = self.handlers.read().unwrap().get(p).unwrap().clone(); - h.disconnected(&NetworkContext::new(io, p, Some(token), self.sessions.clone()), &token); + h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone()), &token); } if deregister { io.deregister_stream(token).expect("Error deregistering stream"); diff --git a/util/src/network/session.rs b/util/src/network/session.rs index edf929a9a..84c063c92 100644 --- a/util/src/network/session.rs +++ b/util/src/network/session.rs @@ -213,6 +213,9 @@ impl Session { /// Send a protocol packet to peer. pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> { + if self.expired() { + return Err(From::from(NetworkError::Expired)); + } let mut i = 0usize; while protocol != self.info.capabilities[i].protocol { i += 1; @@ -351,15 +354,15 @@ impl Session { offset += caps[i].packet_count; i += 1; } - trace!(target: "net", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps); + trace!(target: "network", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps); self.info.client_version = client_version; self.info.capabilities = caps; if self.info.capabilities.is_empty() { - trace!("No common capabilities with peer."); + trace!(target: "network", "No common capabilities with peer."); return Err(From::from(self.disconnect(DisconnectReason::UselessPeer))); } if protocol != host.protocol_version { - trace!("Peer protocol version mismatch: {}", protocol); + trace!(target: "network", "Peer protocol version mismatch: {}", protocol); return Err(From::from(self.disconnect(DisconnectReason::UselessPeer))); } self.had_hello = true; From 51c95d4d67643f03fc0fd11cf241308acb01eb5a Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sun, 6 Mar 2016 21:57:55 +0100 Subject: [PATCH 03/22] Implement option 1. --- util/src/journaldb.rs | 393 +++++++++++++++++++++--------------------- 1 file changed, 194 insertions(+), 199 deletions(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 01e53f819..5f94dcbeb 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -20,15 +20,12 @@ use common::*; use rlp::*; use hashdb::*; use memorydb::*; -use kvdb::{Database, DBTransaction, DatabaseConfig}; +use rocksdb::{DB, Writable, WriteBatch, IteratorMode}; #[cfg(test)] use std::env; /// Implementation of the HashDB trait for a disk-backed database with a memory overlay -/// and, possibly, latent-removal semantics. -/// -/// If `counters` is `None`, then it behaves exactly like OverlayDB. If not it behaves -/// differently: +/// and latent-removal semantics. /// /// Like OverlayDB, there is a memory overlay; `commit()` must be called in order to /// write operations out to disk. Unlike OverlayDB, `remove()` operations do not take effect @@ -36,8 +33,8 @@ use std::env; /// the removals actually take effect. 
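A minimal sketch of the latent-removal semantics described in the comment above, written against the same API the tests at the bottom of this file exercise (temp-DB constructor, eras identified by block hashes, history window of 1):

fn latent_removal_sketch() {
    let mut jdb = JournalDB::new_temp();
    let key = jdb.insert(b"value");                  // staged in the memory overlay
    jdb.commit(0, &b"0".sha3(), None).unwrap();      // overlay written out to disk
    jdb.remove(&key);                                // journalled, not yet enacted
    jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
    assert!(jdb.exists(&key));                       // era 1 is still recent history
    jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
    assert!(!jdb.exists(&key));                      // era 1 went ancient: removal enacted
}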
pub struct JournalDB { overlay: MemoryDB, - backing: Arc, - counters: Option>>>, + backing: Arc, + counters: Arc>>, } impl Clone for JournalDB { @@ -50,51 +47,33 @@ impl Clone for JournalDB { } } -// all keys must be at least 12 bytes -const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ]; -const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ]; +const LAST_ERA_KEY : [u8; 4] = [ b'l', b'a', b's', b't' ]; +const VERSION_KEY : [u8; 4] = [ b'j', b'v', b'e', b'r' ]; -const DB_VERSION : u32 = 3; -const DB_VERSION_NO_JOURNAL : u32 = 3 + 256; - -const PADDING : [u8; 10] = [ 0u8; 10 ]; +const DB_VERSION: u32 = 1; impl JournalDB { - - /// Create a new instance from file - pub fn new(path: &str) -> JournalDB { - Self::from_prefs(path, true) + /// Create a new instance given a `backing` database. + pub fn new(backing: DB) -> JournalDB { + let db = Arc::new(backing); + JournalDB::new_with_arc(db) } - /// Create a new instance from file - pub fn from_prefs(path: &str, prefer_journal: bool) -> JournalDB { - let opts = DatabaseConfig { - prefix_size: Some(12) //use 12 bytes as prefix, this must match account_db prefix - }; - let backing = Database::open(&opts, path).unwrap_or_else(|e| { - panic!("Error opening state db: {}", e); - }); - let with_journal; - if !backing.is_empty() { + /// Create a new instance given a shared `backing` database. + pub fn new_with_arc(backing: Arc) -> JournalDB { + if backing.iterator(IteratorMode::Start).next().is_some() { match backing.get(&VERSION_KEY).map(|d| d.map(|v| decode::(&v))) { - Ok(Some(DB_VERSION)) => { with_journal = true; }, - Ok(Some(DB_VERSION_NO_JOURNAL)) => { with_journal = false; }, + Ok(Some(DB_VERSION)) => {}, v => panic!("Incompatible DB version, expected {}, got {:?}", DB_VERSION, v) } } else { - backing.put(&VERSION_KEY, &encode(&(if prefer_journal { DB_VERSION } else { DB_VERSION_NO_JOURNAL }))).expect("Error writing version to database"); - with_journal = prefer_journal; + backing.put(&VERSION_KEY, &encode(&DB_VERSION)).expect("Error writing version to database"); } - - let counters = if with_journal { - Some(Arc::new(RwLock::new(JournalDB::read_counters(&backing)))) - } else { - None - }; + let counters = JournalDB::read_counters(&backing); JournalDB { overlay: MemoryDB::new(), - backing: Arc::new(backing), - counters: counters, + backing: backing, + counters: Arc::new(RwLock::new(counters)), } } @@ -103,55 +82,93 @@ impl JournalDB { pub fn new_temp() -> JournalDB { let mut dir = env::temp_dir(); dir.push(H32::random().hex()); - Self::new(dir.to_str().unwrap()) + Self::new(DB::open_default(dir.to_str().unwrap()).unwrap()) } /// Check if this database has any commits pub fn is_empty(&self) -> bool { - self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none() + self.backing.get(&LAST_ERA_KEY).expect("Low level database error").is_none() } - /// Commit all recent insert operations. - pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { - let have_counters = self.counters.is_some(); - if have_counters { - self.commit_with_counters(now, id, end) - } else { - self.commit_without_counters() + fn morph_key(key: &H256, index: u8) -> Bytes { + let mut ret = key.bytes().to_owned(); + ret.push(index); + ret + } + + // The next three are valid only as long as there is an insert operation of `key` in the journal. 
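The `set_already_in`/`reset_already_in`/`is_already_in` trio defined next all hang off one auxiliary row per key: `morph_key(key, 0)` appends an index byte, giving a marker slot that can be flipped independently of the value itself. A standalone restatement of that layout (illustrative only):

fn marker_key(key: &[u8; 32], index: u8) -> Vec<u8> {
    let mut ret = key.to_vec();
    ret.push(index); // a 32-byte key K becomes the 33-byte K ++ [index]
    ret
}
// set_already_in writes [1u8] at marker_key(k, 0) to record "k pre-existed in the
// backing DB"; reset_already_in deletes that row once k's last journal entry dies.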
+ fn set_already_in(batch: &WriteBatch, key: &H256) { batch.put(&Self::morph_key(key, 0), &[1u8]); } + fn reset_already_in(batch: &WriteBatch, key: &H256) { batch.delete(&Self::morph_key(key, 0)); } + fn is_already_in(backing: &DB, key: &H256) -> bool { + backing.get(&Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?").is_some() + } + + fn insert_keys(inserts: &Vec<(H256, Bytes)>, backing: &DB, counters: &mut HashMap, batch: &WriteBatch) { + for &(ref h, ref d) in inserts { + if let Some(c) = counters.get_mut(h) { + // already counting. increment. + *c += 1; + continue; + } + + // this is the first entry for this node in the journal. + if backing.get(&h.bytes()).expect("Low-level database error. Some issue with your hard disk?").is_some() { + // already in the backing DB. start counting, and remember it was already in. + Self::set_already_in(batch, &h); + counters.insert(h.clone(), 1); + continue; + } + + // Gets removed when a key leaves the journal, so should never be set when we're placing a new key. + //Self::reset_already_in(&h); + assert!(!Self::is_already_in(backing, &h)); + batch.put(&h.bytes(), d); } } - /// Drain the overlay and place it into a batch for the DB. - fn batch_overlay_insertions(overlay: &mut MemoryDB, batch: &DBTransaction) -> usize { - let mut inserts = 0usize; - let mut deletes = 0usize; - for i in overlay.drain().into_iter() { - let (key, (value, rc)) = i; - if rc > 0 { - assert!(rc == 1); - batch.put(&key.bytes(), &value).expect("Low-level database error. Some issue with your hard disk?"); - inserts += 1; - } - if rc < 0 { - assert!(rc == -1); - deletes += 1; + fn replay_keys(inserts: &Vec, backing: &DB, counters: &mut HashMap) { + for h in inserts { + if let Some(c) = counters.get_mut(h) { + // already counting. increment. + *c += 1; + continue; } + + // this is the first entry for this node in the journal. + // it is initialised to 1 if it was already in. + counters.insert(h.clone(), if Self::is_already_in(backing, h) {1} else {0}); } - trace!("commit: Inserted {}, Deleted {} nodes", inserts, deletes); - inserts + deletes } - /// Just commit the overlay into the backing DB. - fn commit_without_counters(&mut self) -> Result { - let batch = DBTransaction::new(); - let ret = Self::batch_overlay_insertions(&mut self.overlay, &batch); - try!(self.backing.write(batch)); - Ok(ret as u32) + fn kill_keys(deletes: Vec, counters: &mut HashMap, batch: &WriteBatch) { + for h in deletes.into_iter() { + let mut n: Option = None; + if let Some(c) = counters.get_mut(&h) { + if *c > 1 { + *c -= 1; + continue; + } else { + n = Some(*c); + } + } + match &n { + &Some(i) if i == 1 => { + counters.remove(&h); + Self::reset_already_in(batch, &h); + } + &None => { + // Gets removed when moving from 1 to 0 additional refs. Should never be here at 0 additional refs. + //assert!(!Self::is_already_in(db, &h)); + batch.delete(&h.bytes()); + } + _ => panic!("Invalid value in counters: {:?}", n), + } + } } /// Commit all recent insert operations and historical removals from the old era /// to the backing database. - fn commit_with_counters(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { + pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { // journal format: // [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ] // [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ] @@ -159,62 +176,84 @@ impl JournalDB { // TODO: store reclaim_period. 
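The journal rows named in the format comment above are plain RLP on both the key and value side. A sketch using the same RlpStream API; note the keys here are the two-element [era, index] form of this patch, since the padded three-element form only arrives in PATCH 06 further down:

fn journal_row_sketch(now: u64, index: usize, id: &H256, inserts: Vec<H256>, removes: Vec<H256>) {
    let mut key = RlpStream::new_list(2);
    key.append(&now);     // era
    key.append(&index);   // 0, 1, ... for multiple commits sharing one era
    let mut record = RlpStream::new_list(3);
    record.append(id);        // block hash
    record.append(&inserts);  // keys this commit added
    record.append(&removes);  // keys this commit deleted
    // batch.put(&key.drain(), record.as_raw()) would then journal the commit.
}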
- // when we make a new commit, we journal the inserts and removes. - // for each end_era that we journaled that we are no passing by, - // we remove all of its removes assuming it is canonical and all - // of its inserts otherwise. + // When we make a new commit, we make a journal of all blocks in the recent history and record + // all keys that were inserted and deleted. The journal is ordered by era; multiple commits can + // share the same era. This forms a data structure similar to a queue but whose items are tuples. + // By the time it comes to remove a tuple from the queue (i.e. when the era passes from recent history + // into ancient history), only one commit from the tuple is considered canonical. This commit + // is kept in the main backing database, whereas any others from the same era are reverted. + // + // It is possible for a key, properly available in the backing database, to be deleted and re-inserted + // in the recent history queue, yet have both operations in commits that are eventually non-canonical. + // To prevent the original, and still required, key from being deleted, we maintain a reference count + // which includes an original key, if any. + // + // The semantics of the `counter` are: + // insert key k: + // counter already contains k: count += 1 + // counter doesn't contain k: + // backing db contains k: count = 1 + // backing db doesn't contain k: insert into backing db, count = 0 + // delete key k: + // counter contains k (count is asserted to be non-zero): + // count > 1: count -= 1 + // count == 1: remove counter + // count == 0: remove key from backing db + // counter doesn't contain k: remove key from backing db // - // We also keep reference counters for each key inserted in the journal to handle - // the following cases where key K must not be deleted from the DB when processing removals : - // Given H is the journal size in eras, 0 <= C <= H. - // Key K is removed in era A(N) and re-inserted in canonical era B(N + C). - // Key K is removed in era A(N) and re-inserted in non-canonical era B`(N + C). - // Key K is added in non-canonical era A'(N) canonical B(N + C). + // Practically, this means that for each commit block turning from recent to ancient we do the + // following: + // is_canonical: + // inserts: Ignored (left alone in the backing database). + // deletes: Enacted; however, the recent history queue is first checked for ongoing references, and + // decrementing those is preferred over deletion from the backing database. + // !is_canonical: + // inserts: Reverted; however, the recent history queue is first checked for ongoing references, and + // decrementing those is preferred over deletion from the backing database. + // deletes: Ignored (they were never inserted). // - // The counter is encreased each time a key is inserted in the journal in the commit. The list of insertions - // is saved with the era record. When the era becomes end_era and goes out of journal the counter is decreased - // and the key is safe to delete. // record new commit's details.
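The spec above maps almost line-for-line onto `insert_keys`/`kill_keys`. A self-contained model over plain HashMaps (hypothetical helpers; in-memory maps stand in for the backing database and write batch, and zero counts are never stored, which is the invariant PATCH 07 below enforces):

use std::collections::HashMap;

type Key = [u8; 32];

fn insert_key(k: Key, v: Vec<u8>, backing: &mut HashMap<Key, Vec<u8>>, counters: &mut HashMap<Key, i32>) {
    if let Some(c) = counters.get_mut(&k) { *c += 1; return; }     // already counting
    if backing.contains_key(&k) { counters.insert(k, 1); return; } // pre-existing key: count = 1
    backing.insert(k, v);                                          // fresh key: insert, no counter
}

fn delete_key(k: Key, backing: &mut HashMap<Key, Vec<u8>>, counters: &mut HashMap<Key, i32>) {
    match counters.get(&k).copied() {
        Some(c) if c > 1 => { counters.insert(k, c - 1); } // other journal entries still reference k
        Some(_) => { counters.remove(&k); }                // count == 1: drop the counter, keep the original key
        None => { backing.remove(&k); }                    // never counted: safe to purge from the backing db
    }
}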
- trace!("commit: #{} ({}), end era: {:?}", now, id, end); - let mut counters = self.counters.as_ref().unwrap().write().unwrap(); - let batch = DBTransaction::new(); + let batch = WriteBatch::new(); + let mut counters = self.counters.write().unwrap(); { let mut index = 0usize; let mut last; - while { - let record = try!(self.backing.get({ - let mut r = RlpStream::new_list(3); - r.append(&now); - r.append(&index); - r.append(&&PADDING[..]); - last = r.drain(); - &last - })); - match record { - Some(r) => { - assert!(&Rlp::new(&r).val_at::(0) != id); - true - }, - None => false, - } - } { + while try!(self.backing.get({ + let mut r = RlpStream::new_list(2); + r.append(&now); + r.append(&index); + last = r.drain(); + &last + })).is_some() { index += 1; } + let drained = self.overlay.drain(); + let removes: Vec = drained + .iter() + .filter_map(|(ref k, &(_, ref c))| if *c < 0 {Some(k.clone())} else {None}).cloned() + .collect(); + let inserts: Vec<(H256, Bytes)> = drained + .into_iter() + .filter_map(|(k, (v, r))| if r > 0 { assert!(r == 1); Some((k, v)) } else { assert!(r >= -1); None }) + .collect(); + let mut r = RlpStream::new_list(3); - let inserts: Vec = self.overlay.keys().iter().filter(|&(_, &c)| c > 0).map(|(key, _)| key.clone()).collect(); - // Increase counter for each inserted key no matter if the block is canonical or not. - for i in &inserts { - *counters.entry(i.clone()).or_insert(0) += 1; - } - let removes: Vec = self.overlay.keys().iter().filter(|&(_, &c)| c < 0).map(|(key, _)| key.clone()).collect(); r.append(id); - r.append(&inserts); + + // Process the new inserts. + // We use the inserts for three things. For each: + // - we place into the backing DB or increment the counter if already in; + // - we note in the backing db that it was already in; + // - we write the key into our journal for this block; + + r.begin_list(inserts.len()); + inserts.iter().foreach(|&(k, _)| {r.append(&k);}); r.append(&removes); + Self::insert_keys(&inserts, &self.backing, &mut counters, &batch); try!(batch.put(&last, r.as_raw())); - try!(batch.put(&LATEST_ERA_KEY, &encode(&now))); } // apply old commits' details @@ -222,105 +261,66 @@ impl JournalDB { let mut index = 0usize; let mut last; let mut to_remove: Vec = Vec::new(); - let mut canon_inserts: Vec = Vec::new(); while let Some(rlp_data) = try!(self.backing.get({ - let mut r = RlpStream::new_list(3); + let mut r = RlpStream::new_list(2); r.append(&end_era); r.append(&index); - r.append(&&PADDING[..]); last = r.drain(); &last })) { let rlp = Rlp::new(&rlp_data); - let mut inserts: Vec = rlp.val_at(1); - JournalDB::decrease_counters(&inserts, &mut counters); + let inserts: Vec = rlp.val_at(1); + let deletes: Vec = rlp.val_at(2); // Collect keys to be removed. 
These are removed keys for canonical block, inserted for non-canonical - if canon_id == rlp.val_at(0) { - let mut canon_deletes: Vec = rlp.val_at(2); - trace!("Purging nodes deleted from canon: {:?}", canon_deletes); - to_remove.append(&mut canon_deletes); - canon_inserts = inserts; - } - else { - trace!("Purging nodes inserted in non-canon: {:?}", inserts); - to_remove.append(&mut inserts); - } - trace!("commit: Delete journal for time #{}.{}: {}, (canon was {}): {} entries", end_era, index, rlp.val_at::(0), canon_id, to_remove.len()); + Self::kill_keys(if canon_id == rlp.val_at(0) {deletes} else {inserts}, &mut counters, &batch); try!(batch.delete(&last)); index += 1; } - - let canon_inserts = canon_inserts.drain(..).collect::>(); - // Purge removed keys if they are not referenced and not re-inserted in the canon commit - let mut deletes = 0; - trace!("Purging filtered nodes: {:?}", to_remove.iter().filter(|h| !counters.contains_key(h) && !canon_inserts.contains(h)).collect::>()); - for h in to_remove.iter().filter(|h| !counters.contains_key(h) && !canon_inserts.contains(h)) { - try!(batch.delete(&h)); - deletes += 1; - } - trace!("Total nodes purged: {}", deletes); + try!(batch.put(&LAST_ERA_KEY, &encode(&end_era))); + trace!("JournalDB: delete journal for time #{}.{}, (canon was {})", end_era, index, canon_id); } - // Commit overlay insertions - let ret = Self::batch_overlay_insertions(&mut self.overlay, &batch); try!(self.backing.write(batch)); - Ok(ret as u32) - } - - - // Decrease counters for given keys. Deletes obsolete counters - fn decrease_counters(keys: &[H256], counters: &mut HashMap) { - for i in keys.iter() { - let delete_counter = { - let cnt = counters.get_mut(i).expect("Missing key counter"); - *cnt -= 1; - *cnt == 0 - }; - if delete_counter { - counters.remove(i); - } - } +// trace!("JournalDB::commit() deleted {} nodes", deletes); + Ok(0) } fn payload(&self, key: &H256) -> Option { self.backing.get(&key.bytes()).expect("Low-level database error. 
Some issue with your hard disk?").map(|v| v.to_vec()) } - fn read_counters(db: &Database) -> HashMap { - let mut res = HashMap::new(); - if let Some(val) = db.get(&LATEST_ERA_KEY).expect("Low-level database error.") { - let mut era = decode::(&val); + fn read_counters(db: &DB) -> HashMap { + let mut counters = HashMap::new(); + if let Some(val) = db.get(&LAST_ERA_KEY).expect("Low-level database error.") { + let mut era = decode::(&val) + 1; loop { let mut index = 0usize; while let Some(rlp_data) = db.get({ - let mut r = RlpStream::new_list(3); + let mut r = RlpStream::new_list(2); r.append(&era); r.append(&index); - r.append(&&PADDING[..]); &r.drain() }).expect("Low-level database error.") { let rlp = Rlp::new(&rlp_data); - let to_add: Vec = rlp.val_at(1); - for h in to_add { - *res.entry(h).or_insert(0) += 1; - } + let inserts: Vec = rlp.val_at(1); + Self::replay_keys(&inserts, db, &mut counters); index += 1; }; - if index == 0 || era == 0 { + if index == 0 { break; } - era -= 1; + era += 1; } } - trace!("Recovered {} counters", res.len()); - res + trace!("Recovered {} counters", counters.len()); + counters } } impl HashDB for JournalDB { fn keys(&self) -> HashMap { let mut ret: HashMap = HashMap::new(); - for (key, _) in self.backing.iter() { + for (key, _) in self.backing.iterator(IteratorMode::Start) { let h = H256::from_slice(key.deref()); ret.insert(h, 1); } @@ -368,6 +368,28 @@ mod tests { use super::*; use hashdb::*; + #[test] + fn insert_same_in_fork() { + // history is 1 + let mut jdb = JournalDB::new_temp(); + + let x = jdb.insert(b"X"); + jdb.commit(1, &b"1".sha3(), None).unwrap(); + jdb.commit(2, &b"2".sha3(), None).unwrap(); + jdb.commit(3, &b"1002a".sha3(), Some((1, b"1".sha3()))).unwrap(); + jdb.commit(4, &b"1003a".sha3(), Some((2, b"2".sha3()))).unwrap(); + + jdb.remove(&x); + jdb.commit(3, &b"1002b".sha3(), Some((1, b"1".sha3()))).unwrap(); + let x = jdb.insert(b"X"); + jdb.commit(4, &b"1003b".sha3(), Some((2, b"2".sha3()))).unwrap(); + + jdb.commit(5, &b"1004a".sha3(), Some((3, b"1002a".sha3()))).unwrap(); + jdb.commit(6, &b"1005a".sha3(), Some((4, b"1003a".sha3()))).unwrap(); + + assert!(jdb.exists(&x)); + } + #[test] fn long_history() { // history is 3 @@ -487,31 +509,4 @@ mod tests { jdb.commit(2, &b"2a".sha3(), Some((1, b"1a".sha3()))).unwrap(); assert!(jdb.exists(&foo)); } - - #[test] - fn reopen() { - let mut dir = ::std::env::temp_dir(); - dir.push(H32::random().hex()); - - let foo = { - let mut jdb = JournalDB::new(dir.to_str().unwrap()); - // history is 1 - let foo = jdb.insert(b"foo"); - jdb.commit(0, &b"0".sha3(), None).unwrap(); - foo - }; - - { - let mut jdb = JournalDB::new(dir.to_str().unwrap()); - jdb.remove(&foo); - jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap(); - } - - { - let mut jdb = JournalDB::new(dir.to_str().unwrap()); - assert!(jdb.exists(&foo)); - jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); - assert!(!jdb.exists(&foo)); - } - } } From bfd882c7e03dee68d39c8722e65c9b395ed8b438 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sun, 6 Mar 2016 22:05:12 +0100 Subject: [PATCH 04/22] Fix warnings. --- util/src/journaldb.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 5f94dcbeb..b7e495503 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -97,8 +97,8 @@ impl JournalDB { } // The next three are valid only as long as there is an insert operation of `key` in the journal. 
- fn set_already_in(batch: &WriteBatch, key: &H256) { batch.put(&Self::morph_key(key, 0), &[1u8]); } - fn reset_already_in(batch: &WriteBatch, key: &H256) { batch.delete(&Self::morph_key(key, 0)); } + fn set_already_in(batch: &WriteBatch, key: &H256) { batch.put(&Self::morph_key(key, 0), &[1u8]).expect("Low-level database error. Some issue with your hard disk?"); } + fn reset_already_in(batch: &WriteBatch, key: &H256) { batch.delete(&Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?"); } fn is_already_in(backing: &DB, key: &H256) -> bool { backing.get(&Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?").is_some() } @@ -122,7 +122,7 @@ impl JournalDB { // Gets removed when a key leaves the journal, so should never be set when we're placing a new key. //Self::reset_already_in(&h); assert!(!Self::is_already_in(backing, &h)); - batch.put(&h.bytes(), d); + batch.put(&h.bytes(), d).expect("Low-level database error. Some issue with your hard disk?"); } } @@ -159,7 +159,7 @@ impl JournalDB { &None => { // Gets removed when moving from 1 to 0 additional refs. Should never be here at 0 additional refs. //assert!(!Self::is_already_in(db, &h)); - batch.delete(&h.bytes()); + batch.delete(&h.bytes()).expect("Low-level database error. Some issue with your hard disk?"); } _ => panic!("Invalid value in counters: {:?}", n), } @@ -260,7 +260,6 @@ impl JournalDB { if let Some((end_era, canon_id)) = end { let mut index = 0usize; let mut last; - let mut to_remove: Vec = Vec::new(); while let Some(rlp_data) = try!(self.backing.get({ let mut r = RlpStream::new_list(2); r.append(&end_era); From bc2fb14b5d6fbfac97a9e1d03e8e3d93d2da0283 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sun, 6 Mar 2016 22:39:04 +0100 Subject: [PATCH 05/22] Add memory usage reports. Update to be similar to master. --- ethcore/src/client.rs | 8 +- parity/main.rs | 3 +- util/src/journaldb.rs | 213 +++++++++++++++++++++++++++++++++++------- util/src/memorydb.rs | 6 ++ 4 files changed, 194 insertions(+), 36 deletions(-) diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 858185873..9688cc527 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -190,6 +190,8 @@ pub struct ClientReport { pub transactions_applied: usize, /// How much gas has been processed so far. pub gas_processed: U256, + /// Memory used by state DB + pub state_db_mem: usize, } impl ClientReport { @@ -222,7 +224,7 @@ pub struct Client where V: Verifier { } const HISTORY: u64 = 1000; -const CLIENT_DB_VER_STR: &'static str = "4.0"; +const CLIENT_DB_VER_STR: &'static str = "5.1"; impl Client { /// Create a new client with given spec and DB path. @@ -432,7 +434,9 @@ impl Client where V: Verifier { /// Get the report. pub fn report(&self) -> ClientReport { - self.report.read().unwrap().clone() + let mut report = self.report.read().unwrap().clone(); + report.state_db_mem = self.state_db.lock().unwrap().mem_used(); + report } /// Tick the client. 
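The new `state_db_mem` figure comes from summing the heap owned by the state DB's maps. A sketch of that aggregation, assuming the heapsize crate's `HeapSizeOf` trait (the same trait the `mem_used()` additions later in this patch lean on):

use heapsize::HeapSizeOf;
use std::collections::HashMap;

fn state_db_mem(overlay: &HashMap<Vec<u8>, Vec<u8>>, counters: &HashMap<Vec<u8>, i32>) -> usize {
    // heap_size_of_children() counts heap allocations owned by the entries;
    // the inline size of the structs themselves is deliberately excluded.
    overlay.heap_size_of_children() + counters.heap_size_of_children()
}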
diff --git a/parity/main.rs b/parity/main.rs index 3f4243a0a..605fb315d 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -395,7 +395,7 @@ impl Informant { let sync_info = sync.status(); if let (_, _, &Some(ref last_report)) = (self.chain_info.read().unwrap().deref(), self.cache_info.read().unwrap().deref(), self.report.read().unwrap().deref()) { - println!("[ #{} {} ]---[ {} blk/s | {} tx/s | {} gas/s //··· {}/{} peers, #{}, {}+{} queued ···// mem: {} chain, {} queue, {} sync ]", + println!("[ #{} {} ]---[ {} blk/s | {} tx/s | {} gas/s //··· {}/{} peers, #{}, {}+{} queued ···// mem: {} db, {} chain, {} queue, {} sync ]", chain_info.best_block_number, chain_info.best_block_hash, (report.blocks_imported - last_report.blocks_imported) / dur, @@ -408,6 +408,7 @@ impl Informant { queue_info.unverified_queue_size, queue_info.verified_queue_size, + Informant::format_bytes(report.state_db_mem), Informant::format_bytes(cache_info.total()), Informant::format_bytes(queue_info.mem_used), Informant::format_bytes(sync_info.mem_used), diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index b7e495503..f04affb7b 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -20,7 +20,7 @@ use common::*; use rlp::*; use hashdb::*; use memorydb::*; -use rocksdb::{DB, Writable, WriteBatch, IteratorMode}; +use kvdb::{Database, DBTransaction, DatabaseConfig}; #[cfg(test)] use std::env; @@ -33,8 +33,8 @@ use std::env; /// the removals actually take effect. pub struct JournalDB { overlay: MemoryDB, - backing: Arc, - counters: Arc>>, + backing: Arc, + counters: Option>>>, } impl Clone for JournalDB { @@ -47,33 +47,50 @@ impl Clone for JournalDB { } } -const LAST_ERA_KEY : [u8; 4] = [ b'l', b'a', b's', b't' ]; -const VERSION_KEY : [u8; 4] = [ b'j', b'v', b'e', b'r' ]; +// all keys must be at least 12 bytes +const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ]; +const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ]; -const DB_VERSION: u32 = 1; +const DB_VERSION : u32 = 3; +const DB_VERSION_NO_JOURNAL : u32 = 3 + 256; + +const PADDING : [u8; 10] = [ 0u8; 10 ]; impl JournalDB { - /// Create a new instance given a `backing` database. - pub fn new(backing: DB) -> JournalDB { - let db = Arc::new(backing); - JournalDB::new_with_arc(db) + /// Create a new instance from file + pub fn new(path: &str) -> JournalDB { + Self::from_prefs(path, true) } - /// Create a new instance given a shared `backing` database. 
- pub fn new_with_arc(backing: Arc) -> JournalDB { - if backing.iterator(IteratorMode::Start).next().is_some() { + /// Create a new instance from file + pub fn from_prefs(path: &str, prefer_journal: bool) -> JournalDB { + let opts = DatabaseConfig { + prefix_size: Some(12) //use 12 bytes as prefix, this must match account_db prefix + }; + let backing = Database::open(&opts, path).unwrap_or_else(|e| { + panic!("Error opening state db: {}", e); + }); + let with_journal; + if !backing.is_empty() { match backing.get(&VERSION_KEY).map(|d| d.map(|v| decode::(&v))) { - Ok(Some(DB_VERSION)) => {}, + Ok(Some(DB_VERSION)) => { with_journal = true; }, + Ok(Some(DB_VERSION_NO_JOURNAL)) => { with_journal = false; }, v => panic!("Incompatible DB version, expected {}, got {:?}", DB_VERSION, v) } } else { - backing.put(&VERSION_KEY, &encode(&DB_VERSION)).expect("Error writing version to database"); + backing.put(&VERSION_KEY, &encode(&(if prefer_journal { DB_VERSION } else { DB_VERSION_NO_JOURNAL }))).expect("Error writing version to database"); + with_journal = prefer_journal; } - let counters = JournalDB::read_counters(&backing); + + let counters = if with_journal { + Some(Arc::new(RwLock::new(JournalDB::read_counters(&backing)))) + } else { + None + }; JournalDB { overlay: MemoryDB::new(), - backing: backing, - counters: Arc::new(RwLock::new(counters)), + backing: Arc::new(backing), + counters: counters, } } @@ -87,7 +104,45 @@ impl JournalDB { /// Check if this database has any commits pub fn is_empty(&self) -> bool { - self.backing.get(&LAST_ERA_KEY).expect("Low level database error").is_none() + self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none() + } + + /// Commit all recent insert operations. + pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { + let have_counters = self.counters.is_some(); + if have_counters { + self.commit_with_counters(now, id, end) + } else { + self.commit_without_counters() + } + } + + /// Drain the overlay and place it into a batch for the DB. + fn batch_overlay_insertions(overlay: &mut MemoryDB, batch: &DBTransaction) -> usize { + let mut inserts = 0usize; + let mut deletes = 0usize; + for i in overlay.drain().into_iter() { + let (key, (value, rc)) = i; + if rc > 0 { + assert!(rc == 1); + batch.put(&key.bytes(), &value).expect("Low-level database error. Some issue with your hard disk?"); + inserts += 1; + } + if rc < 0 { + assert!(rc == -1); + deletes += 1; + } + } + trace!("commit: Inserted {}, Deleted {} nodes", inserts, deletes); + inserts + deletes + } + + /// Just commit the overlay into the backing DB. + fn commit_without_counters(&mut self) -> Result { + let batch = DBTransaction::new(); + let ret = Self::batch_overlay_insertions(&mut self.overlay, &batch); + try!(self.backing.write(batch)); + Ok(ret as u32) } fn morph_key(key: &H256, index: u8) -> Bytes { @@ -97,13 +152,13 @@ impl JournalDB { } // The next three are valid only as long as there is an insert operation of `key` in the journal. - fn set_already_in(batch: &WriteBatch, key: &H256) { batch.put(&Self::morph_key(key, 0), &[1u8]).expect("Low-level database error. Some issue with your hard disk?"); } - fn reset_already_in(batch: &WriteBatch, key: &H256) { batch.delete(&Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?"); } - fn is_already_in(backing: &DB, key: &H256) -> bool { + fn set_already_in(batch: &DBTransaction, key: &H256) { batch.put(&Self::morph_key(key, 0), &[1u8]).expect("Low-level database error. 
Some issue with your hard disk?"); } + fn reset_already_in(batch: &DBTransaction, key: &H256) { batch.delete(&Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?"); } + fn is_already_in(backing: &Database, key: &H256) -> bool { backing.get(&Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?").is_some() } - fn insert_keys(inserts: &Vec<(H256, Bytes)>, backing: &DB, counters: &mut HashMap, batch: &WriteBatch) { + fn insert_keys(inserts: &Vec<(H256, Bytes)>, backing: &Database, counters: &mut HashMap, batch: &DBTransaction) { for &(ref h, ref d) in inserts { if let Some(c) = counters.get_mut(h) { // already counting. increment. @@ -126,7 +181,7 @@ impl JournalDB { } } - fn replay_keys(inserts: &Vec, backing: &DB, counters: &mut HashMap) { + fn replay_keys(inserts: &Vec, backing: &Database, counters: &mut HashMap) { for h in inserts { if let Some(c) = counters.get_mut(h) { // already counting. increment. @@ -140,7 +195,7 @@ impl JournalDB { } } - fn kill_keys(deletes: Vec, counters: &mut HashMap, batch: &WriteBatch) { + fn kill_keys(deletes: Vec, counters: &mut HashMap, batch: &DBTransaction) { for h in deletes.into_iter() { let mut n: Option = None; if let Some(c) = counters.get_mut(&h) { @@ -168,7 +223,7 @@ impl JournalDB { /// Commit all recent insert operations and historical removals from the old era /// to the backing database. - pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { + fn commit_with_counters(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { // journal format: // [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ] // [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ] @@ -214,8 +269,8 @@ impl JournalDB { // // record new commit's details. - let batch = WriteBatch::new(); - let mut counters = self.counters.write().unwrap(); + let batch = DBTransaction::new(); + let mut counters = self.counters.as_ref().unwrap().write().unwrap(); { let mut index = 0usize; let mut last; @@ -224,6 +279,7 @@ impl JournalDB { let mut r = RlpStream::new_list(2); r.append(&now); r.append(&index); + r.append(&&PADDING[..]); last = r.drain(); &last })).is_some() { @@ -264,6 +320,7 @@ impl JournalDB { let mut r = RlpStream::new_list(2); r.append(&end_era); r.append(&index); + r.append(&&PADDING[..]); last = r.drain(); &last })) { @@ -275,7 +332,7 @@ impl JournalDB { try!(batch.delete(&last)); index += 1; } - try!(batch.put(&LAST_ERA_KEY, &encode(&end_era))); + try!(batch.put(&LATEST_ERA_KEY, &encode(&end_era))); trace!("JournalDB: delete journal for time #{}.{}, (canon was {})", end_era, index, canon_id); } @@ -288,9 +345,9 @@ impl JournalDB { self.backing.get(&key.bytes()).expect("Low-level database error. 
Some issue with your hard disk?").map(|v| v.to_vec()) } - fn read_counters(db: &DB) -> HashMap { + fn read_counters(db: &Database) -> HashMap { let mut counters = HashMap::new(); - if let Some(val) = db.get(&LAST_ERA_KEY).expect("Low-level database error.") { + if let Some(val) = db.get(&LATEST_ERA_KEY).expect("Low-level database error.") { let mut era = decode::(&val) + 1; loop { let mut index = 0usize; @@ -298,6 +355,7 @@ impl JournalDB { let mut r = RlpStream::new_list(2); r.append(&era); r.append(&index); + r.append(&&PADDING[..]); &r.drain() }).expect("Low-level database error.") { let rlp = Rlp::new(&rlp_data); @@ -314,12 +372,17 @@ impl JournalDB { trace!("Recovered {} counters", counters.len()); counters } -} + + /// Returns heap memory size used + pub fn mem_used(&self) -> usize { + self.overlay.mem_used() + match &self.counters { &Some(ref c) => c.read().unwrap().heap_size_of_children(), &None => 0 } + } + } impl HashDB for JournalDB { fn keys(&self) -> HashMap { let mut ret: HashMap = HashMap::new(); - for (key, _) in self.backing.iterator(IteratorMode::Start) { + for (key, _) in self.backing.iter() { let h = H256::from_slice(key.deref()); ret.insert(h, 1); } @@ -508,4 +571,88 @@ mod tests { jdb.commit(2, &b"2a".sha3(), Some((1, b"1a".sha3()))).unwrap(); assert!(jdb.exists(&foo)); } + + + #[test] + fn reopen() { + let mut dir = ::std::env::temp_dir(); + dir.push(H32::random().hex()); + let bar = H256::random(); + + let foo = { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + // history is 1 + let foo = jdb.insert(b"foo"); + jdb.emplace(bar.clone(), b"bar".to_vec()); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + foo + }; + + { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + jdb.remove(&foo); + jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap(); + } + + { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + assert!(jdb.exists(&foo)); + assert!(jdb.exists(&bar)); + jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); + assert!(!jdb.exists(&foo)); + } + } + + #[test] + fn reopen_remove() { + let mut dir = ::std::env::temp_dir(); + dir.push(H32::random().hex()); + let bar = H256::random(); + + let foo = { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + // history is 1 + let foo = jdb.insert(b"foo"); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + jdb.insert(b"foo"); + jdb.commit(1, &b"1".sha3(), None).unwrap(); + foo + }; + + { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + jdb.remove(&foo); + jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap(); + assert!(!jdb.exists(&foo)); + } + } + #[test] + fn reopen_fork() { + let mut dir = ::std::env::temp_dir(); + dir.push(H32::random().hex()); + let (foo, bar, baz) = { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + // history is 1 + let foo = jdb.insert(b"foo"); + let bar = jdb.insert(b"bar"); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + jdb.remove(&foo); + let baz = jdb.insert(b"baz"); + jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap(); + + jdb.remove(&bar); + jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap(); + (foo, bar, baz) + }; + + { + let mut jdb = JournalDB::new(dir.to_str().unwrap()); + jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + assert!(!jdb.exists(&baz)); + assert!(!jdb.exists(&bar)); + } + } } diff --git a/util/src/memorydb.rs b/util/src/memorydb.rs index 680a6e1d0..9cd018935 100644 
--- a/util/src/memorydb.rs +++ b/util/src/memorydb.rs @@ -21,6 +21,7 @@ use bytes::*; use rlp::*; use sha3::*; use hashdb::*; +use heapsize::*; use std::mem; use std::collections::HashMap; @@ -143,6 +144,11 @@ impl MemoryDB { } self.raw(key).unwrap() } + + /// Returns the size of allocated heap memory + pub fn mem_used(&self) -> usize { + self.data.heap_size_of_children() + } } static NULL_RLP_STATIC: [u8; 1] = [0x80; 1]; From 4230fdfffea3c897402eaa095a46d959928e4692 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sun, 6 Mar 2016 22:43:21 +0100 Subject: [PATCH 06/22] More versioning fixups. --- util/src/journaldb.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index f04affb7b..925c1c065 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -99,7 +99,7 @@ impl JournalDB { pub fn new_temp() -> JournalDB { let mut dir = env::temp_dir(); dir.push(H32::random().hex()); - Self::new(DB::open_default(dir.to_str().unwrap()).unwrap()) + Self::new(dir.to_str().unwrap()) } /// Check if this database has any commits @@ -269,14 +269,15 @@ impl JournalDB { // // record new commit's details. - let batch = DBTransaction::new(); + trace!("commit: #{} ({}), end era: {:?}", now, id, end); let mut counters = self.counters.as_ref().unwrap().write().unwrap(); + let batch = DBTransaction::new(); { let mut index = 0usize; let mut last; while try!(self.backing.get({ - let mut r = RlpStream::new_list(2); + let mut r = RlpStream::new_list(3); r.append(&now); r.append(&index); + r.append(&&PADDING[..]); last = r.drain(); &last })).is_some() { index += 1; } @@ -310,6 +311,7 @@ impl JournalDB { r.append(&removes); Self::insert_keys(&inserts, &self.backing, &mut counters, &batch); try!(batch.put(&last, r.as_raw())); + try!(batch.put(&LATEST_ERA_KEY, &encode(&now))); } // apply old commits' details @@ -317,7 +319,7 @@ impl JournalDB { let mut index = 0usize; let mut last; while let Some(rlp_data) = try!(self.backing.get({ - let mut r = RlpStream::new_list(2); + let mut r = RlpStream::new_list(3); r.append(&end_era); r.append(&index); + r.append(&&PADDING[..]); last = r.drain(); &last })) { @@ -352,7 +354,7 @@ impl JournalDB { loop { let mut index = 0usize; while let Some(rlp_data) = db.get({ - let mut r = RlpStream::new_list(2); + let mut r = RlpStream::new_list(3); r.append(&era); r.append(&index); + r.append(&&PADDING[..]); &r.drain() From 0980c7130a5f299846876ec000b2fdbf8bc82c15 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Mon, 7 Mar 2016 06:58:43 +0100 Subject: [PATCH 07/22] Fix replay_keys Counters should never have an entry with zero value. --- util/src/journaldb.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 925c1c065..a008fa47e 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -191,7 +191,9 @@ impl JournalDB { // this is the first entry for this node in the journal. // it is initialised to 1 if it was already in. - counters.insert(h.clone(), if Self::is_already_in(backing, h) {1} else {0}); + if Self::is_already_in(backing, h) { + counters.insert(h.clone(), 1); + } } } From fd87633db662126ea95b29c1027d2d88d7565639 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Mon, 7 Mar 2016 07:57:50 +0100 Subject: [PATCH 08/22] Remove superfluous LATEST_KEY write.
--- util/src/journaldb.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index a008fa47e..bb79a447c 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -336,7 +336,6 @@ impl JournalDB { try!(batch.delete(&last)); index += 1; } - try!(batch.put(&LATEST_ERA_KEY, &encode(&end_era))); trace!("JournalDB: delete journal for time #{}.{}, (canon was {})", end_era, index, canon_id); } From 73207c23557e958761ea72b91ccdaee4ce5dd457 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Mon, 7 Mar 2016 08:01:14 +0100 Subject: [PATCH 09/22] Revert accidental beta regressions. --- util/src/journaldb.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index bb79a447c..2e78f1cce 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -351,7 +351,7 @@ impl JournalDB { fn read_counters(db: &Database) -> HashMap { let mut counters = HashMap::new(); if let Some(val) = db.get(&LATEST_ERA_KEY).expect("Low-level database error.") { - let mut era = decode::(&val) + 1; + let mut era = decode::(&val); loop { let mut index = 0usize; while let Some(rlp_data) = db.get({ @@ -366,10 +366,10 @@ impl JournalDB { Self::replay_keys(&inserts, db, &mut counters); index += 1; }; - if index == 0 { + if index == 0 || era == 0 { break; } - era += 1; + era -= 1; } } trace!("Recovered {} counters", counters.len()); From 4d1effb008303ef0a4ce98134a7c8658031ed2f2 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Mon, 7 Mar 2016 09:10:02 +0100 Subject: [PATCH 10/22] Fix tests. --- util/src/journaldb.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 2e78f1cce..57af857a9 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -182,6 +182,7 @@ impl JournalDB { } fn replay_keys(inserts: &Vec, backing: &Database, counters: &mut HashMap) { + println!("replay_keys: inserts={:?}, counters={:?}", inserts, counters); for h in inserts { if let Some(c) = counters.get_mut(h) { // already counting. increment. @@ -192,9 +193,11 @@ impl JournalDB { // this is the first entry for this node in the journal. // it is initialised to 1 if it was already in. if Self::is_already_in(backing, h) { + println!("replace_keys: Key {} was already in!", h); counters.insert(h.clone(), 1); } } + println!("replay_keys: (end) counters={:?}", counters); } fn kill_keys(deletes: Vec, counters: &mut HashMap, batch: &DBTransaction) { @@ -361,6 +364,7 @@ impl JournalDB { r.append(&&PADDING[..]); &r.drain() }).expect("Low-level database error.") { + println!("read_counters: era={}, index={}", era, index); let rlp = Rlp::new(&rlp_data); let inserts: Vec = rlp.val_at(1); Self::replay_keys(&inserts, db, &mut counters); @@ -617,17 +621,23 @@ mod tests { // history is 1 let foo = jdb.insert(b"foo"); jdb.commit(0, &b"0".sha3(), None).unwrap(); + jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap(); + + // foo is ancient history. 
+ jdb.insert(b"foo"); - jdb.commit(1, &b"1".sha3(), None).unwrap(); + jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); foo }; { let mut jdb = JournalDB::new(dir.to_str().unwrap()); jdb.remove(&foo); - jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); - assert!(jdb.exists(&foo)); jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + jdb.remove(&foo); + jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap(); + jdb.commit(5, &b"5".sha3(), Some((4, b"4".sha3()))).unwrap(); assert!(!jdb.exists(&foo)); } } From bcae4f6e7b5d7e0b9d2ac4a9df370d35f92d5773 Mon Sep 17 00:00:00 2001 From: debris Date: Mon, 7 Mar 2016 10:30:19 +0100 Subject: [PATCH 11/22] fixed jsonrpc reporting current block is one less than actuall, fixed #612 --- rpc/src/v1/impls/eth.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 2313d5114..7113c55b1 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -312,7 +312,8 @@ impl EthFilter for EthFilterClient { None => Ok(Value::Array(vec![] as Vec)), Some(info) => match info.filter { PollFilter::Block => { - let current_number = client.chain_info().best_block_number; + // + 1, cause we want to return hashes including current block hash. + let current_number = client.chain_info().best_block_number + 1; let hashes = (info.block_number..current_number).into_iter() .map(BlockId::Number) .filter_map(|id| client.block_hash(id)) From 72016196cd654697dbfd58e7580807021fb0888a Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Mon, 7 Mar 2016 10:56:39 +0100 Subject: [PATCH 12/22] Remove println!s. --- util/src/journaldb.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 57af857a9..d0d7c05ff 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -182,7 +182,7 @@ impl JournalDB { } fn replay_keys(inserts: &Vec, backing: &Database, counters: &mut HashMap) { - println!("replay_keys: inserts={:?}, counters={:?}", inserts, counters); + trace!("replay_keys: inserts={:?}, counters={:?}", inserts, counters); for h in inserts { if let Some(c) = counters.get_mut(h) { // already counting. increment. @@ -193,11 +193,11 @@ impl JournalDB { // this is the first entry for this node in the journal. // it is initialised to 1 if it was already in. 
if Self::is_already_in(backing, h) { - println!("replace_keys: Key {} was already in!", h); + trace!("replace_keys: Key {} was already in!", h); counters.insert(h.clone(), 1); } } - println!("replay_keys: (end) counters={:?}", counters); + trace!("replay_keys: (end) counters={:?}", counters); } fn kill_keys(deletes: Vec, counters: &mut HashMap, batch: &DBTransaction) { @@ -364,7 +364,7 @@ impl JournalDB { r.append(&&PADDING[..]); &r.drain() }).expect("Low-level database error.") { - println!("read_counters: era={}, index={}", era, index); + trace!("read_counters: era={}, index={}", era, index); let rlp = Rlp::new(&rlp_data); let inserts: Vec = rlp.val_at(1); Self::replay_keys(&inserts, db, &mut counters); From 58721475ffa68a77c23007caa6db884948e4b992 Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 7 Mar 2016 11:34:07 +0100 Subject: [PATCH 13/22] Do not remove the peer immediatelly on send error --- sync/src/chain.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 63640f87f..fe1b559cd 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -575,7 +575,7 @@ impl ChainSync { pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) { trace!(target: "sync", "== Connected {}", peer); if let Err(e) = self.send_status(io) { - trace!(target:"sync", "Error sending status request: {:?}", e); + warn!(target:"sync", "Error sending status request: {:?}", e); io.disable_peer(peer); } } @@ -900,9 +900,8 @@ impl ChainSync { } match sync.send(peer_id, packet_id, packet) { Err(e) => { - warn!(target:"sync", "Error sending request: {:?}", e); + debug!(target:"sync", "Error sending request: {:?}", e); sync.disable_peer(peer_id); - self.on_peer_aborting(sync, peer_id); } Ok(_) => { let mut peer = self.peers.get_mut(&peer_id).unwrap(); @@ -915,9 +914,8 @@ impl ChainSync { /// Generic packet sender fn send_packet(&mut self, sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) { if let Err(e) = sync.send(peer_id, packet_id, packet) { - warn!(target:"sync", "Error sending packet: {:?}", e); + debug!(target:"sync", "Error sending packet: {:?}", e); sync.disable_peer(peer_id); - self.on_peer_aborting(sync, peer_id); } } /// Called when peer sends us new transactions From ec3698066b8dd3f5a061498a9203644511cf8e21 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Mon, 7 Mar 2016 12:21:11 +0100 Subject: [PATCH 14/22] Normal CLI options with geth. Support node identity. Support fine-grained JSONRPC API enabling. --- ethcore/src/client.rs | 3 + parity/main.rs | 130 ++++++++++++++++++++++++++++++------------ 2 files changed, 96 insertions(+), 37 deletions(-) diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 9688cc527..8471666aa 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -87,6 +87,8 @@ pub struct ClientConfig { pub blockchain: BlockChainConfig, /// Prefer journal rather than archive. pub prefer_journal: bool, + /// The name of the client instance. + pub name: String, } impl Default for ClientConfig { @@ -95,6 +97,7 @@ impl Default for ClientConfig { queue: Default::default(), blockchain: Default::default(), prefer_journal: false, + name: Default::default(), } } } diff --git a/parity/main.rs b/parity/main.rs index 605fb315d..43b0504f1 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -53,6 +53,16 @@ use docopt::Docopt; use daemonize::Daemonize; use number_prefix::{binary_prefix, Standalone, Prefixed}; +fn die_with_message(msg: &str) -> ! 
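// [Editor's note] The `-> !` ("never") return type marks die_with_message as
// diverging: exit(1) does not return, so the die! macro built on it can sit in
// expression position wherever a value is expected. That is how the option
// parsing in this patch uses it, e.g. (illustrative, mirroring the spec-file
// handling later in the hunk):
//
//     let json = contents(f)
//         .unwrap_or_else(|_| die!("{}: Couldn't read chain specification file.", f));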
{ + println!("ERROR: {}", msg); + exit(1); +} + +#[macro_export] +macro_rules! die { + ($($arg:tt)*) => (die_with_message(&format!("{}", format_args!($($arg)*)))); +} + const USAGE: &'static str = r#" Parity. Ethereum Client. By Wood/Paronyan/Kotewicz/Drwięga/Volf. @@ -62,13 +72,16 @@ Usage: parity daemon [options] [ --no-bootstrap | ... ] parity [options] [ --no-bootstrap | ... ] -Options: +Protocol Options: --chain CHAIN Specify the blockchain type. CHAIN may be either a JSON chain specification file - or frontier, mainnet, morden, or testnet [default: frontier]. + or olympic, frontier, homestead, mainnet, morden, or testnet [default: homestead]. + --testnet Equivalent to --chain testnet (geth-compatible). + --networkid INDEX Override the network identifier from the chain we are on. --archive Client should not prune the state/storage trie. - -d --db-path PATH Specify the database & configuration directory path [default: $HOME/.parity] - --keys-path PATH Specify the path for JSON key files to be found [default: $HOME/.web3/keys] + -d --datadir PATH Specify the database & configuration directory path [default: $HOME/.parity] + --identity NAME Specify your node's name. +Networking Options: --no-bootstrap Don't bother trying to connect to any nodes initially. --listen-address URL Specify the IP/port on which to listen for peers [default: 0.0.0.0:30304]. --public-address URL Specify the IP/port on which peers may connect. @@ -78,18 +91,32 @@ Options: --no-upnp Disable trying to figure out the correct public adderss over UPnP. --node-key KEY Specify node secret key, either as 64-character hex string or input to SHA3 operation. +API and Console Options: + -j --jsonrpc Enable the JSON-RPC API sever. + --jsonrpc-addr HOST Specify the hostname portion of the JSONRPC API server [default: 127.0.0.1]. + --jsonrpc-port PORT Specify the port portion of the JSONRPC API server [default: 8545]. + --jsonrpc-cors URL Specify CORS header for JSON-RPC API responses [default: null]. + --jsonrpc-apis APIS Specify the APIs available through the JSONRPC interface. APIS is a comma-delimited + list of API name. Possible name are web3, eth and net. [default: web3,eth,net]. + --rpc Equivalent to --jsonrpc (geth-compatible). + --rpcaddr HOST Equivalent to --jsonrpc-addr HOST (geth-compatible). + --rpcport PORT Equivalent to --jsonrpc-port PORT (geth-compatible). + --rpcapi APIS Equivalent to --jsonrpc-apis APIS (geth-compatible). + --rpccorsdomain URL Equivalent to --jsonrpc-cors URL (geth-compatible). + +Sealing/Mining Options: + --author ADDRESS Specify the block author (aka "coinbase") address for sending block rewards + from sealed blocks [default: 0037a6b811ffeb6e072da21179d11b1406371c63]. + --extradata STRING Specify a custom extra-data for authored blocks, no more than 32 characters. + +Memory Footprint Options: --cache-pref-size BYTES Specify the prefered size of the blockchain cache in bytes [default: 16384]. --cache-max-size BYTES Specify the maximum size of the blockchain cache in bytes [default: 262144]. --queue-max-size BYTES Specify the maximum size of memory to use for block queue [default: 52428800]. + --cache MEGABYTES Set total amount of cache to use for the entire system, mutually exclusive with + other cache options (geth-compatible). - -j --jsonrpc Enable the JSON-RPC API sever. - --jsonrpc-url URL Specify URL for JSON-RPC API server [default: 127.0.0.1:8545]. - --jsonrpc-cors URL Specify CORS header for JSON-RPC API responses [default: null]. 
- - --author ADDRESS Specify the block author (aka "coinbase") address for sending block rewards - from sealed blocks [default: 0037a6b811ffeb6e072da21179d11b1406371c63]. - --extra-data STRING Specify a custom extra-data for authored blocks, no more than 32 characters. - +Miscellaneous Options: -l --logging LOGGING Specify the logging level. -v --version Show information about version. -h --help Show this screen. @@ -101,14 +128,18 @@ struct Args { arg_pid_file: String, arg_enode: Vec, flag_chain: String, + flag_testnet: bool, flag_db_path: String, + flag_networkid: Option, + flag_identity: String, + flag_cache: Option, flag_keys_path: String, flag_archive: bool, flag_no_bootstrap: bool, flag_listen_address: String, flag_public_address: Option, flag_address: Option, - flag_peers: u32, + flag_peers: usize, flag_no_discovery: bool, flag_no_upnp: bool, flag_node_key: Option, @@ -116,8 +147,15 @@ struct Args { flag_cache_max_size: usize, flag_queue_max_size: usize, flag_jsonrpc: bool, - flag_jsonrpc_url: String, + flag_jsonrpc_addr: String, + flag_jsonrpc_port: u16, flag_jsonrpc_cors: String, + flag_jsonrpc_apis: String, + flag_rpc: bool, + flag_rpcaddr: Option, + flag_rpcport: Option, + flag_rpccorsdomain: Option, + flag_rpcapi: Option, flag_logging: Option, flag_version: bool, flag_author: String, @@ -151,14 +189,23 @@ fn setup_log(init: &Option) { } #[cfg(feature = "rpc")] -fn setup_rpc_server(client: Arc, sync: Arc, url: &str, cors_domain: &str) { +fn setup_rpc_server(client: Arc, sync: Arc, url: &str, cors_domain: &str, apis: Vec<&str>) { use rpc::v1::*; let mut server = rpc::HttpServer::new(1); - server.add_delegate(Web3Client::new().to_delegate()); - server.add_delegate(EthClient::new(&client, &sync).to_delegate()); - server.add_delegate(EthFilterClient::new(&client).to_delegate()); - server.add_delegate(NetClient::new(&sync).to_delegate()); + for api in apis.into_iter() { + match api { + "web3" => server.add_delegate(Web3Client::new().to_delegate()), + "net" => server.add_delegate(NetClient::new(&sync).to_delegate()), + "eth" => { + server.add_delegate(EthClient::new(&client, &sync).to_delegate()); + server.add_delegate(EthFilterClient::new(&client).to_delegate()); + } + _ => { + die!("{}: Invalid API name to be enabled.", api); + } + } + } server.start_async(url, cors_domain); } @@ -179,16 +226,6 @@ By Wood/Paronyan/Kotewicz/Drwięga/Volf.\ ", version()); } -fn die_with_message(msg: &str) -> ! { - println!("ERROR: {}", msg); - exit(1); -} - -#[macro_export] -macro_rules! die { - ($($arg:tt)*) => (die_with_message(&format!("{}", format_args!($($arg)*)))); -} - struct Configuration { args: Args } @@ -221,8 +258,11 @@ impl Configuration { } fn spec(&self) -> Spec { + if self.args.flag_testnet { + return ethereum::new_morden(); + } match self.args.flag_chain.as_ref() { - "frontier" | "mainnet" => ethereum::new_frontier(), + "frontier" | "homestead" | "mainnet" => ethereum::new_frontier(), "morden" | "testnet" => ethereum::new_morden(), "olympic" => ethereum::new_olympic(), f => Spec::from_json_utf8(contents(f).unwrap_or_else(|_| die!("{}: Couldn't read chain specification file. 
Sure it exists?", f)).as_ref()), @@ -276,7 +316,7 @@ impl Configuration { ret.public_address = public; ret.use_secret = self.args.flag_node_key.as_ref().map(|s| Secret::from_str(&s).unwrap_or_else(|_| s.sha3())); ret.discovery_enabled = !self.args.flag_no_discovery; - ret.ideal_peers = self.args.flag_peers; + ret.ideal_peers = self.args.flag_peers as u32; let mut net_path = PathBuf::from(&self.path()); net_path.push("network"); ret.config_path = Some(net_path.to_str().unwrap().to_owned()); @@ -307,13 +347,22 @@ impl Configuration { let spec = self.spec(); let net_settings = self.net_settings(&spec); let mut sync_config = SyncConfig::default(); - sync_config.network_id = spec.network_id(); + sync_config.network_id = self.args.flag_networkid.as_ref().map(|id| U256::from_str(id).unwrap_or_else(|_| die!("{}: Invalid index given with --networkid", id))).unwrap_or(spec.network_id()); // Build client let mut client_config = ClientConfig::default(); - client_config.blockchain.pref_cache_size = self.args.flag_cache_pref_size; - client_config.blockchain.max_cache_size = self.args.flag_cache_max_size; + match self.args.flag_cache { + Some(mb) => { + client_config.blockchain.max_cache_size = mb * 1024 * 1024; + client_config.blockchain.pref_cache_size = client_config.blockchain.max_cache_size / 2; + } + None => { + client_config.blockchain.pref_cache_size = self.args.flag_cache_pref_size; + client_config.blockchain.max_cache_size = self.args.flag_cache_max_size; + } + } client_config.prefer_journal = !self.args.flag_archive; + client_config.name = self.args.flag_identity.clone(); client_config.queue.max_mem_use = self.args.flag_queue_max_size; let mut service = ClientService::start(client_config, spec, net_settings, &Path::new(&self.path())).unwrap(); let client = service.client().clone(); @@ -324,9 +373,16 @@ impl Configuration { let sync = EthSync::register(service.network(), sync_config, client); // Setup rpc - if self.args.flag_jsonrpc { - setup_rpc_server(service.client(), sync.clone(), &self.args.flag_jsonrpc_url, &self.args.flag_jsonrpc_cors); - SocketAddr::from_str(&self.args.flag_jsonrpc_url).unwrap_or_else(|_|die!("{}: Invalid JSONRPC listen address given with --jsonrpc-url. Should be of the form 'IP:port'.", self.args.flag_jsonrpc_url)); + if self.args.flag_jsonrpc || self.args.flag_rpc { + let url = format!("{}:{}", + self.args.flag_rpcaddr.as_ref().unwrap_or(&self.args.flag_jsonrpc_addr), + self.args.flag_rpcport.unwrap_or(self.args.flag_jsonrpc_port) + ); + SocketAddr::from_str(&url).unwrap_or_else(|_|die!("{}: Invalid JSONRPC listen host/port given.", url)); + let cors = self.args.flag_rpccorsdomain.as_ref().unwrap_or(&self.args.flag_jsonrpc_cors); + // TODO: use this as the API list. + let apis = self.args.flag_rpcapi.as_ref().unwrap_or(&self.args.flag_jsonrpc_apis); + setup_rpc_server(service.client(), sync.clone(), &url, cors, apis.split(",").collect()); } // Register IO handler From b2fc077f8c6a8a28ca0f14e975107b7431887f21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 8 Mar 2016 16:42:30 +0100 Subject: [PATCH 15/22] Fixing CLI parameters --- parity/main.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/parity/main.rs b/parity/main.rs index 43b0504f1..ceb58e31e 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -79,6 +79,7 @@ Protocol Options: --networkid INDEX Override the network identifier from the chain we are on. --archive Client should not prune the state/storage trie. 
-d --datadir PATH Specify the database & configuration directory path [default: $HOME/.parity] + --keys-path PATH Specify the path for JSON key files to be found [default: $HOME/.web3/keys] --identity NAME Specify your node's name. Networking Options: @@ -113,7 +114,7 @@ Memory Footprint Options: --cache-pref-size BYTES Specify the prefered size of the blockchain cache in bytes [default: 16384]. --cache-max-size BYTES Specify the maximum size of the blockchain cache in bytes [default: 262144]. --queue-max-size BYTES Specify the maximum size of memory to use for block queue [default: 52428800]. - --cache MEGABYTES Set total amount of cache to use for the entire system, mutually exclusive with + --cache MEGABYTES Set total amount of cache to use for the entire system, mutually exclusive with other cache options (geth-compatible). Miscellaneous Options: @@ -129,7 +130,7 @@ struct Args { arg_enode: Vec, flag_chain: String, flag_testnet: bool, - flag_db_path: String, + flag_datadir: String, flag_networkid: Option, flag_identity: String, flag_cache: Option, @@ -238,7 +239,7 @@ impl Configuration { } fn path(&self) -> String { - self.args.flag_db_path.replace("$HOME", env::home_dir().unwrap().to_str().unwrap()) + self.args.flag_datadir.replace("$HOME", env::home_dir().unwrap().to_str().unwrap()) } fn author(&self) -> Address { From a1640dcf7205c2d47b017e2398c6fda03da889d6 Mon Sep 17 00:00:00 2001 From: debris Date: Wed, 9 Mar 2016 11:38:53 +0100 Subject: [PATCH 16/22] jsonrpc panic handle --- ethcore/src/block_queue.rs | 2 +- parity/main.rs | 34 +++++++++++++++++++++------------- rpc/src/lib.rs | 22 +++++++++++++++++----- 3 files changed, 39 insertions(+), 19 deletions(-) diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index 490a17995..8f1105b8b 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -385,7 +385,7 @@ impl BlockQueue { } } - pub fn collect_garbage(&self) { + pub fn collect_garbage(&self) { { let mut verification = self.verification.lock().unwrap(); verification.unverified.shrink_to_fit(); diff --git a/parity/main.rs b/parity/main.rs index 605fb315d..94db8e706 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -43,7 +43,7 @@ use std::path::PathBuf; use env_logger::LogBuilder; use ctrlc::CtrlC; use util::*; -use util::panics::MayPanic; +use util::panics::{MayPanic, PanicHandler}; use ethcore::spec::*; use ethcore::client::*; use ethcore::service::{ClientService, NetSyncMessage}; @@ -151,7 +151,7 @@ fn setup_log(init: &Option) { } #[cfg(feature = "rpc")] -fn setup_rpc_server(client: Arc, sync: Arc, url: &str, cors_domain: &str) { +fn setup_rpc_server(client: Arc, sync: Arc, url: &str, cors_domain: &str) -> Option> { use rpc::v1::*; let mut server = rpc::HttpServer::new(1); @@ -159,11 +159,12 @@ fn setup_rpc_server(client: Arc, sync: Arc, url: &str, cors_dom server.add_delegate(EthClient::new(&client, &sync).to_delegate()); server.add_delegate(EthFilterClient::new(&client).to_delegate()); server.add_delegate(NetClient::new(&sync).to_delegate()); - server.start_async(url, cors_domain); + Some(server.start_async(url, cors_domain)) } #[cfg(not(feature = "rpc"))] -fn setup_rpc_server(_client: Arc, _sync: Arc, _url: &str) { +fn setup_rpc_server(_client: Arc, _sync: Arc, _url: &str) -> Option> { + None } fn print_version() { @@ -323,26 +324,28 @@ impl Configuration { // Sync let sync = EthSync::register(service.network(), sync_config, client); - // Setup rpc - if self.args.flag_jsonrpc { - setup_rpc_server(service.client(), sync.clone(), 
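// [Editor's note] setup_rpc_server is compiled in two variants selected by
// #[cfg(feature = "rpc")] / #[cfg(not(feature = "rpc"))]; the stub variant
// returns None (the Option<Arc<PanicHandler>> generics are stripped in the
// patch text above), so call sites like the one below work identically in
// builds without the rpc feature:
//
//     if let Some(handler) = setup_rpc_server(/* ... */) {
//         // only reached when compiled with the rpc feature
//     }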
&self.args.flag_jsonrpc_url, &self.args.flag_jsonrpc_cors); - SocketAddr::from_str(&self.args.flag_jsonrpc_url).unwrap_or_else(|_|die!("{}: Invalid JSONRPC listen address given with --jsonrpc-url. Should be of the form 'IP:port'.", self.args.flag_jsonrpc_url)); - } - // Register IO handler let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default(), - sync: sync + sync: sync.clone(), }); service.io().register_handler(io_handler).expect("Error registering IO handler"); + // Setup rpc + let server_handler = if self.args.flag_jsonrpc { + SocketAddr::from_str(&self.args.flag_jsonrpc_url).unwrap_or_else(|_|die!("{}: Invalid JSONRPC listen address given with --jsonrpc-url. Should be of the form 'IP:port'.", self.args.flag_jsonrpc_url)); + setup_rpc_server(service.client(), sync, &self.args.flag_jsonrpc_url, &self.args.flag_jsonrpc_cors) + } else { + None + }; + // Handle exit - wait_for_exit(&service); + wait_for_exit(&service, server_handler); } } -fn wait_for_exit(client_service: &ClientService) { +fn wait_for_exit(client_service: &ClientService, server_handler: Option>) { let exit = Arc::new(Condvar::new()); // Handle possible exits @@ -351,6 +354,11 @@ fn wait_for_exit(client_service: &ClientService) { let e = exit.clone(); client_service.on_panic(move |_reason| { e.notify_all(); }); + if let Some(handler) = server_handler { + let e = exit.clone(); + handler.on_panic(move |_reason| { e.notify_all(); }); + } + // Wait for signal let mutex = Mutex::new(()); let _ = exit.wait(mutex.lock().unwrap()).unwrap(); diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 0653a0c33..97a3a5fe5 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -29,6 +29,9 @@ extern crate ethcore; extern crate ethsync; extern crate transient_hashmap; +use std::sync::Arc; +use std::thread; +use util::panics::PanicHandler; use self::jsonrpc_core::{IoHandler, IoDelegate}; pub mod v1; @@ -36,7 +39,7 @@ pub mod v1; /// Http server. pub struct HttpServer { handler: IoHandler, - threads: usize + threads: usize, } impl HttpServer { @@ -44,7 +47,7 @@ impl HttpServer { pub fn new(threads: usize) -> HttpServer { HttpServer { handler: IoHandler::new(), - threads: threads + threads: threads, } } @@ -53,9 +56,18 @@ impl HttpServer { self.handler.add_delegate(delegate); } - /// Start server asynchronously in new thread - pub fn start_async(self, addr: &str, cors_domain: &str) { + /// Start server asynchronously in new thread and returns panic handler. 
+ pub fn start_async(self, addr: &str, cors_domain: &str) -> Arc { + let addr = addr.to_owned(); + let cors_domain = cors_domain.to_owned(); + let panic_handler = PanicHandler::new_in_arc(); + let ph = panic_handler.clone(); let server = jsonrpc_http_server::Server::new(self.handler, self.threads); - server.start_async(addr, jsonrpc_http_server::AccessControlAllowOrigin::Value(cors_domain.to_owned())) + thread::Builder::new().name("jsonrpc_http".to_string()).spawn(move || { + ph.catch_panic(move || { + server.start(addr.as_ref(), jsonrpc_http_server::AccessControlAllowOrigin::Value(cors_domain)); + }).unwrap() + }).expect("Error while creating jsonrpc http thread"); + panic_handler } } From 6ad0ba8fe24bf35c9a74b48a25c6e84dc132429f Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Wed, 9 Mar 2016 17:11:15 +0400 Subject: [PATCH 17/22] basic commands --- Cargo.lock | 20 ++++++++++++++++++++ Cargo.toml | 1 + parity/main.rs | 34 ++++++++++++++++++++++++++++++++++ util/src/keys/store.rs | 1 + 4 files changed, 56 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 55ed996ed..65ca8f566 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15,6 +15,7 @@ dependencies = [ "fdlimit 0.1.0", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "number_prefix 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", + "rpassword 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -680,6 +681,17 @@ dependencies = [ "librocksdb-sys 0.2.1 (git+https://github.com/arkpar/rust-rocksdb.git)", ] +[[package]] +name = "rpassword" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "kernel32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", + "termios 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rust-crypto" version = "0.2.34" @@ -813,6 +825,14 @@ dependencies = [ "winapi 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "termios" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "time" version = "0.1.34" diff --git a/Cargo.toml b/Cargo.toml index 9b8ec6405..0852a16bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ fdlimit = { path = "util/fdlimit" } daemonize = "0.2" ethcore-devtools = { path = "devtools" } number_prefix = "0.2" +rpassword = "0.1" [features] default = ["rpc"] diff --git a/parity/main.rs b/parity/main.rs index 296e1df65..a442f4fdb 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -32,6 +32,7 @@ extern crate fdlimit; extern crate daemonize; extern crate time; extern crate number_prefix; +extern crate rpassword; #[cfg(feature = "rpc")] extern crate ethcore_rpc as rpc; @@ -70,6 +71,7 @@ Parity. Ethereum Client. Usage: parity daemon [options] [ --no-bootstrap | ... ] + parity account parity [options] [ --no-bootstrap | ... 
] Protocol Options: @@ -126,8 +128,10 @@ Miscellaneous Options: #[derive(Debug, RustcDecodable)] struct Args { cmd_daemon: bool, + cmd_account: bool, arg_pid_file: String, arg_enode: Vec, + arg_command: String, flag_chain: String, flag_testnet: bool, flag_datadir: String, @@ -337,9 +341,39 @@ impl Configuration { .start() .unwrap_or_else(|e| die!("Couldn't daemonize; {}", e)); } + if self.args.cmd_account { + self.execute_account_cli(&self.args.arg_command); + return; + } self.execute_client(); } + fn execute_account_cli(&self, command: &str) { + use util::keys::store::SecretStore; + use rpassword::read_password; + let mut secret_store = SecretStore::new(); + if command == "new" { + println!("Please note that password is NOT RECOVERABLE."); + println!("Type password: "); + let password = read_password().unwrap(); + println!("Repeat password: "); + let password_repeat = read_password().unwrap(); + if password != password_repeat { + println!("Passwords do not match!"); + return; + } + println!("New account address:"); + let new_address = secret_store.new_account(&password).unwrap(); + println!("{:?}", new_address); + } + if command == "list" { + println!("Known addresses:"); + for &(addr, _) in secret_store.accounts().unwrap().iter() { + println!("{:?}", addr); + } + } + } + fn execute_client(&self) { // Setup logging setup_log(&self.args.flag_logging); diff --git a/util/src/keys/store.rs b/util/src/keys/store.rs index 625d6fd8f..dcc165259 100644 --- a/util/src/keys/store.rs +++ b/util/src/keys/store.rs @@ -84,6 +84,7 @@ impl SecretStore { let mut path = ::std::env::home_dir().expect("Failed to get home dir"); path.push(".parity"); path.push("keys"); + ::std::fs::create_dir_all(&path).expect("Should panic since it is critical to be able to access home dir"); Self::new_in(&path) } From bcb9b0e45723f6e35035505ad89f8f747909041b Mon Sep 17 00:00:00 2001 From: debris Date: Wed, 9 Mar 2016 15:32:27 +0100 Subject: [PATCH 18/22] wait_for_exit takes only one input param, which is PanicHandler --- parity/main.rs | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/parity/main.rs b/parity/main.rs index 296e1df65..adc3972e4 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -43,7 +43,7 @@ use std::path::PathBuf; use env_logger::LogBuilder; use ctrlc::CtrlC; use util::*; -use util::panics::{MayPanic, PanicHandler}; +use util::panics::{MayPanic, ForwardPanic, PanicHandler}; use ethcore::spec::*; use ethcore::client::*; use ethcore::service::{ClientService, NetSyncMessage}; @@ -341,6 +341,9 @@ impl Configuration { } fn execute_client(&self) { + // Setup panic handler + let panic_handler = PanicHandler::new_in_arc(); + // Setup logging setup_log(&self.args.flag_logging); // Raise fdlimit @@ -367,6 +370,7 @@ impl Configuration { client_config.name = self.args.flag_identity.clone(); client_config.queue.max_mem_use = self.args.flag_queue_max_size; let mut service = ClientService::start(client_config, spec, net_settings, &Path::new(&self.path())).unwrap(); + panic_handler.forward_from(&service); let client = service.client().clone(); client.set_author(self.author()); client.set_extra_data(self.extra_data()); @@ -375,7 +379,7 @@ impl Configuration { let sync = EthSync::register(service.network(), sync_config, client); // Setup rpc - let server_handler = if self.args.flag_jsonrpc || self.args.flag_rpc { + if self.args.flag_jsonrpc || self.args.flag_rpc { let url = format!("{}:{}", self.args.flag_rpcaddr.as_ref().unwrap_or(&self.args.flag_jsonrpc_addr), 
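// [Editor's note] Flag precedence in this expression: each Option-typed geth
// alias (--rpcaddr, --rpcport, --rpccorsdomain, --rpcapi) overrides its
// --jsonrpc-* counterpart via unwrap_or, so omitting every alias leaves the
// documented defaults in force. For example:
//
//     flag_rpcport.unwrap_or(flag_jsonrpc_port)  // Some(8546) -> 8546, None -> 8545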
self.args.flag_rpcport.unwrap_or(self.args.flag_jsonrpc_port) @@ -384,10 +388,12 @@ impl Configuration { let cors = self.args.flag_rpccorsdomain.as_ref().unwrap_or(&self.args.flag_jsonrpc_cors); // TODO: use this as the API list. let apis = self.args.flag_rpcapi.as_ref().unwrap_or(&self.args.flag_jsonrpc_apis); - setup_rpc_server(service.client(), sync.clone(), &url, cors, apis.split(",").collect()) - } else { - None - }; + let server_handler = setup_rpc_server(service.client(), sync.clone(), &url, cors, apis.split(",").collect()); + if let Some(handler) = server_handler { + panic_handler.forward_from(handler.deref()); + } + + } // Register IO handler let io_handler = Arc::new(ClientIoHandler { @@ -398,23 +404,20 @@ impl Configuration { service.io().register_handler(io_handler).expect("Error registering IO handler"); // Handle exit - wait_for_exit(&service, server_handler); + wait_for_exit(panic_handler); } } -fn wait_for_exit(client_service: &ClientService, server_handler: Option>) { +fn wait_for_exit(panic_handler: Arc) { let exit = Arc::new(Condvar::new()); // Handle possible exits let e = exit.clone(); CtrlC::set_handler(move || { e.notify_all(); }); - let e = exit.clone(); - client_service.on_panic(move |_reason| { e.notify_all(); }); - if let Some(handler) = server_handler { - let e = exit.clone(); - handler.on_panic(move |_reason| { e.notify_all(); }); - } + // Handle panics + let e = exit.clone(); + panic_handler.on_panic(move |_reason| { e.notify_all(); }); // Wait for signal let mutex = Mutex::new(()); From 7ff4d145448487685be000f572f790c3bcef5ae9 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Wed, 9 Mar 2016 19:27:44 +0400 Subject: [PATCH 19/22] adding return to if branch --- parity/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/parity/main.rs b/parity/main.rs index 1a2847439..9a45980ef 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -365,6 +365,7 @@ impl Configuration { println!("New account address:"); let new_address = secret_store.new_account(&password).unwrap(); println!("{:?}", new_address); + return; } if command == "list" { println!("Known addresses:"); From 423dd7e0a967a808b769304a245f4316e5a2aa5b Mon Sep 17 00:00:00 2001 From: debris Date: Wed, 9 Mar 2016 18:04:13 +0100 Subject: [PATCH 20/22] updated jsonrpc-core and http-server libs --- Cargo.lock | 10 +++++----- parity/main.rs | 4 ++-- rpc/Cargo.toml | 4 ++-- rpc/src/lib.rs | 22 ++++++++++------------ 4 files changed, 19 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 55ed996ed..15845c806 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -228,8 +228,8 @@ dependencies = [ "ethcore 0.9.99", "ethcore-util 0.9.99", "ethsync 0.9.99", - "jsonrpc-core 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-http-server 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "jsonrpc-core 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "jsonrpc-http-server 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", "serde 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -400,7 +400,7 @@ dependencies = [ [[package]] name = "jsonrpc-core" -version = "1.2.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "serde 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -411,11 +411,11 @@ dependencies = [ 
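[Editor's note] Taken together, patches 16 and 18 funnel every subsystem's panics into a single handler that the exit path watches, instead of registering per-subsystem callbacks. A condensed sketch of the wiring in execute_client, using the names from main.rs above:

    let panic_handler = PanicHandler::new_in_arc();
    panic_handler.forward_from(&service);               // client service panics
    if let Some(handler) = server_handler {
        panic_handler.forward_from(handler.deref());    // jsonrpc server panics
    }
    wait_for_exit(panic_handler);                       // one wait point for all of them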
[[package]] name = "jsonrpc-http-server" -version = "2.1.0" +version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "hyper 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "jsonrpc-core 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "unicase 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/parity/main.rs b/parity/main.rs index adc3972e4..f28ef84c3 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -193,7 +193,7 @@ fn setup_log(init: &Option) { fn setup_rpc_server(client: Arc, sync: Arc, url: &str, cors_domain: &str, apis: Vec<&str>) -> Option> { use rpc::v1::*; - let mut server = rpc::HttpServer::new(1); + let server = rpc::RpcServer::new(); for api in apis.into_iter() { match api { "web3" => server.add_delegate(Web3Client::new().to_delegate()), @@ -207,7 +207,7 @@ fn setup_rpc_server(client: Arc, sync: Arc, url: &str, cors_dom } } } - Some(server.start_async(url, cors_domain)) + Some(server.start_http(url, cors_domain, 1)) } #[cfg(not(feature = "rpc"))] diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index bfdf8f2d3..f324aba10 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -12,8 +12,8 @@ build = "build.rs" log = "0.3" serde = "0.7.0" serde_json = "0.7.0" -jsonrpc-core = "1.2" -jsonrpc-http-server = "2.1" +jsonrpc-core = "2.0" +jsonrpc-http-server = "3.0" ethcore-util = { path = "../util" } ethcore = { path = "../ethcore" } ethash = { path = "../ethash" } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 97a3a5fe5..731ded8c4 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -37,35 +37,33 @@ use self::jsonrpc_core::{IoHandler, IoDelegate}; pub mod v1; /// Http server. -pub struct HttpServer { - handler: IoHandler, - threads: usize, +pub struct RpcServer { + handler: Arc, } -impl HttpServer { +impl RpcServer { /// Construct new http server object with given number of threads. - pub fn new(threads: usize) -> HttpServer { - HttpServer { - handler: IoHandler::new(), - threads: threads, + pub fn new() -> RpcServer { + RpcServer { + handler: Arc::new(IoHandler::new()), } } /// Add io delegate. - pub fn add_delegate(&mut self, delegate: IoDelegate) where D: Send + Sync + 'static { + pub fn add_delegate(&self, delegate: IoDelegate) where D: Send + Sync + 'static { self.handler.add_delegate(delegate); } /// Start server asynchronously in new thread and returns panic handler. 
- pub fn start_async(self, addr: &str, cors_domain: &str) -> Arc { + pub fn start_http(&self, addr: &str, cors_domain: &str, threads: usize) -> Arc { let addr = addr.to_owned(); let cors_domain = cors_domain.to_owned(); let panic_handler = PanicHandler::new_in_arc(); let ph = panic_handler.clone(); - let server = jsonrpc_http_server::Server::new(self.handler, self.threads); + let server = jsonrpc_http_server::Server::new(self.handler.clone()); thread::Builder::new().name("jsonrpc_http".to_string()).spawn(move || { ph.catch_panic(move || { - server.start(addr.as_ref(), jsonrpc_http_server::AccessControlAllowOrigin::Value(cors_domain)); + server.start(addr.as_ref(), jsonrpc_http_server::AccessControlAllowOrigin::Value(cors_domain), threads); }).unwrap() }).expect("Error while creating jsonrpc http thread"); panic_handler From 8a83e27d6a8f2f298e6b0dc44a60f9190e8e6c2a Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Wed, 9 Mar 2016 22:55:41 +0400 Subject: [PATCH 21/22] cfg-test for noop verifier --- ethcore/src/verification/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ethcore/src/verification/mod.rs b/ethcore/src/verification/mod.rs index 260121989..fe1f406cc 100644 --- a/ethcore/src/verification/mod.rs +++ b/ethcore/src/verification/mod.rs @@ -17,9 +17,11 @@ pub mod verification; pub mod verifier; mod canon_verifier; +#[cfg(test)] mod noop_verifier; pub use self::verification::*; pub use self::verifier::Verifier; pub use self::canon_verifier::CanonVerifier; +#[cfg(test)] pub use self::noop_verifier::NoopVerifier; From accc1db43fc46e3bd2ab425c778dcfaa843bdec8 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Wed, 9 Mar 2016 23:39:36 +0400 Subject: [PATCH 22/22] chaning docopt config a bit --- parity/main.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/parity/main.rs b/parity/main.rs index 9a45980ef..92400728d 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -71,7 +71,7 @@ Parity. Ethereum Client. Usage: parity daemon [options] [ --no-bootstrap | ... ] - parity account + parity account (new | list) parity [options] [ --no-bootstrap | ... ] Protocol Options: @@ -129,9 +129,10 @@ Miscellaneous Options: struct Args { cmd_daemon: bool, cmd_account: bool, + cmd_new: bool, + cmd_list: bool, arg_pid_file: String, arg_enode: Vec, - arg_command: String, flag_chain: String, flag_testnet: bool, flag_datadir: String, @@ -342,17 +343,17 @@ impl Configuration { .unwrap_or_else(|e| die!("Couldn't daemonize; {}", e)); } if self.args.cmd_account { - self.execute_account_cli(&self.args.arg_command); + self.execute_account_cli(); return; } self.execute_client(); } - fn execute_account_cli(&self, command: &str) { + fn execute_account_cli(&self) { use util::keys::store::SecretStore; use rpassword::read_password; let mut secret_store = SecretStore::new(); - if command == "new" { + if self.args.cmd_new { println!("Please note that password is NOT RECOVERABLE."); println!("Type password: "); let password = read_password().unwrap(); @@ -367,7 +368,7 @@ impl Configuration { println!("{:?}", new_address); return; } - if command == "list" { + if self.args.cmd_list { println!("Known addresses:"); for &(addr, _) in secret_store.accounts().unwrap().iter() { println!("{:?}", addr);
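[Editor's note] With the usage line changed to `parity account (new | list)`, docopt decodes each command word into a boolean field, so execute_account_cli above needs no string matching. The Args shape docopt expects (field names must mirror the usage tokens; unrelated flags elided):

    #[derive(Debug, RustcDecodable)]
    struct Args {
        cmd_account: bool,
        cmd_new: bool,
        cmd_list: bool,
        // ... remaining flags as above
    }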