diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index 4523f4f16..8b032c0e1 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -38,6 +38,7 @@ use util::kvdb::Database; use util::trie::{TrieDB, TrieDBMut, Trie, TrieMut}; use util::sha3::SHA3_NULL_RLP; use rlp::{RlpStream, Stream, UntrustedRlp, View}; +use bloom_journal::Bloom; use self::account::Account; use self::block::AbridgedBlock; @@ -390,6 +391,7 @@ pub struct StateRebuilder { state_root: H256, code_map: HashMap, // maps code hashes to code itself. missing_code: HashMap>, // maps code hashes to lists of accounts missing that code. + bloom: Bloom, } impl StateRebuilder { @@ -400,6 +402,7 @@ impl StateRebuilder { state_root: SHA3_NULL_RLP, code_map: HashMap::new(), missing_code: HashMap::new(), + bloom: StateDB::load_bloom(&*db), } } @@ -462,9 +465,6 @@ impl StateRebuilder { let backing = self.db.backing().clone(); - // bloom has to be updated - let mut bloom = StateDB::load_bloom(&backing); - // batch trie writes { let mut account_trie = if self.state_root != SHA3_NULL_RLP { @@ -475,17 +475,17 @@ impl StateRebuilder { for (hash, thin_rlp) in pairs { if &thin_rlp[..] != &empty_rlp[..] { - bloom.set(&*hash); + self.bloom.set(&*hash); } try!(account_trie.insert(&hash, &thin_rlp)); } } - let bloom_journal = bloom.drain_journal(); + let bloom_journal = self.bloom.drain_journal(); let mut batch = backing.transaction(); try!(StateDB::commit_bloom(&mut batch, bloom_journal)); try!(self.db.inject(&mut batch)); - try!(backing.write(batch).map_err(::util::UtilError::SimpleString)); + backing.write_buffered(batch); trace!(target: "snapshot", "current state root: {:?}", self.state_root); Ok(()) } @@ -628,7 +628,7 @@ impl BlockRebuilder { } else { self.chain.insert_unordered_block(&mut batch, &block_bytes, receipts, None, is_best, false); } - self.db.write(batch).expect("Error writing to the DB"); + self.db.write_buffered(batch); self.chain.commit(); parent_hash = BlockView::new(&block_bytes).hash(); diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index 57782e6cd..b3aaa017e 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -74,6 +74,7 @@ struct Restoration { snappy_buffer: Bytes, final_state_root: H256, guard: Guard, + db: Arc, } struct RestorationParams<'a> { @@ -105,12 +106,13 @@ impl Restoration { manifest: manifest, state_chunks_left: state_chunks, block_chunks_left: block_chunks, - state: StateRebuilder::new(raw_db, params.pruning), + state: StateRebuilder::new(raw_db.clone(), params.pruning), blocks: blocks, writer: params.writer, snappy_buffer: Vec::new(), final_state_root: root, guard: params.guard, + db: raw_db, }) } @@ -467,39 +469,46 @@ impl Service { /// Feed a chunk of either kind. no-op if no restoration or status is wrong. fn feed_chunk(&self, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> { // TODO: be able to process block chunks and state chunks at same time? - let mut restoration = self.restoration.lock(); + let (result, db) = { + let mut restoration = self.restoration.lock(); - match self.status() { - RestorationStatus::Inactive | RestorationStatus::Failed => Ok(()), - RestorationStatus::Ongoing { .. } => { - let res = { - let rest = match *restoration { - Some(ref mut r) => r, - None => return Ok(()), - }; - - match is_state { - true => rest.feed_state(hash, chunk), - false => rest.feed_blocks(hash, chunk, &*self.engine), - }.map(|_| rest.is_done()) - }; - - match res { - Ok(is_done) => { - match is_state { - true => self.state_chunks.fetch_add(1, Ordering::SeqCst), - false => self.block_chunks.fetch_add(1, Ordering::SeqCst), + match self.status() { + RestorationStatus::Inactive | RestorationStatus::Failed => return Ok(()), + RestorationStatus::Ongoing { .. } => { + let (res, db) = { + let rest = match *restoration { + Some(ref mut r) => r, + None => return Ok(()), }; - match is_done { - true => self.finalize_restoration(&mut *restoration), - false => Ok(()) + (match is_state { + true => rest.feed_state(hash, chunk), + false => rest.feed_blocks(hash, chunk, &*self.engine), + }.map(|_| rest.is_done()), rest.db.clone()) + }; + + let res = match res { + Ok(is_done) => { + match is_state { + true => self.state_chunks.fetch_add(1, Ordering::SeqCst), + false => self.block_chunks.fetch_add(1, Ordering::SeqCst), + }; + + match is_done { + true => { + try!(db.flush().map_err(::util::UtilError::SimpleString)); + self.finalize_restoration(&mut *restoration) + }, + false => Ok(()) + } } - } - other => other.map(drop), + other => other.map(drop), + }; + (res, db) } } - } + }; + result.and_then(|_| db.flush().map_err(|e| ::util::UtilError::SimpleString(e).into())) } /// Feed a state chunk to be processed synchronously. @@ -549,8 +558,9 @@ impl SnapshotService for Service { } fn begin_restore(&self, manifest: ManifestData) { - self.io_channel.send(ClientIoMessage::BeginRestoration(manifest)) - .expect("snapshot service and io service are kept alive by client service; qed"); + if let Err(e) = self.io_channel.send(ClientIoMessage::BeginRestoration(manifest)) { + trace!("Error sending snapshot service message: {:?}", e); + } } fn abort_restore(&self) { @@ -559,13 +569,15 @@ impl SnapshotService for Service { } fn restore_state_chunk(&self, hash: H256, chunk: Bytes) { - self.io_channel.send(ClientIoMessage::FeedStateChunk(hash, chunk)) - .expect("snapshot service and io service are kept alive by client service; qed"); + if let Err(e) = self.io_channel.send(ClientIoMessage::FeedStateChunk(hash, chunk)) { + trace!("Error sending snapshot service message: {:?}", e); + } } fn restore_block_chunk(&self, hash: H256, chunk: Bytes) { - self.io_channel.send(ClientIoMessage::FeedBlockChunk(hash, chunk)) - .expect("snapshot service and io service are kept alive by client service; qed"); + if let Err(e) = self.io_channel.send(ClientIoMessage::FeedBlockChunk(hash, chunk)) { + trace!("Error sending snapshot service message: {:?}", e); + } } } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 850dff228..916e7424e 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1253,7 +1253,12 @@ impl ChainSync { } peer.asking = asking; peer.ask_time = time::precise_time_s(); - if let Err(e) = sync.send(peer_id, packet_id, packet) { + let result = if packet_id >= ETH_PACKET_COUNT { + sync.send_protocol(WARP_SYNC_PROTOCOL_ID, peer_id, packet_id, packet) + } else { + sync.send(peer_id, packet_id, packet) + }; + if let Err(e) = result { debug!(target:"sync", "Error sending request: {:?}", e); sync.disable_peer(peer_id); } @@ -1270,8 +1275,9 @@ impl ChainSync { /// Called when peer sends us new transactions fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - // accepting transactions once only fully synced - if !io.is_chain_queue_empty() { + // Accept transactions only when fully synced + if !io.is_chain_queue_empty() || self.state != SyncState::Idle || self.state != SyncState::NewBlocks { + trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id); return Ok(()); } if !self.peers.get(&peer_id).map_or(false, |p| p.can_sync()) { @@ -1570,7 +1576,7 @@ impl ChainSync { SNAPSHOT_MANIFEST_PACKET => self.on_snapshot_manifest(io, peer, &rlp), SNAPSHOT_DATA_PACKET => self.on_snapshot_data(io, peer, &rlp), _ => { - debug!(target: "sync", "Unknown packet {}", packet_id); + debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id); Ok(()) } }; diff --git a/sync/src/sync_io.rs b/sync/src/sync_io.rs index 25d235c60..c78074aed 100644 --- a/sync/src/sync_io.rs +++ b/sync/src/sync_io.rs @@ -34,6 +34,8 @@ pub trait SyncIo { fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), NetworkError>; /// Send a packet to a peer. fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), NetworkError>; + /// Send a packet to a peer using specified protocol. + fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), NetworkError>; /// Get the blockchain fn chain(&self) -> &BlockChainClient; /// Get the snapshot service. @@ -98,6 +100,10 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> { self.network.send(peer_id, packet_id, data) } + fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), NetworkError>{ + self.network.send_protocol(protocol, peer_id, packet_id, data) + } + fn chain(&self) -> &BlockChainClient { self.chain } diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 801db234d..202ab4f17 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -78,6 +78,10 @@ impl<'p> SyncIo for TestIo<'p> { Ok(()) } + fn send_protocol(&mut self, _protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), NetworkError> { + self.send(peer_id, packet_id, data) + } + fn chain(&self) -> &BlockChainClient { self.chain } diff --git a/util/network/src/host.rs b/util/network/src/host.rs index 866534397..177a44843 100644 --- a/util/network/src/host.rs +++ b/util/network/src/host.rs @@ -241,9 +241,14 @@ impl<'s> NetworkContext<'s> { /// Send a packet over the network to another peer. pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), NetworkError> { + self.send_protocol(self.protocol, peer, packet_id, data) + } + + /// Send a packet over the network to another peer using specified protocol. + pub fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), NetworkError> { let session = self.resolve_session(peer); if let Some(session) = session { - try!(session.lock().send_packet(self.io, self.protocol, packet_id as u8, &data)); + try!(session.lock().send_packet(self.io, protocol, packet_id as u8, &data)); } else { trace!(target: "network", "Send: Peer no longer exist") } @@ -911,7 +916,7 @@ impl Host { } } - fn update_nodes(&self, io: &IoContext, node_changes: TableUpdates) { + fn update_nodes(&self, _io: &IoContext, node_changes: TableUpdates) { let mut to_remove: Vec = Vec::new(); { let sessions = self.sessions.write(); @@ -926,7 +931,6 @@ impl Host { } for i in to_remove { trace!(target: "network", "Removed from node table: {}", i); - self.kill_connection(i, io, false); } self.nodes.write().update(node_changes, &*self.reserved_nodes.read()); } diff --git a/util/network/src/session.rs b/util/network/src/session.rs index 1791a441d..8d5578e83 100644 --- a/util/network/src/session.rs +++ b/util/network/src/session.rs @@ -395,7 +395,7 @@ impl Session { PACKET_PEERS => Ok(SessionData::None), PACKET_USER ... PACKET_LAST => { let mut i = 0usize; - while packet_id < self.info.capabilities[i].id_offset { + while packet_id > self.info.capabilities[i].id_offset + self.info.capabilities[i].packet_count { i += 1; if i == self.info.capabilities.len() { debug!(target: "network", "Unknown packet: {:?}", packet_id); @@ -469,7 +469,7 @@ impl Session { offset += caps[i].packet_count; i += 1; } - trace!(target: "network", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps); + debug!(target: "network", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps); self.info.protocol_version = protocol; self.info.client_version = client_version; self.info.capabilities = caps; diff --git a/util/src/journaldb/overlayrecentdb.rs b/util/src/journaldb/overlayrecentdb.rs index bf01567fb..e58fe2522 100644 --- a/util/src/journaldb/overlayrecentdb.rs +++ b/util/src/journaldb/overlayrecentdb.rs @@ -348,13 +348,13 @@ impl JournalDB for OverlayRecentDB { match rc { 0 => {} 1 => { - if try!(self.backing.get(self.column, &key)).is_some() { + if cfg!(debug_assertions) && try!(self.backing.get(self.column, &key)).is_some() { return Err(BaseDataError::AlreadyExists(key).into()); } batch.put(self.column, &key, &value) } -1 => { - if try!(self.backing.get(self.column, &key)).is_none() { + if cfg!(debug_assertions) && try!(self.backing.get(self.column, &key)).is_none() { return Err(BaseDataError::NegativelyReferencedHash(key).into()); } batch.delete(self.column, &key)