Snapshot fixes and optimizations (#2863)

This commit is contained in:
Arkadiy Paronyan 2016-10-25 18:40:01 +02:00 committed by GitHub
parent 2d2e9c4d6e
commit 135d5d0e4c
8 changed files with 84 additions and 52 deletions

View File

@ -38,6 +38,7 @@ use util::kvdb::Database;
use util::trie::{TrieDB, TrieDBMut, Trie, TrieMut}; use util::trie::{TrieDB, TrieDBMut, Trie, TrieMut};
use util::sha3::SHA3_NULL_RLP; use util::sha3::SHA3_NULL_RLP;
use rlp::{RlpStream, Stream, UntrustedRlp, View}; use rlp::{RlpStream, Stream, UntrustedRlp, View};
use bloom_journal::Bloom;
use self::account::Account; use self::account::Account;
use self::block::AbridgedBlock; use self::block::AbridgedBlock;
@ -390,6 +391,7 @@ pub struct StateRebuilder {
state_root: H256, state_root: H256,
code_map: HashMap<H256, Bytes>, // maps code hashes to code itself. code_map: HashMap<H256, Bytes>, // maps code hashes to code itself.
missing_code: HashMap<H256, Vec<H256>>, // maps code hashes to lists of accounts missing that code. missing_code: HashMap<H256, Vec<H256>>, // maps code hashes to lists of accounts missing that code.
bloom: Bloom,
} }
impl StateRebuilder { impl StateRebuilder {
@ -400,6 +402,7 @@ impl StateRebuilder {
state_root: SHA3_NULL_RLP, state_root: SHA3_NULL_RLP,
code_map: HashMap::new(), code_map: HashMap::new(),
missing_code: HashMap::new(), missing_code: HashMap::new(),
bloom: StateDB::load_bloom(&*db),
} }
} }
@ -462,9 +465,6 @@ impl StateRebuilder {
let backing = self.db.backing().clone(); let backing = self.db.backing().clone();
// bloom has to be updated
let mut bloom = StateDB::load_bloom(&backing);
// batch trie writes // batch trie writes
{ {
let mut account_trie = if self.state_root != SHA3_NULL_RLP { let mut account_trie = if self.state_root != SHA3_NULL_RLP {
@ -475,17 +475,17 @@ impl StateRebuilder {
for (hash, thin_rlp) in pairs { for (hash, thin_rlp) in pairs {
if &thin_rlp[..] != &empty_rlp[..] { if &thin_rlp[..] != &empty_rlp[..] {
bloom.set(&*hash); self.bloom.set(&*hash);
} }
try!(account_trie.insert(&hash, &thin_rlp)); try!(account_trie.insert(&hash, &thin_rlp));
} }
} }
let bloom_journal = bloom.drain_journal(); let bloom_journal = self.bloom.drain_journal();
let mut batch = backing.transaction(); let mut batch = backing.transaction();
try!(StateDB::commit_bloom(&mut batch, bloom_journal)); try!(StateDB::commit_bloom(&mut batch, bloom_journal));
try!(self.db.inject(&mut batch)); try!(self.db.inject(&mut batch));
try!(backing.write(batch).map_err(::util::UtilError::SimpleString)); backing.write_buffered(batch);
trace!(target: "snapshot", "current state root: {:?}", self.state_root); trace!(target: "snapshot", "current state root: {:?}", self.state_root);
Ok(()) Ok(())
} }
@ -628,7 +628,7 @@ impl BlockRebuilder {
} else { } else {
self.chain.insert_unordered_block(&mut batch, &block_bytes, receipts, None, is_best, false); self.chain.insert_unordered_block(&mut batch, &block_bytes, receipts, None, is_best, false);
} }
self.db.write(batch).expect("Error writing to the DB"); self.db.write_buffered(batch);
self.chain.commit(); self.chain.commit();
parent_hash = BlockView::new(&block_bytes).hash(); parent_hash = BlockView::new(&block_bytes).hash();

View File

@ -74,6 +74,7 @@ struct Restoration {
snappy_buffer: Bytes, snappy_buffer: Bytes,
final_state_root: H256, final_state_root: H256,
guard: Guard, guard: Guard,
db: Arc<Database>,
} }
struct RestorationParams<'a> { struct RestorationParams<'a> {
@ -105,12 +106,13 @@ impl Restoration {
manifest: manifest, manifest: manifest,
state_chunks_left: state_chunks, state_chunks_left: state_chunks,
block_chunks_left: block_chunks, block_chunks_left: block_chunks,
state: StateRebuilder::new(raw_db, params.pruning), state: StateRebuilder::new(raw_db.clone(), params.pruning),
blocks: blocks, blocks: blocks,
writer: params.writer, writer: params.writer,
snappy_buffer: Vec::new(), snappy_buffer: Vec::new(),
final_state_root: root, final_state_root: root,
guard: params.guard, guard: params.guard,
db: raw_db,
}) })
} }
@ -467,24 +469,25 @@ impl Service {
/// Feed a chunk of either kind. no-op if no restoration or status is wrong. /// Feed a chunk of either kind. no-op if no restoration or status is wrong.
fn feed_chunk(&self, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> { fn feed_chunk(&self, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> {
// TODO: be able to process block chunks and state chunks at same time? // TODO: be able to process block chunks and state chunks at same time?
let (result, db) = {
let mut restoration = self.restoration.lock(); let mut restoration = self.restoration.lock();
match self.status() { match self.status() {
RestorationStatus::Inactive | RestorationStatus::Failed => Ok(()), RestorationStatus::Inactive | RestorationStatus::Failed => return Ok(()),
RestorationStatus::Ongoing { .. } => { RestorationStatus::Ongoing { .. } => {
let res = { let (res, db) = {
let rest = match *restoration { let rest = match *restoration {
Some(ref mut r) => r, Some(ref mut r) => r,
None => return Ok(()), None => return Ok(()),
}; };
match is_state { (match is_state {
true => rest.feed_state(hash, chunk), true => rest.feed_state(hash, chunk),
false => rest.feed_blocks(hash, chunk, &*self.engine), false => rest.feed_blocks(hash, chunk, &*self.engine),
}.map(|_| rest.is_done()) }.map(|_| rest.is_done()), rest.db.clone())
}; };
match res { let res = match res {
Ok(is_done) => { Ok(is_done) => {
match is_state { match is_state {
true => self.state_chunks.fetch_add(1, Ordering::SeqCst), true => self.state_chunks.fetch_add(1, Ordering::SeqCst),
@ -492,14 +495,20 @@ impl Service {
}; };
match is_done { match is_done {
true => self.finalize_restoration(&mut *restoration), true => {
try!(db.flush().map_err(::util::UtilError::SimpleString));
self.finalize_restoration(&mut *restoration)
},
false => Ok(()) false => Ok(())
} }
} }
other => other.map(drop), other => other.map(drop),
};
(res, db)
} }
} }
} };
result.and_then(|_| db.flush().map_err(|e| ::util::UtilError::SimpleString(e).into()))
} }
/// Feed a state chunk to be processed synchronously. /// Feed a state chunk to be processed synchronously.
@ -549,8 +558,9 @@ impl SnapshotService for Service {
} }
fn begin_restore(&self, manifest: ManifestData) { fn begin_restore(&self, manifest: ManifestData) {
self.io_channel.send(ClientIoMessage::BeginRestoration(manifest)) if let Err(e) = self.io_channel.send(ClientIoMessage::BeginRestoration(manifest)) {
.expect("snapshot service and io service are kept alive by client service; qed"); trace!("Error sending snapshot service message: {:?}", e);
}
} }
fn abort_restore(&self) { fn abort_restore(&self) {
@ -559,13 +569,15 @@ impl SnapshotService for Service {
} }
fn restore_state_chunk(&self, hash: H256, chunk: Bytes) { fn restore_state_chunk(&self, hash: H256, chunk: Bytes) {
self.io_channel.send(ClientIoMessage::FeedStateChunk(hash, chunk)) if let Err(e) = self.io_channel.send(ClientIoMessage::FeedStateChunk(hash, chunk)) {
.expect("snapshot service and io service are kept alive by client service; qed"); trace!("Error sending snapshot service message: {:?}", e);
}
} }
fn restore_block_chunk(&self, hash: H256, chunk: Bytes) { fn restore_block_chunk(&self, hash: H256, chunk: Bytes) {
self.io_channel.send(ClientIoMessage::FeedBlockChunk(hash, chunk)) if let Err(e) = self.io_channel.send(ClientIoMessage::FeedBlockChunk(hash, chunk)) {
.expect("snapshot service and io service are kept alive by client service; qed"); trace!("Error sending snapshot service message: {:?}", e);
}
} }
} }

View File

@ -1253,7 +1253,12 @@ impl ChainSync {
} }
peer.asking = asking; peer.asking = asking;
peer.ask_time = time::precise_time_s(); peer.ask_time = time::precise_time_s();
if let Err(e) = sync.send(peer_id, packet_id, packet) { let result = if packet_id >= ETH_PACKET_COUNT {
sync.send_protocol(WARP_SYNC_PROTOCOL_ID, peer_id, packet_id, packet)
} else {
sync.send(peer_id, packet_id, packet)
};
if let Err(e) = result {
debug!(target:"sync", "Error sending request: {:?}", e); debug!(target:"sync", "Error sending request: {:?}", e);
sync.disable_peer(peer_id); sync.disable_peer(peer_id);
} }
@ -1270,8 +1275,9 @@ impl ChainSync {
/// Called when peer sends us new transactions /// Called when peer sends us new transactions
fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
// accepting transactions once only fully synced // Accept transactions only when fully synced
if !io.is_chain_queue_empty() { if !io.is_chain_queue_empty() || self.state != SyncState::Idle || self.state != SyncState::NewBlocks {
trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id);
return Ok(()); return Ok(());
} }
if !self.peers.get(&peer_id).map_or(false, |p| p.can_sync()) { if !self.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
@ -1570,7 +1576,7 @@ impl ChainSync {
SNAPSHOT_MANIFEST_PACKET => self.on_snapshot_manifest(io, peer, &rlp), SNAPSHOT_MANIFEST_PACKET => self.on_snapshot_manifest(io, peer, &rlp),
SNAPSHOT_DATA_PACKET => self.on_snapshot_data(io, peer, &rlp), SNAPSHOT_DATA_PACKET => self.on_snapshot_data(io, peer, &rlp),
_ => { _ => {
debug!(target: "sync", "Unknown packet {}", packet_id); debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id);
Ok(()) Ok(())
} }
}; };

View File

@ -34,6 +34,8 @@ pub trait SyncIo {
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>; fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>;
/// Send a packet to a peer. /// Send a packet to a peer.
fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>; fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>;
/// Send a packet to a peer using specified protocol.
fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>;
/// Get the blockchain /// Get the blockchain
fn chain(&self) -> &BlockChainClient; fn chain(&self) -> &BlockChainClient;
/// Get the snapshot service. /// Get the snapshot service.
@ -98,6 +100,10 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
self.network.send(peer_id, packet_id, data) self.network.send(peer_id, packet_id, data)
} }
fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>{
self.network.send_protocol(protocol, peer_id, packet_id, data)
}
fn chain(&self) -> &BlockChainClient { fn chain(&self) -> &BlockChainClient {
self.chain self.chain
} }

View File

@ -78,6 +78,10 @@ impl<'p> SyncIo for TestIo<'p> {
Ok(()) Ok(())
} }
fn send_protocol(&mut self, _protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> {
self.send(peer_id, packet_id, data)
}
fn chain(&self) -> &BlockChainClient { fn chain(&self) -> &BlockChainClient {
self.chain self.chain
} }

View File

@ -241,9 +241,14 @@ impl<'s> NetworkContext<'s> {
/// Send a packet over the network to another peer. /// Send a packet over the network to another peer.
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> { pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> {
self.send_protocol(self.protocol, peer, packet_id, data)
}
/// Send a packet over the network to another peer using specified protocol.
pub fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> {
let session = self.resolve_session(peer); let session = self.resolve_session(peer);
if let Some(session) = session { if let Some(session) = session {
try!(session.lock().send_packet(self.io, self.protocol, packet_id as u8, &data)); try!(session.lock().send_packet(self.io, protocol, packet_id as u8, &data));
} else { } else {
trace!(target: "network", "Send: Peer no longer exist") trace!(target: "network", "Send: Peer no longer exist")
} }
@ -911,7 +916,7 @@ impl Host {
} }
} }
fn update_nodes(&self, io: &IoContext<NetworkIoMessage>, node_changes: TableUpdates) { fn update_nodes(&self, _io: &IoContext<NetworkIoMessage>, node_changes: TableUpdates) {
let mut to_remove: Vec<PeerId> = Vec::new(); let mut to_remove: Vec<PeerId> = Vec::new();
{ {
let sessions = self.sessions.write(); let sessions = self.sessions.write();
@ -926,7 +931,6 @@ impl Host {
} }
for i in to_remove { for i in to_remove {
trace!(target: "network", "Removed from node table: {}", i); trace!(target: "network", "Removed from node table: {}", i);
self.kill_connection(i, io, false);
} }
self.nodes.write().update(node_changes, &*self.reserved_nodes.read()); self.nodes.write().update(node_changes, &*self.reserved_nodes.read());
} }

View File

@ -395,7 +395,7 @@ impl Session {
PACKET_PEERS => Ok(SessionData::None), PACKET_PEERS => Ok(SessionData::None),
PACKET_USER ... PACKET_LAST => { PACKET_USER ... PACKET_LAST => {
let mut i = 0usize; let mut i = 0usize;
while packet_id < self.info.capabilities[i].id_offset { while packet_id > self.info.capabilities[i].id_offset + self.info.capabilities[i].packet_count {
i += 1; i += 1;
if i == self.info.capabilities.len() { if i == self.info.capabilities.len() {
debug!(target: "network", "Unknown packet: {:?}", packet_id); debug!(target: "network", "Unknown packet: {:?}", packet_id);
@ -469,7 +469,7 @@ impl Session {
offset += caps[i].packet_count; offset += caps[i].packet_count;
i += 1; i += 1;
} }
trace!(target: "network", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps); debug!(target: "network", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps);
self.info.protocol_version = protocol; self.info.protocol_version = protocol;
self.info.client_version = client_version; self.info.client_version = client_version;
self.info.capabilities = caps; self.info.capabilities = caps;

View File

@ -348,13 +348,13 @@ impl JournalDB for OverlayRecentDB {
match rc { match rc {
0 => {} 0 => {}
1 => { 1 => {
if try!(self.backing.get(self.column, &key)).is_some() { if cfg!(debug_assertions) && try!(self.backing.get(self.column, &key)).is_some() {
return Err(BaseDataError::AlreadyExists(key).into()); return Err(BaseDataError::AlreadyExists(key).into());
} }
batch.put(self.column, &key, &value) batch.put(self.column, &key, &value)
} }
-1 => { -1 => {
if try!(self.backing.get(self.column, &key)).is_none() { if cfg!(debug_assertions) && try!(self.backing.get(self.column, &key)).is_none() {
return Err(BaseDataError::NegativelyReferencedHash(key).into()); return Err(BaseDataError::NegativelyReferencedHash(key).into());
} }
batch.delete(self.column, &key) batch.delete(self.column, &key)