From 487dfb02083a92abccd05e6f4e97eb1ea956e60e Mon Sep 17 00:00:00 2001
From: Arkadiy Paronyan
Date: Tue, 18 Oct 2016 18:16:00 +0200
Subject: [PATCH] Snapshot sync part 2 (#2098)

* Split block downloader into a module

* Snapshot sync progress

* Warp sync CLI option

* Increased snapshot chunk and ping timeouts

* Fixed an issue with delayed writes

* Updated bootnodes

* Don't run pending IO tasks on shutdown

* Optional first_block; removed insert_snapshot_block

* Fixing expect calls

* Fixed stalled sync

* style and docs

* Update block_sync.rs [ci:skip]
---
 ethcore/res/ethereum/frontier.json        |   3 +-
 ethcore/src/blockchain/best_block.rs      |   9 +
 ethcore/src/blockchain/blockchain.rs      | 152 +++--
 ethcore/src/client/client.rs              |  71 +-
 ethcore/src/client/test_client.rs         |   8 +
 ethcore/src/client/traits.rs              |   3 +
 ethcore/src/snapshot/mod.rs               |  10 +-
 ethcore/src/snapshot/service.rs           |  11 +-
 ethcore/src/snapshot/tests/blocks.rs      |   2 +-
 ethcore/src/types/blockchain_info.rs      |  10 +-
 ethcore/src/types/restoration_status.rs   |   4 +
 ethcore/src/verification/verification.rs  |   6 +-
 parity/blockchain.rs                      |   2 +-
 parity/cli/config.full.toml               |   1 +
 parity/cli/config.toml                    |   1 +
 parity/cli/mod.rs                         |   5 +
 parity/cli/usage.txt                      |   1 +
 parity/configuration.rs                   |   3 +
 parity/informant.rs                       |  54 +-
 parity/io_handler.rs                      |   2 +-
 parity/run.rs                             |  12 +-
 parity/snapshot.rs                        |   2 +-
 rpc/src/v1/impls/eth.rs                   |   2 +-
 rpc/src/v1/tests/helpers/sync_provider.rs |   1 +
 sync/src/api.rs                           |  33 +-
 sync/src/block_sync.rs                    | 477 ++++++++++++++
 sync/src/blocks.rs                        | 188 +++++-
 sync/src/chain.rs                         | 750 +++++++++++-----------
 sync/src/lib.rs                           |   1 +
 sync/src/sync_io.rs                       |  17 +-
 sync/src/tests/helpers.rs                 |   7 +
 sync/src/tests/snapshot.rs                |   6 +-
 util/io/src/worker.rs                     |  10 +-
 util/network/src/connection.rs            |   7 +-
 util/network/src/session.rs               |   2 +-
 util/src/standard.rs                      |   2 +-
 36 files changed, 1347 insertions(+), 528 deletions(-)
 create mode 100644 sync/src/block_sync.rs

diff --git a/ethcore/res/ethereum/frontier.json b/ethcore/res/ethereum/frontier.json
index 8e92e49b3..354067a13 100644
--- a/ethcore/res/ethereum/frontier.json
+++ b/ethcore/res/ethereum/frontier.json
@@ -164,7 +164,8 @@
 		"enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303",
 		"enode://de471bccee3d042261d52e9bff31458daecc406142b401d4cd848f677479f73104b9fdeb090af9583d3391b7f10cb2ba9e26865dd5fca4fcdc0fb1e3b723c786@54.94.239.50:30303",
 		"enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303",
-		"enode://248f12bc8b18d5289358085520ac78cd8076485211e6d96ab0bc93d6cd25442db0ce3a937dc404f64f207b0b9aed50e25e98ce32af5ac7cb321ff285b97de485@zero.parity.io:30303"
+		"enode://4cd540b2c3292e17cff39922e864094bf8b0741fcc8c5dcea14957e389d7944c70278d872902e3d0345927f621547efa659013c400865485ab4bfa0c6596936f@zero.parity.io:30303",
+		"enode://cc92c4c40d612a10c877ca023ef0496c843fbc92b6c6c0d55ce0b863d51d821c4bd70daebb54324a6086374e6dc05708fed39862b275f169cb678e655da9d07d@136.243.154.246:30303"
 	],
 	"accounts": {
 		"0000000000000000000000000000000000000001": { "builtin": { "name": "ecrecover", "pricing": { "linear": { "base": 3000, "word": 0 } } } },
diff --git a/ethcore/src/blockchain/best_block.rs b/ethcore/src/blockchain/best_block.rs
index 0cea6190c..d5a6c06b2 100644
--- a/ethcore/src/blockchain/best_block.rs
+++ b/ethcore/src/blockchain/best_block.rs
@@ -29,3 +29,12 @@ pub struct BestBlock {
 	/// Best block uncompressed bytes
 	pub block: Bytes,
 }
+
+/// Best ancient block info. If the blockchain has a gap this keeps track of where it starts.
+#[derive(Default)]
+pub struct BestAncientBlock {
+	/// Best block hash.
+	pub hash: H256,
+	/// Best block number.
+	pub number: BlockNumber,
+}
diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs
index 51be65043..67d9cd894 100644
--- a/ethcore/src/blockchain/blockchain.rs
+++ b/ethcore/src/blockchain/blockchain.rs
@@ -27,7 +27,8 @@ use log_entry::{LogEntry, LocalizedLogEntry};
 use receipt::Receipt;
 use blooms::{Bloom, BloomGroup};
 use blockchain::block_info::{BlockInfo, BlockLocation, BranchBecomingCanonChainData};
-use blockchain::best_block::BestBlock;
+use blockchain::best_block::{BestBlock, BestAncientBlock};
+use types::blockchain_info::BlockChainInfo;
 use types::tree_route::TreeRoute;
 use blockchain::update::ExtrasUpdate;
 use blockchain::{CacheSize, ImportRoute, Config};
@@ -43,16 +44,24 @@ pub trait BlockProvider {
 	/// (though not necessarily a part of the canon chain).
 	fn is_known(&self, hash: &H256) -> bool;

-	/// Get the first block which this chain holds.
+	/// Get the first block of the best part of the chain.
+	/// Return `None` if there is no gap and the first block is the genesis.
 	/// Any queries of blocks which precede this one are not guaranteed to
 	/// succeed.
-	fn first_block(&self) -> H256;
+	fn first_block(&self) -> Option<H256>;

 	/// Get the number of the first block.
-	fn first_block_number(&self) -> BlockNumber {
-		self.block_number(&self.first_block()).expect("First block always stored; qed")
+	fn first_block_number(&self) -> Option<BlockNumber> {
+		self.first_block().map(|b| self.block_number(&b).expect("First block is always set to an existing block or `None`. Existing block always has a number; qed"))
 	}

+	/// Get the best block of the first block sequence if there is a gap.
+	fn best_ancient_block(&self) -> Option<H256>;
+
+	/// Get the number of the best ancient block.
+	fn best_ancient_number(&self) -> Option<BlockNumber> {
+		self.best_ancient_block().map(|h| self.block_number(&h).expect("Ancient block is always set to an existing block or `None`. Existing block always has a number; qed"))
+	}
 	/// Get raw block data
 	fn block(&self, hash: &H256) -> Option<Bytes>;
@@ -160,9 +169,14 @@ impl bc::group::BloomGroupDatabase for BlockChain {
 pub struct BlockChain {
 	// All locks must be captured in the order declared here.
 	blooms_config: bc::Config,
-	first_block: H256,
 	best_block: RwLock<BestBlock>,
+	// Stores best block of the first uninterrupted sequence of blocks. `None` if there are no gaps.
+	// Only updated with `insert_unordered_block`.
+	best_ancient_block: RwLock<Option<BestAncientBlock>>,
+	// Stores the first block of the last (best) sequence of blocks. `None` if the first block is the genesis, i.e. there is no gap.
+	// This is calculated on start and does not get updated.
+	first_block: Option<H256>,

 	// block cache
 	block_headers: RwLock<HashMap<H256, Bytes>>,
@@ -191,8 +205,16 @@ impl BlockProvider for BlockChain {
 		self.db.exists_with_cache(db::COL_EXTRA, &self.block_details, hash)
 	}

-	fn first_block(&self) -> H256 {
-		self.first_block
+	fn first_block(&self) -> Option<H256> {
+		self.first_block.clone()
+	}
+
+	fn best_ancient_block(&self) -> Option<H256> {
+		self.best_ancient_block.read().as_ref().map(|b| b.hash.clone())
+	}
+
+	fn best_ancient_number(&self) -> Option<BlockNumber> {
+		self.best_ancient_block.read().as_ref().map(|b| b.number)
 	}

 	/// Get raw block data
@@ -400,8 +422,9 @@ impl BlockChain {
 				levels: LOG_BLOOMS_LEVELS,
 				elements_per_index: LOG_BLOOMS_ELEMENTS_PER_INDEX,
 			},
-			first_block: H256::zero(),
+			first_block: None,
 			best_block: RwLock::new(BestBlock::default()),
+			best_ancient_block: RwLock::new(None),
 			block_headers: RwLock::new(HashMap::new()),
 			block_bodies: RwLock::new(HashMap::new()),
 			block_details: RwLock::new(HashMap::new()),
@@ -443,7 +466,6 @@ impl BlockChain {
 			batch.write(db::COL_EXTRA, &header.number(), &hash);

 			batch.put(db::COL_EXTRA, b"best", &hash);
-			batch.put(db::COL_EXTRA, b"first", &hash);
 			bc.db.write(batch).expect("Low level database error. Some issue with disk?");
 			hash
 		}
@@ -455,32 +477,45 @@ impl BlockChain {
 			let best_block_total_difficulty = bc.block_details(&best_block_hash).unwrap().total_difficulty;
 			let best_block_rlp = bc.block(&best_block_hash).unwrap();

-			let raw_first = bc.db.get(db::COL_EXTRA, b"first").unwrap().map_or(Vec::new(), |v| v.to_vec());
+			let raw_first = bc.db.get(db::COL_EXTRA, b"first").unwrap().map(|v| v.to_vec());
+			let mut best_ancient = bc.db.get(db::COL_EXTRA, b"ancient").unwrap().map(|h| H256::from_slice(&h));
+			let best_ancient_number;
+			if best_ancient.is_none() && best_block_number > 1 && bc.block_hash(1).is_none() {
+				best_ancient = Some(bc.genesis_hash());
+				best_ancient_number = Some(0);
+			} else {
+				best_ancient_number = best_ancient.as_ref().and_then(|h| bc.block_number(h));
+			}

 			// binary search for the first block.
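			// Editor's illustration (not part of the original patch): blocks of the best sequence
			// occupy a contiguous range of numbers ending at the best block, so
			// `bc.block_hash(m).is_some()` is a monotone predicate and the bisection below finds
			// the lowest stored number. E.g. with blocks 900..=1000 present and bounds l = 0,
			// f = 1000, the loop converges to 900 in about ten probes.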
-			if raw_first.is_empty() {
-				let (mut f, mut hash) = (best_block_number, best_block_hash);
-				let mut l = 0;
+			match raw_first {
+				None => {
+					let (mut f, mut hash) = (best_block_number, best_block_hash);
+					let mut l = best_ancient_number.unwrap_or(0);

-				loop {
-					if l >= f { break; }
+					loop {
+						if l >= f { break; }

-					let step = (f - l) >> 1;
-					let m = l + step;
+						let step = (f - l) >> 1;
+						let m = l + step;

-					match bc.block_hash(m) {
-						Some(h) => { f = m; hash = h },
-						None => { l = m + 1 },
+						match bc.block_hash(m) {
+							Some(h) => { f = m; hash = h },
+							None => { l = m + 1 },
+						}
 					}
-				}

-				let mut batch = db.transaction();
-				batch.put(db::COL_EXTRA, b"first", &hash);
-				db.write(batch).expect("Low level database error.");
-
-				bc.first_block = hash;
-			} else {
-				bc.first_block = H256::from_slice(&raw_first);
+					if hash != bc.genesis_hash() {
+						trace!("First block calculated: {:?}", hash);
+						let mut batch = db.transaction();
+						batch.put(db::COL_EXTRA, b"first", &hash);
+						db.write(batch).expect("Low level database error.");
+						bc.first_block = Some(hash);
+					}
+				},
+				Some(raw_first) => {
+					bc.first_block = Some(H256::from_slice(&raw_first));
+				},
 			}

 			// and write them
@@ -491,6 +526,14 @@ impl BlockChain {
 				hash: best_block_hash,
 				block: best_block_rlp,
 			};
+
+			if let (Some(hash), Some(number)) = (best_ancient, best_ancient_number) {
+				let mut best_ancient_block = bc.best_ancient_block.write();
+				*best_ancient_block = Some(BestAncientBlock {
+					hash: hash,
+					number: number,
+				});
+			}
 		}

 		bc
@@ -644,11 +687,12 @@ impl BlockChain {
 	/// Inserts a verified, known block from the canonical chain.
 	///
 	/// Can be performed out-of-order, but care must be taken that the final chain is in a correct state.
-	/// This is used by snapshot restoration.
-	///
+	/// This is used by snapshot restoration and when downloading missing blocks to fill the chain gap.
+	/// `is_best` forces the best block to be updated to this block.
+	/// `is_ancient` forces the best block of the first block sequence to be updated to this block.
 	/// Supply a dummy parent total difficulty when the parent block may not be in the chain.
 	/// Returns true if the block is disconnected.
-	pub fn insert_snapshot_block(&self, bytes: &[u8], receipts: Vec<Receipt>, parent_td: Option<U256>, is_best: bool) -> bool {
+	pub fn insert_unordered_block(&self, batch: &mut DBTransaction, bytes: &[u8], receipts: Vec<Receipt>, parent_td: Option<U256>, is_best: bool, is_ancient: bool) -> bool {
 		let block = BlockView::new(bytes);
 		let header = block.header_view();
 		let hash = header.sha3();
@@ -659,8 +703,6 @@ impl BlockChain {

 		assert!(self.pending_best_block.read().is_none());

-		let mut batch = self.db.transaction();
-
 		let block_rlp = UntrustedRlp::new(bytes);
 		let compressed_header = block_rlp.at(0).unwrap().compress(RlpType::Blocks);
 		let compressed_body = UntrustedRlp::new(&Self::block_to_body(bytes)).compress(RlpType::Blocks);
@@ -674,13 +716,13 @@ impl BlockChain {
 		if let Some(parent_details) = maybe_parent {
 			// parent known to be in chain.
 			let info = BlockInfo {
-				hash: hash,
+				hash: hash.clone(),
 				number: header.number(),
 				total_difficulty: parent_details.total_difficulty + header.difficulty(),
 				location: BlockLocation::CanonChain,
 			};

-			self.prepare_update(&mut batch, ExtrasUpdate {
+			self.prepare_update(batch, ExtrasUpdate {
 				block_hashes: self.prepare_block_hashes_update(bytes, &info),
 				block_details: self.prepare_block_details_update(bytes, &info),
 				block_receipts: self.prepare_block_receipts_update(receipts, &info),
@@ -689,7 +731,21 @@ impl BlockChain {
 				info: info,
 				block: bytes
 			}, is_best);
-			self.db.write(batch).unwrap();
+
+			if is_ancient {
+				let mut best_ancient_block = self.best_ancient_block.write();
+				let ancient_number = best_ancient_block.as_ref().map_or(0, |b| b.number);
+				if self.block_hash(header.number() + 1).is_some() {
+					batch.delete(db::COL_EXTRA, b"ancient");
+					*best_ancient_block = None;
+				} else if header.number() > ancient_number {
+					batch.put(db::COL_EXTRA, b"ancient", &hash);
+					*best_ancient_block = Some(BestAncientBlock {
+						hash: hash,
+						number: header.number(),
+					});
+				}
+			}

 			false
 		} else {
@@ -714,7 +770,7 @@ impl BlockChain {
 			let mut update = HashMap::new();
 			update.insert(hash, block_details);

-			self.prepare_update(&mut batch, ExtrasUpdate {
+			self.prepare_update(batch, ExtrasUpdate {
 				block_hashes: self.prepare_block_hashes_update(bytes, &info),
 				block_details: update,
 				block_receipts: self.prepare_block_receipts_update(receipts, &info),
@@ -723,8 +779,6 @@ impl BlockChain {
 				info: info,
 				block: bytes,
 			}, is_best);
-			self.db.write(batch).unwrap();
-
 			true
 		}
 	}
@@ -1210,6 +1264,24 @@ impl BlockChain {
 		body.append_raw(block_rlp.at(2).as_raw(), 1);
 		body.out()
 	}
+
+	/// Returns general blockchain information
+	pub fn chain_info(&self) -> BlockChainInfo {
+		// ensure data consistency by locking everything first
+		let best_block = self.best_block.read();
+		let best_ancient_block = self.best_ancient_block.read();
+		BlockChainInfo {
+			total_difficulty: best_block.total_difficulty.clone(),
+			pending_total_difficulty: best_block.total_difficulty.clone(),
+			genesis_hash: self.genesis_hash(),
+			best_block_hash: best_block.hash.clone(),
+			best_block_number: best_block.number,
+			first_block_hash: self.first_block(),
+			first_block_number: From::from(self.first_block_number()),
+			ancient_block_hash: best_ancient_block.as_ref().map(|b| b.hash.clone()),
+			ancient_block_number: best_ancient_block.as_ref().map(|b| b.number),
+		}
+	}
 }

 #[cfg(test)]
diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs
index 277ba2db0..2b13b0570 100644
--- a/ethcore/src/client/client.rs
+++ b/ethcore/src/client/client.rs
@@ -30,7 +30,7 @@ use util::kvdb::*;

 // other
 use io::*;
-use views::{HeaderView, BodyView};
+use views::{HeaderView, BodyView, BlockView};
 use error::{ImportError, ExecutionError, CallError, BlockError, ImportResult, Error as EthcoreError};
 use header::BlockNumber;
 use state::State;
@@ -431,6 +431,29 @@ impl Client {
 		imported
 	}

+	/// Import a block with transaction receipts.
+	/// The block is guaranteed to be the next best block in the first block sequence.
+	/// Does no sealing or transaction validation.
+	fn import_old_block(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> H256 {
+		let block = BlockView::new(&block_bytes);
+		let hash = block.header().hash();
+		let _import_lock = self.import_lock.lock();
+		{
+			let _timer = PerfTimer::new("import_old_block");
+			let chain = self.chain.read();
+
+			// Commit results
+			let receipts = ::rlp::decode(&receipts_bytes);
+			let mut batch = DBTransaction::new(&self.db.read());
+			chain.insert_unordered_block(&mut batch, &block_bytes, receipts, None, false, true);
+			// Final commit to the DB
+			self.db.read().write_buffered(batch);
+			chain.commit();
+		}
+		self.db.read().flush().expect("DB flush failed.");
+		hash
+	}
+
 	fn commit_block<B>(&self, block: B, hash: &H256, block_data: &[u8]) -> ImportRoute where B: IsBlock + Drain {
 		let number = block.header().number();
 		let parent = block.header().parent_hash().clone();
@@ -998,6 +1021,20 @@ impl BlockChainClient for Client {
 		Ok(try!(self.block_queue.import(unverified)))
 	}

+	fn import_block_with_receipts(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result<H256, BlockImportError> {
+		{
+			// check block order
+			let header = BlockView::new(&block_bytes).header_view();
+			if self.chain.read().is_known(&header.hash()) {
+				return Err(BlockImportError::Import(ImportError::AlreadyInChain));
+			}
+			if self.block_status(BlockID::Hash(header.parent_hash())) == BlockStatus::Unknown {
+				return Err(BlockImportError::Block(BlockError::UnknownParent(header.parent_hash())));
+			}
+		}
+		Ok(self.import_old_block(block_bytes, receipts_bytes))
+	}
+
 	fn queue_info(&self) -> BlockQueueInfo {
 		self.block_queue.queue_info()
 	}
@@ -1007,14 +1044,7 @@
 	}

 	fn chain_info(&self) -> BlockChainInfo {
-		let chain = self.chain.read();
-		BlockChainInfo {
-			total_difficulty: chain.best_block_total_difficulty(),
-			pending_total_difficulty: chain.best_block_total_difficulty(),
-			genesis_hash: chain.genesis_hash(),
-			best_block_hash: chain.best_block_hash(),
-			best_block_number: From::from(chain.best_block_number())
-		}
+		self.chain.read().chain_info()
 	}

 	fn additional_params(&self) -> BTreeMap<String, String> {
@@ -1146,21 +1176,22 @@ impl MiningBlockChainClient for Client {
 	}

 	fn import_sealed_block(&self, block: SealedBlock) -> ImportResult {
-		let _import_lock = self.import_lock.lock();
-		let _timer = PerfTimer::new("import_sealed_block");
-		let start = precise_time_ns();
-		let h = block.header().hash();
-		let number = block.header().number();
-
-		let block_data = block.rlp_bytes();
-		let route = self.commit_block(block, &h, &block_data);
-		trace!(target: "client", "Imported sealed block #{} ({})", number, h);
-		self.state_db.lock().sync_cache(&route.enacted, &route.retracted, false);
+		let h = block.header().hash();
+		let start = precise_time_ns();
+		let route = {
+			// scope for self.import_lock
+			let _import_lock = self.import_lock.lock();
+			let _timer = PerfTimer::new("import_sealed_block");
+			let number = block.header().number();
+			let block_data = block.rlp_bytes();
+			let route = self.commit_block(block, &h, &block_data);
+			trace!(target: "client", "Imported sealed block #{} ({})", number, h);
+			self.state_db.lock().sync_cache(&route.enacted, &route.retracted, false);
+			route
+		};
 		let (enacted, retracted) = self.calculate_enacted_retracted(&[route]);
 		self.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted);
-
 		self.notify(|notify| {
 			notify.new_blocks(
 				vec![h.clone()],
diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs
index 90c31bed8..e32a074bb 100644
--- a/ethcore/src/client/test_client.rs
+++ b/ethcore/src/client/test_client.rs
@@ -560,6 +560,10 @@ impl BlockChainClient for TestBlockChainClient {
 		Ok(h)
 	}

+	fn import_block_with_receipts(&self, b: Bytes, _r: Bytes) -> Result<H256, BlockImportError> {
+		self.import_block(b)
+	}
+
 	fn queue_info(&self) -> QueueInfo {
 		QueueInfo {
 			verified_queue_size: self.queue_size.load(AtomicOrder::Relaxed),
@@ -585,6 +589,10 @@
 			genesis_hash: self.genesis_hash.clone(),
 			best_block_hash: self.last_hash.read().clone(),
 			best_block_number: self.blocks.read().len() as BlockNumber - 1,
+			first_block_hash: None,
+			first_block_number: None,
+			ancient_block_hash: None,
+			ancient_block_number: None,
 		}
 	}

diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs
index 5f7b62ee2..a8face23b 100644
--- a/ethcore/src/client/traits.rs
+++ b/ethcore/src/client/traits.rs
@@ -139,6 +139,9 @@ pub trait BlockChainClient : Sync + Send {
 	/// Import a block into the blockchain.
 	fn import_block(&self, bytes: Bytes) -> Result<H256, BlockImportError>;

+	/// Import a block with transaction receipts. Does no sealing or transaction validation.
+	fn import_block_with_receipts(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result<H256, BlockImportError>;
+
 	/// Get block queue information.
 	fn queue_info(&self) -> BlockQueueInfo;

diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs
index a5e6b58bd..4c4c2a6d2 100644
--- a/ethcore/src/snapshot/mod.rs
+++ b/ethcore/src/snapshot/mod.rs
@@ -564,6 +564,7 @@ const POW_VERIFY_RATE: f32 = 0.02;
 /// After all chunks have been submitted, we "glue" the chunks together.
 pub struct BlockRebuilder {
 	chain: BlockChain,
+	db: Arc<Database>,
 	rng: OsRng,
 	disconnected: Vec<(u64, H256)>,
 	best_number: u64,
@@ -571,9 +572,10 @@ pub struct BlockRebuilder {

 impl BlockRebuilder {
 	/// Create a new BlockRebuilder.
-	pub fn new(chain: BlockChain, best_number: u64) -> Result<Self, ::error::Error> {
+	pub fn new(chain: BlockChain, db: Arc<Database>, best_number: u64) -> Result<Self, ::error::Error> {
 		Ok(BlockRebuilder {
 			chain: chain,
+			db: db,
 			rng: try!(OsRng::new()),
 			disconnected: Vec::new(),
 			best_number: best_number,
@@ -616,15 +618,17 @@ impl BlockRebuilder {
 			}

 			let is_best = cur_number == self.best_number;
+			let mut batch = self.db.transaction();

 			// special-case the first block in each chunk.
 			if idx == 3 {
-				if self.chain.insert_snapshot_block(&block_bytes, receipts, Some(parent_total_difficulty), is_best) {
+				if self.chain.insert_unordered_block(&mut batch, &block_bytes, receipts, Some(parent_total_difficulty), is_best, false) {
 					self.disconnected.push((cur_number, block.header.hash()));
 				}
 			} else {
-				self.chain.insert_snapshot_block(&block_bytes, receipts, None, is_best);
+				self.chain.insert_unordered_block(&mut batch, &block_bytes, receipts, None, is_best, false);
 			}
+			self.db.write(batch).expect("Error writing to the DB");
 			self.chain.commit();

 			parent_hash = BlockView::new(&block_bytes).hash();
diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs
index 63232ad5b..57782e6cd 100644
--- a/ethcore/src/snapshot/service.rs
+++ b/ethcore/src/snapshot/service.rs
@@ -98,7 +98,7 @@ impl Restoration {
 			.map_err(UtilError::SimpleString)));

 		let chain = BlockChain::new(Default::default(), params.genesis, raw_db.clone());
-		let blocks = try!(BlockRebuilder::new(chain, manifest.block_number));
+		let blocks = try!(BlockRebuilder::new(chain, raw_db.clone(), manifest.block_number));

 		let root = manifest.state_root.clone();
 		Ok(Restoration {
@@ -415,9 +415,14 @@ impl Service {
 			guard: Guard::new(rest_dir),
 		};

+		let state_chunks = params.manifest.state_hashes.len();
+		let block_chunks = params.manifest.block_hashes.len();
+
 		*res = Some(try!(Restoration::new(params)));

 		*self.status.lock() = RestorationStatus::Ongoing {
+			state_chunks: state_chunks as u32,
+			block_chunks: block_chunks as u32,
 			state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32,
 			block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32,
 		};
@@ -535,7 +540,7 @@ impl SnapshotService for Service {
 	fn status(&self) -> RestorationStatus {
 		let mut cur_status = self.status.lock();
-		if let RestorationStatus::Ongoing { ref mut state_chunks_done, ref mut block_chunks_done } = *cur_status {
+		if let RestorationStatus::Ongoing { ref mut state_chunks_done, ref mut block_chunks_done, .. } = *cur_status {
 			*state_chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32;
 			*block_chunks_done = self.block_chunks.load(Ordering::SeqCst) as u32;
 		}
@@ -629,4 +634,4 @@ mod tests {
 		service.restore_state_chunk(Default::default(), vec![]);
 		service.restore_block_chunk(Default::default(), vec![]);
 	}
-}
\ No newline at end of file
+}
diff --git a/ethcore/src/snapshot/tests/blocks.rs b/ethcore/src/snapshot/tests/blocks.rs
index 6c4344b6e..06e069655 100644
--- a/ethcore/src/snapshot/tests/blocks.rs
+++ b/ethcore/src/snapshot/tests/blocks.rs
@@ -69,7 +69,7 @@ fn chunk_and_restore(amount: u64) {
 	// restore it.
 	let new_db = Arc::new(Database::open(&db_cfg, new_path.as_str()).unwrap());
 	let new_chain = BlockChain::new(Default::default(), &genesis, new_db.clone());
-	let mut rebuilder = BlockRebuilder::new(new_chain, amount).unwrap();
+	let mut rebuilder = BlockRebuilder::new(new_chain, new_db.clone(), amount).unwrap();
 	let reader = PackedReader::new(&snapshot_path).unwrap().unwrap();
 	let engine = ::engines::NullEngine::new(Default::default(), Default::default());
 	for chunk_hash in &reader.manifest().block_hashes {
diff --git a/ethcore/src/types/blockchain_info.rs b/ethcore/src/types/blockchain_info.rs
index ef8924aec..ff6aa8dde 100644
--- a/ethcore/src/types/blockchain_info.rs
+++ b/ethcore/src/types/blockchain_info.rs
@@ -31,5 +31,13 @@ pub struct BlockChainInfo {
 	/// Best blockchain block hash.
 	pub best_block_hash: H256,
 	/// Best blockchain block number.
-	pub best_block_number: BlockNumber
+	pub best_block_number: BlockNumber,
+	/// Best ancient block hash.
+	pub ancient_block_hash: Option<H256>,
+	/// Best ancient block number.
+	pub ancient_block_number: Option<BlockNumber>,
+	/// First block on the best sequence.
+	pub first_block_hash: Option<H256>,
+	/// Number of the first block on the best sequence.
+	pub first_block_number: Option<BlockNumber>,
 }
diff --git a/ethcore/src/types/restoration_status.rs b/ethcore/src/types/restoration_status.rs
index 2840d9416..ddf4cf1db 100644
--- a/ethcore/src/types/restoration_status.rs
+++ b/ethcore/src/types/restoration_status.rs
@@ -23,6 +23,10 @@ pub enum RestorationStatus {
 	Inactive,
 	/// Ongoing restoration.
 	Ongoing {
+		/// Total number of state chunks.
+		state_chunks: u32,
+		/// Total number of block chunks.
+		block_chunks: u32,
 		/// Number of state chunks completed.
 		state_chunks_done: u32,
 		/// Number of block chunks completed.
diff --git a/ethcore/src/verification/verification.rs b/ethcore/src/verification/verification.rs
index f89ac7d9a..7269765a0 100644
--- a/ethcore/src/verification/verification.rs
+++ b/ethcore/src/verification/verification.rs
@@ -296,7 +296,7 @@ mod tests {
 			self.blocks.contains_key(hash)
 		}

-		fn first_block(&self) -> Option<H256> {
+		fn first_block(&self) -> Option<H256> {
 			unimplemented!()
 		}

@@ -313,6 +313,10 @@ mod tests {
 			self.block(hash).map(|b| BlockChain::block_to_body(&b))
 		}

+		fn best_ancient_block(&self) -> Option<H256> {
+			None
+		}
+
 		/// Get the familial details concerning a block.
 		fn block_details(&self, hash: &H256) -> Option<BlockDetails> {
 			self.blocks.get(hash).map(|bytes| {
diff --git a/parity/blockchain.rs b/parity/blockchain.rs
index 1909450ba..94343cdf8 100644
--- a/parity/blockchain.rs
+++ b/parity/blockchain.rs
@@ -194,7 +194,7 @@ fn execute_import(cmd: ImportBlockchain) -> Result<String, String> {
 		}
 	};

-	let informant = Informant::new(client.clone(), None, None, cmd.logger_config.color);
+	let informant = Informant::new(client.clone(), None, None, None, cmd.logger_config.color);

 	try!(service.register_io_handler(Arc::new(ImportIoHandler {
 		info: Arc::new(informant),
diff --git a/parity/cli/config.full.toml b/parity/cli/config.full.toml
index b71bd9361..6d9c84fd4 100644
--- a/parity/cli/config.full.toml
+++ b/parity/cli/config.full.toml
@@ -28,6 +28,7 @@ nat = "any"
 id = "0x1"
 bootnodes = []
 discovery = true
+warp = true
 reserved_only = false
 reserved_peers = "./path_to_file"

diff --git a/parity/cli/config.toml b/parity/cli/config.toml
index 02ff9c0dd..5fcd4ce73 100644
--- a/parity/cli/config.toml
+++ b/parity/cli/config.toml
@@ -13,6 +13,7 @@ disable = true

 [network]
 disable = false
+warp = false
 discovery = true
 nat = "any"
 min_peers = 10
diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs
index d4398b6f5..2935560c9 100644
--- a/parity/cli/mod.rs
+++ b/parity/cli/mod.rs
@@ -106,6 +108,8 @@ usage! {
 	// -- Networking Options
 	flag_no_network: bool = false,
 		or |c: &Config| otry!(c.network).disable.clone(),
+	flag_warp: bool = false,
+		or |c: &Config| otry!(c.network).warp.clone(),
 	flag_port: u16 = 30303u16,
 		or |c: &Config| otry!(c.network).port.clone(),
 	flag_min_peers: u16 = 25u16,
@@ -300,6 +302,7 @@ struct Signer {
 #[derive(Default, Debug, PartialEq, RustcDecodable)]
 struct Network {
 	disable: Option<bool>,
+	warp: Option<bool>,
 	port: Option<u16>,
 	min_peers: Option<u16>,
 	max_peers: Option<u16>,
@@ -486,6 +489,7 @@ mod tests {

 			// -- Networking Options
 			flag_no_network: false,
+			flag_warp: true,
 			flag_port: 30303u16,
 			flag_min_peers: 25u16,
 			flag_max_peers: 50u16,
@@ -643,6 +647,7 @@ mod tests {
 			}),
 			network: Some(Network {
 				disable: Some(false),
+				warp: Some(false),
 				port: None,
 				min_peers: Some(10),
 				max_peers: Some(20),
diff --git a/parity/cli/usage.txt b/parity/cli/usage.txt
index 736af5834..c602c0c8a 100644
--- a/parity/cli/usage.txt
+++ b/parity/cli/usage.txt
@@ -67,6 +67,7 @@ Account Options:

 Networking Options:
   --no-network                   Disable p2p networking. (default: {flag_no_network})
+  --warp                         Enable syncing from the snapshot over the network. (default: {flag_warp})
   --port PORT                    Override the port on which the node should listen
                                  (default: {flag_port}).
   --min-peers NUM                Try to maintain at least NUM peers (default: {flag_min_peers}).
diff --git a/parity/configuration.rs b/parity/configuration.rs
index 24008fab5..ddfb03669 100644
--- a/parity/configuration.rs
+++ b/parity/configuration.rs
@@ -89,6 +89,7 @@ impl Configuration {
 		let compaction = try!(self.args.flag_db_compaction.parse());
 		let wal = !self.args.flag_fast_and_loose;
 		let enable_network = self.enable_network(&mode);
+		let warp_sync = self.args.flag_warp;
 		let geth_compatibility = self.args.flag_geth;
 		let signer_port = self.signer_port();
 		let dapps_conf = self.dapps_config();
@@ -240,6 +241,7 @@
 					wal: wal,
 					vm_type: vm_type,
 					enable_network: enable_network,
+					warp_sync: warp_sync,
 					geth_compatibility: geth_compatibility,
 					signer_port: signer_port,
 					net_settings: self.network_settings(),
@@ -810,6 +812,7 @@ mod tests {
 			ipc_conf: Default::default(),
 			net_conf: default_network_config(),
 			network_id: None,
+			warp_sync: false,
 			acc_conf: Default::default(),
 			gas_pricer: Default::default(),
 			miner_extras: Default::default(),
diff --git a/parity/informant.rs b/parity/informant.rs
index f2cc41f64..a3749cfb0 100644
--- a/parity/informant.rs
+++ b/parity/informant.rs
@@ -26,6 +26,8 @@ use ethsync::{SyncProvider, ManageNetwork};
 use util::{Uint, RwLock, Mutex, H256, Colour};
 use ethcore::client::*;
 use ethcore::views::BlockView;
+use ethcore::snapshot::service::Service as SnapshotService;
+use ethcore::snapshot::{RestorationStatus, SnapshotService as SS};
 use number_prefix::{binary_prefix, Standalone, Prefixed};

 pub struct Informant {
@@ -35,6 +37,7 @@ pub struct Informant {
 	last_tick: RwLock<Instant>,
 	with_color: bool,
 	client: Arc<Client>,
+	snapshot: Option<Arc<SnapshotService>>,
 	sync: Option<Arc<SyncProvider>>,
 	net: Option<Arc<ManageNetwork>>,
 	last_import: Mutex<Instant>,
@@ -55,7 +58,7 @@ impl MillisecondDuration for Duration {

 impl Informant {
 	/// Make a new instance potentially `with_color` output.
-	pub fn new(client: Arc<Client>, sync: Option<Arc<SyncProvider>>, net: Option<Arc<ManageNetwork>>, with_color: bool) -> Self {
+	pub fn new(client: Arc<Client>, sync: Option<Arc<SyncProvider>>, net: Option<Arc<ManageNetwork>>, snapshot: Option<Arc<SnapshotService>>, with_color: bool) -> Self {
 		Informant {
 			chain_info: RwLock::new(None),
 			cache_info: RwLock::new(None),
@@ -63,6 +66,7 @@ impl Informant {
 			last_tick: RwLock::new(Instant::now()),
 			with_color: with_color,
 			client: client,
+			snapshot: snapshot,
 			sync: sync,
 			net: net,
 			last_import: Mutex::new(Instant::now()),
@@ -92,8 +96,16 @@ impl Informant {
+		let sync_status = self.sync.as_ref().map(|s| s.status());

 		let importing = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3
-			|| self.sync.as_ref().map_or(false, |s| s.status().is_major_syncing());
-		if !importing && elapsed < Duration::from_secs(30) {
+			|| sync_status.map_or(false, |s| s.is_major_syncing());
+		let (snapshot_sync, snapshot_current, snapshot_total) = self.snapshot.as_ref().map_or((false, 0, 0), |s|
+			match s.status() {
+				RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } =>
+					(true, state_chunks_done + block_chunks_done, state_chunks + block_chunks),
+				_ => (false, 0, 0),
+			}
+		);
+
+		if !importing && !snapshot_sync && elapsed < Duration::from_secs(30) {
 			return;
 		}

@@ -109,27 +121,33 @@ impl Informant {
 		info!(target: "import", "{} {} {}",
 			match importing {
-				true => format!("Syncing {} {} {} {}+{} Qed",
-					paint(White.bold(), format!("{:>8}", format!("#{}", chain_info.best_block_number))),
-					paint(White.bold(), format!("{}", chain_info.best_block_hash)),
-					{
-						let last_report = match *write_report { Some(ref last_report) => last_report.clone(), _ => ClientReport::default() };
-						format!("{} blk/s {} tx/s {} Mgas/s",
-							paint(Yellow.bold(), format!("{:4}", ((report.blocks_imported - last_report.blocks_imported) * 1000) as u64 / elapsed.as_milliseconds())),
-							paint(Yellow.bold(), format!("{:4}", ((report.transactions_applied - last_report.transactions_applied) * 1000) as u64 / elapsed.as_milliseconds())),
-							paint(Yellow.bold(), format!("{:3}", ((report.gas_processed - last_report.gas_processed) / From::from(elapsed.as_milliseconds() * 1000)).low_u64()))
-						)
-					},
-					paint(Green.bold(), format!("{:5}", queue_info.unverified_queue_size)),
-					paint(Green.bold(), format!("{:5}", queue_info.verified_queue_size))
-				),
+				true => match snapshot_sync {
+					false => format!("Syncing {} {} {} {}+{} Qed",
+						paint(White.bold(), format!("{:>8}", format!("#{}", chain_info.best_block_number))),
+						paint(White.bold(), format!("{}", chain_info.best_block_hash)),
+						{
+							let last_report = match *write_report { Some(ref last_report) => last_report.clone(), _ => ClientReport::default() };
+							format!("{} blk/s {} tx/s {} Mgas/s",
+								paint(Yellow.bold(), format!("{:4}", ((report.blocks_imported - last_report.blocks_imported) * 1000) as u64 / elapsed.as_milliseconds())),
+								paint(Yellow.bold(), format!("{:4}", ((report.transactions_applied - last_report.transactions_applied) * 1000) as u64 / elapsed.as_milliseconds())),
+								paint(Yellow.bold(), format!("{:3}", ((report.gas_processed - last_report.gas_processed) / From::from(elapsed.as_milliseconds() * 1000)).low_u64()))
+							)
+						},
+						paint(Green.bold(), format!("{:5}", queue_info.unverified_queue_size)),
+						paint(Green.bold(), format!("{:5}", queue_info.verified_queue_size))
+					),
+					true => format!("Syncing snapshot {}/{}", snapshot_current, snapshot_total),
+				},
 				false => String::new(),
 			},
 			match (&sync_status, &network_config) {
 				(&Some(ref sync_info), &Some(ref net_config)) => format!("{}{}/{}/{} peers",
 					match importing {
 						true => format!("{} ", paint(Green.bold(), format!("{:>8}", format!("#{}", sync_info.last_imported_block_number.unwrap_or(chain_info.best_block_number))))),
-						false => String::new(),
+						false => match sync_info.last_imported_old_block_number {
+							Some(number) => format!("{} ", paint(Yellow.bold(), format!("{:>8}", format!("#{}", number)))),
+							None => String::new(),
+						}
 					},
 					paint(Cyan.bold(), format!("{:2}", sync_info.num_active_peers)),
 					paint(Cyan.bold(), format!("{:2}", sync_info.num_peers)),
diff --git a/parity/io_handler.rs b/parity/io_handler.rs
index bf73f55bb..0f1704049 100644
--- a/parity/io_handler.rs
+++ b/parity/io_handler.rs
@@ -61,4 +61,4 @@ impl IoHandler<ClientIoMessage> for ImportIoHandler {
 			self.info.tick()
 		}
 	}
-}
\ No newline at end of file
+}
diff --git a/parity/run.rs b/parity/run.rs
index 4610b6f2e..47b071734 100644
--- a/parity/run.rs
+++ b/parity/run.rs
@@ -70,6 +70,7 @@ pub struct RunCmd {
 	pub ipc_conf: IpcConfiguration,
 	pub net_conf: NetworkConfiguration,
 	pub network_id: Option<U256>,
+	pub warp_sync: bool,
 	pub acc_conf: AccountsConfig,
 	pub gas_pricer: GasPricerConfig,
 	pub miner_extras: MinerExtras,
@@ -171,6 +172,7 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
 		sync_config.subprotocol_name.clone_from_slice(spec.subprotocol_name().as_bytes());
 	}
 	sync_config.fork_block = spec.fork_block();
+	sync_config.warp_sync = cmd.warp_sync;

 	// prepare account provider
 	let account_provider = Arc::new(try!(prepare_account_provider(&cmd.dirs, cmd.acc_conf)));
@@ -231,7 +233,7 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {

 	// create sync object
 	let (sync_provider, manage_network, chain_notify) = try!(modules::sync(
-		&mut hypervisor, sync_config, net_conf.into(), client.clone(), snapshot_service, &cmd.logger_config,
+		&mut hypervisor, sync_config, net_conf.into(), client.clone(), snapshot_service.clone(), &cmd.logger_config,
 	).map_err(|e| format!("Sync error: {}", e)));

 	service.add_notify(chain_notify.clone());
@@ -287,7 +289,13 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
 	// start signer server
 	let signer_server = try!(signer::start(cmd.signer_conf, signer_deps));

-	let informant = Arc::new(Informant::new(service.client(), Some(sync_provider.clone()), Some(manage_network.clone()), cmd.logger_config.color));
+	let informant = Arc::new(Informant::new(
+		service.client(),
+		Some(sync_provider.clone()),
+		Some(manage_network.clone()),
+		Some(snapshot_service.clone()),
+		cmd.logger_config.color
+	));
 	let info_notify: Arc<ChainNotify> = informant.clone();
 	service.add_notify(info_notify);
 	let io_handler = Arc::new(ClientIoHandler {
diff --git a/parity/snapshot.rs b/parity/snapshot.rs
index dd5c611d3..709dafe5f 100644
--- a/parity/snapshot.rs
+++ b/parity/snapshot.rs
@@ -81,7 +81,7 @@ fn restore_using<R: SnapshotReader>(snapshot: Arc<SnapshotService>, reader: &R,
 	let informant_handle = snapshot.clone();
 	::std::thread::spawn(move || {
-		while let RestorationStatus::Ongoing { state_chunks_done, block_chunks_done } = informant_handle.status() {
+		while let RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } = informant_handle.status() {
 			info!("Processed {}/{} state chunks and {}/{} block chunks.",
 				state_chunks_done, num_state, block_chunks_done, num_blocks);
 			::std::thread::sleep(Duration::from_secs(5));
diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs
index c13229222..0889d81fe 100644
--- a/rpc/src/v1/impls/eth.rs
+++ b/rpc/src/v1/impls/eth.rs
@@ -257,7 +257,7 @@ impl Eth for EthClient where
 		let status = take_weak!(self.sync).status();
 		match status.state {
 			SyncState::Idle => Ok(SyncStatus::None),
-			SyncState::Waiting | SyncState::Blocks | SyncState::NewBlocks | SyncState::ChainHead
+			SyncState::Waiting | SyncState::Blocks | SyncState::NewBlocks | SyncState::SnapshotManifest
 				| SyncState::SnapshotData | SyncState::SnapshotWaiting => {
 				let current_block = U256::from(take_weak!(self.client).chain_info().best_block_number);
 				let highest_block = U256::from(status.highest_block_number.unwrap_or(status.start_block_number));
diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs
index 74013660f..5a227d3fb 100644
--- a/rpc/src/v1/tests/helpers/sync_provider.rs
+++ b/rpc/src/v1/tests/helpers/sync_provider.rs
@@ -51,6 +51,7 @@ impl TestSyncProvider {
 				mem_used: 0,
 				num_snapshot_chunks: 0,
 				snapshot_chunks_done: 0,
+				last_imported_old_block_number: None,
 			}),
 		}
 	}
diff --git a/sync/src/api.rs b/sync/src/api.rs
index 54dfc91b7..1ea0a5bbb 100644
--- a/sync/src/api.rs
+++ b/sync/src/api.rs
@@ -15,7 +15,8 @@
 // along with Parity.  If not, see <http://www.gnu.org/licenses/>.

 use std::sync::Arc;
-use std::str;
+use std::collections::HashMap;
+use util::Bytes;
 use network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId,
 	NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode, NetworkError};
 use util::{U256, H256};
@@ -41,6 +42,8 @@ pub struct SyncConfig {
 	pub subprotocol_name: [u8; 3],
 	/// Fork block to check
 	pub fork_block: Option<(BlockNumber, H256)>,
+	/// Enable snapshot sync
+	pub warp_sync: bool,
 }

 impl Default for SyncConfig {
@@ -50,6 +53,7 @@ impl Default for SyncConfig {
 			network_id: U256::from(1),
 			subprotocol_name: *b"eth",
 			fork_block: None,
+			warp_sync: true,
 		}
 	}
 }
@@ -104,7 +108,12 @@ impl EthSync {
 		let service = try!(NetworkService::new(try!(network_config.into_basic())));
 		let sync = Arc::new(EthSync{
 			network: service,
-			handler: Arc::new(SyncProtocolHandler { sync: RwLock::new(chain_sync), chain: chain, snapshot_service: snapshot_service }),
+			handler: Arc::new(SyncProtocolHandler {
+				sync: RwLock::new(chain_sync),
+				chain: chain,
+				snapshot_service: snapshot_service,
+				overlay: RwLock::new(HashMap::new()),
+			}),
 			subprotocol_name: config.subprotocol_name,
 		});

@@ -122,7 +131,7 @@ impl SyncProvider for EthSync {
 	/// Get sync peers
 	fn peers(&self) -> Vec<PeerInfo> {
 		self.network.with_context_eval(self.subprotocol_name, |context| {
-			let sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service);
+			let sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service, &self.handler.overlay);
 			self.handler.sync.write().peers(&sync_io)
 		}).unwrap_or(Vec::new())
 	}
@@ -135,6 +144,8 @@
 struct SyncProtocolHandler {
 	snapshot_service: Arc<SnapshotService>,
 	/// Sync strategy
 	sync: RwLock<ChainSync>,
+	/// Chain overlay used to cache data such as fork block.
+	overlay: RwLock<HashMap<BlockNumber, Bytes>>,
 }

 impl NetworkProtocolHandler for SyncProtocolHandler {
@@ -143,21 +154,21 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
 	}

 	fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
-		ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service), *peer, packet_id, data);
+		ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data);
 	}

 	fn connected(&self, io: &NetworkContext, peer: &PeerId) {
-		self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service), *peer);
+		self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer);
 	}

 	fn disconnected(&self, io: &NetworkContext, peer: &PeerId) {
-		self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service), *peer);
+		self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer);
 	}

 	fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
-		self.sync.write().maintain_peers(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service));
-		self.sync.write().maintain_sync(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service));
-		self.sync.write().propagate_new_transactions(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service));
+		self.sync.write().maintain_peers(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay));
+		self.sync.write().maintain_sync(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay));
+		self.sync.write().propagate_new_transactions(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay));
 	}
 }

@@ -171,7 +182,7 @@ impl ChainNotify for EthSync {
 		_duration: u64)
 	{
 		self.network.with_context(self.subprotocol_name, |context| {
-			let mut sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service);
+			let mut sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service, &self.handler.overlay);
 			self.handler.sync.write().chain_new_blocks(
 				&mut sync_io,
 				&imported,
@@ -239,7 +250,7 @@ impl ManageNetwork for EthSync {

 	fn stop_network(&self) {
 		self.network.with_context(self.subprotocol_name, |context| {
-			let mut sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service);
+			let mut sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service, &self.handler.overlay);
 			self.handler.sync.write().abort(&mut sync_io);
 		});
 		self.stop();
diff --git a/sync/src/block_sync.rs b/sync/src/block_sync.rs
new file mode 100644
index 000000000..a45553f52
--- /dev/null
+++ b/sync/src/block_sync.rs
@@ -0,0 +1,477 @@
+// Copyright 2015, 2016 Ethcore (UK) Ltd.
+// This file is part of Parity.
+
+// Parity is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity.  If not, see <http://www.gnu.org/licenses/>.
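Editor's note on the new module that follows: `BlockDownloader` is a small state machine, and the caller (the sync strategy in `sync/src/chain.rs`) is expected to repeatedly ask it for work and feed peer responses back. A hedged sketch of that driving loop, assuming a caller holding a `downloader: BlockDownloader` and an `io: &mut SyncIo`; the peer plumbing in the comments is illustrative and not code from this patch:

	// Hedged sketch: one way a caller might drive BlockDownloader.
	loop {
		match downloader.request_blocks(io) {
			// An Idle downloader first enters State::ChainHead and asks for sparse
			// subchain heads; afterwards State::Blocks fills in the gaps.
			Some(BlockRequest::Headers { start, count, skip }) => { /* send GetBlockHeaders */ },
			Some(BlockRequest::Bodies { hashes }) => { /* send GetBlockBodies */ },
			Some(BlockRequest::Receipts { hashes }) => { /* send GetReceipts */ },
			None => break, // round finished or download complete
		}
		// On each response call downloader.import_headers / import_bodies / import_receipts,
		// then downloader.collect_blocks(io, ...) to push completed blocks into the client.
	}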
+
+///
+/// Blockchain downloader
+///
+
+use util::*;
+use rlp::*;
+use ethcore::views::{BlockView};
+use ethcore::header::{BlockNumber, Header as BlockHeader};
+use ethcore::client::{BlockStatus, BlockID, BlockImportError};
+use ethcore::block::Block;
+use ethcore::error::{ImportError, BlockError};
+use sync_io::SyncIo;
+use blocks::BlockCollection;
+
+const MAX_HEADERS_TO_REQUEST: usize = 128;
+const MAX_BODIES_TO_REQUEST: usize = 128;
+const MAX_RECEIPTS_TO_REQUEST: usize = 128;
+const SUBCHAIN_SIZE: u64 = 256;
+const MAX_ROUND_PARENTS: usize = 32;
+
+#[derive(Copy, Clone, Eq, PartialEq, Debug)]
+/// Downloader state
+pub enum State {
+	/// No active downloads.
+	Idle,
+	/// Downloading subchain heads
+	ChainHead,
+	/// Downloading blocks
+	Blocks,
+	/// Download is complete
+	Complete,
+}
+
+/// Data that needs to be requested from a peer.
+pub enum BlockRequest {
+	Headers {
+		start: H256,
+		count: u64,
+		skip: u64,
+	},
+	Bodies {
+		hashes: Vec<H256>,
+	},
+	Receipts {
+		hashes: Vec<H256>,
+	},
+}
+
+#[derive(Eq, PartialEq, Debug)]
+pub enum BlockDownloaderImportError {
+	/// Imported data is rejected as invalid.
+	Invalid,
+	/// Imported data is valid but rejected because the downloader does not need it.
+	Useless,
+}
+
+/// Block downloader strategy.
+/// Manages state and block data for a block download process.
+pub struct BlockDownloader {
+	/// Downloader state
+	state: State,
+	/// Highest block number seen
+	highest_block: Option<BlockNumber>,
+	/// Downloaded blocks collection (headers, bodies and receipts)
+	blocks: BlockCollection,
+	/// Last imported block number
+	last_imported_block: BlockNumber,
+	/// Last imported block hash
+	last_imported_hash: H256,
+	/// Number of blocks imported this round
+	imported_this_round: Option<usize>,
+	/// Block parents imported this round (hash, parent)
+	round_parents: VecDeque<(H256, H256)>,
+	/// Do we need to download block receipts.
+	download_receipts: bool,
+	/// Sync up to the block with this hash.
+	target_hash: Option<H256>,
+}
+
+impl BlockDownloader {
+	/// Create a new instance of syncing strategy.
+	pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> BlockDownloader {
+		BlockDownloader {
+			state: State::Idle,
+			highest_block: None,
+			last_imported_block: start_number,
+			last_imported_hash: start_hash.clone(),
+			blocks: BlockCollection::new(sync_receipts),
+			imported_this_round: None,
+			round_parents: VecDeque::new(),
+			download_receipts: sync_receipts,
+			target_hash: None,
+		}
+	}
+
+	/// Reset sync. Clear all local downloaded data.
+	pub fn reset(&mut self) {
+		self.blocks.clear();
+		self.state = State::Idle;
+	}
+
+	/// Mark a block as known in the chain
+	pub fn mark_as_known(&mut self, hash: &H256, number: BlockNumber) {
+		if number == self.last_imported_block + 1 {
+			self.last_imported_block = number;
+			self.last_imported_hash = hash.clone();
+		}
+	}
+
+	/// Check if download is complete
+	pub fn is_complete(&self) -> bool {
+		self.state == State::Complete
+	}
+
+	/// Check if particular block hash is being downloaded
+	pub fn is_downloading(&self, hash: &H256) -> bool {
+		self.blocks.is_downloading(hash)
+	}
+
+	/// Set the sync target: the hash of the final block to download.
+	pub fn set_target(&mut self, hash: &H256) {
+		self.target_hash = Some(hash.clone());
+	}
+
+	/// Set starting sync block
+	pub fn _set_start(&mut self, hash: &H256, number: BlockNumber) {
+		self.last_imported_hash = hash.clone();
+		self.last_imported_block = number;
+	}
+
+	/// Unmark header as being downloaded.
+	pub fn clear_header_download(&mut self, hash: &H256) {
+		self.blocks.clear_header_download(hash)
+	}
+
+	/// Unmark block body as being downloaded.
+	pub fn clear_body_download(&mut self, hashes: &[H256]) {
+		self.blocks.clear_body_download(hashes)
+	}
+
+	/// Unmark block receipt as being downloaded.
+	pub fn clear_receipt_download(&mut self, hashes: &[H256]) {
+		self.blocks.clear_receipt_download(hashes)
+	}
+
+	/// Reset collection for a new sync round with given subchain block hashes.
+	pub fn reset_to(&mut self, hashes: Vec<H256>) {
+		self.reset();
+		self.blocks.reset_to(hashes);
+	}
+
+	/// Returns used heap memory size.
+	pub fn heap_size(&self) -> usize {
+		self.blocks.heap_size() + self.round_parents.heap_size_of_children()
+	}
+
+	/// Returns best imported block number.
+	pub fn last_imported_block_number(&self) -> BlockNumber {
+		self.last_imported_block
+	}
+
+	/// Add new block headers.
+	pub fn import_headers(&mut self, io: &mut SyncIo, r: &UntrustedRlp, expected_hash: Option<H256>) -> Result<(), BlockDownloaderImportError> {
+		let item_count = r.item_count();
+		if self.state == State::Idle {
+			trace!(target: "sync", "Ignored unexpected block headers");
+			return Ok(())
+		}
+		if item_count == 0 && (self.state == State::Blocks) {
+			return Err(BlockDownloaderImportError::Invalid);
+		}
+
+		let mut headers = Vec::new();
+		let mut hashes = Vec::new();
+		let mut valid_response = item_count == 0; // empty response is valid
+		for i in 0..item_count {
+			let info: BlockHeader = try!(r.val_at(i).map_err(|e| {
+				trace!(target: "sync", "Error decoding block header RLP: {:?}", e);
+				BlockDownloaderImportError::Invalid
+			}));
+			let number = BlockNumber::from(info.number());
+			// Check if any of the headers matches the hash we requested
+			if !valid_response {
+				if let Some(expected) = expected_hash {
+					valid_response = expected == info.hash()
+				}
+			}
+			if self.blocks.contains(&info.hash()) {
+				trace!(target: "sync", "Skipping existing block header {} ({:?})", number, info.hash());
+				continue;
+			}
+
+			if self.highest_block.as_ref().map_or(true, |n| number > *n) {
+				self.highest_block = Some(number);
+			}
+			let hash = info.hash();
+			let hdr = try!(r.at(i).map_err(|e| {
+				trace!(target: "sync", "Error decoding block header RLP: {:?}", e);
+				BlockDownloaderImportError::Invalid
+			}));
+			match io.chain().block_status(BlockID::Hash(hash.clone())) {
+				BlockStatus::InChain | BlockStatus::Queued => {
+					match self.state {
+						State::Blocks => trace!(target: "sync", "Header already in chain {} ({})", number, hash),
+						_ => trace!(target: "sync", "Header already in chain {} ({}), state = {:?}", number, hash, self.state),
+					}
+					headers.push(hdr.as_raw().to_vec());
+					hashes.push(hash);
+				},
+				BlockStatus::Bad => {
+					return Err(BlockDownloaderImportError::Invalid);
+				},
+				BlockStatus::Unknown => {
+					headers.push(hdr.as_raw().to_vec());
+					hashes.push(hash);
+				}
+			}
+		}
+
+		// Disable the peer for this syncing round if it gives invalid chain
+		if !valid_response {
+			trace!(target: "sync", "Invalid headers response");
+			return Err(BlockDownloaderImportError::Invalid);
+		}
+
+		match self.state {
+			State::ChainHead => {
+				if !headers.is_empty() {
+					// TODO: validate heads better. E.g. check that there is enough distance between blocks.
+					trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len());
+					self.blocks.reset_to(hashes);
+					self.state = State::Blocks;
+				}
+			},
+			State::Blocks => {
+				let count = headers.len();
+				self.blocks.insert_headers(headers);
+				trace!(target: "sync", "Inserted {} headers", count);
+			},
+			_ => trace!(target: "sync", "Unexpected headers({})", headers.len()),
+		}
+
+		Ok(())
+	}
+
+	/// Called by peer once it has new block bodies
+	pub fn import_bodies(&mut self, _io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), BlockDownloaderImportError> {
+		let item_count = r.item_count();
+		if item_count == 0 {
+			return Err(BlockDownloaderImportError::Useless);
+		}
+		else if self.state != State::Blocks {
+			trace!(target: "sync", "Ignored unexpected block bodies");
+		}
+		else {
+			let mut bodies = Vec::with_capacity(item_count);
+			for i in 0..item_count {
+				let body = try!(r.at(i).map_err(|e| {
+					trace!(target: "sync", "Error decoding block bodies RLP: {:?}", e);
+					BlockDownloaderImportError::Invalid
+				}));
+				bodies.push(body.as_raw().to_vec());
+			}
+			if self.blocks.insert_bodies(bodies) != item_count {
+				trace!(target: "sync", "Deactivating peer for giving invalid block bodies");
+				return Err(BlockDownloaderImportError::Invalid);
+			}
+		}
+		Ok(())
+	}
+
+	/// Called by peer once it has new block receipts
+	pub fn import_receipts(&mut self, _io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), BlockDownloaderImportError> {
+		let item_count = r.item_count();
+		if item_count == 0 {
+			return Err(BlockDownloaderImportError::Useless);
+		}
+		else if self.state != State::Blocks {
+			trace!(target: "sync", "Ignored unexpected block receipts");
+		}
+		else {
+			let mut receipts = Vec::with_capacity(item_count);
+			for i in 0..item_count {
+				let receipt = try!(r.at(i).map_err(|e| {
+					trace!(target: "sync", "Error decoding block receipts RLP: {:?}", e);
+					BlockDownloaderImportError::Invalid
+				}));
+				receipts.push(receipt.as_raw().to_vec());
+			}
+			if self.blocks.insert_receipts(receipts) != item_count {
+				trace!(target: "sync", "Deactivating peer for giving invalid block receipts");
+				return Err(BlockDownloaderImportError::Invalid);
+			}
+		}
+		Ok(())
+	}
+
+	fn start_sync_round(&mut self, io: &mut SyncIo) {
+		self.state = State::ChainHead;
+		trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?})", self.imported_this_round, self.last_imported_block);
+		// Check if we need to retract to find the common block. The problem is that the peers still return headers by hash even
+		// from the non-canonical part of the tree. So we also retract if nothing has been imported last round.
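		// Editor's illustration (not part of the original patch): if the previous round imported
		// nothing and `last_imported_block` is, say, 1500 with hash `h`, the match below first
		// looks for `h` among `round_parents` to step back to its parent cheaply; only if that
		// fails does it ask the chain for block 1499 by number. Repeated empty rounds thus walk
		// back one block per round until a common ancestor with the peer's chain is found.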
+		match self.imported_this_round {
+			Some(n) if n == 0 && self.last_imported_block > 0 => {
+				// nothing was imported last round, step back to a previous block
+				// search parent in last round known parents first
+				if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == self.last_imported_hash) {
+					self.last_imported_block -= 1;
+					self.last_imported_hash = p.clone();
+					trace!(target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash);
+				} else {
+					match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) {
+						Some(h) => {
+							self.last_imported_block -= 1;
+							self.last_imported_hash = h;
+							trace!(target: "sync", "Searching common header in the blockchain {} ({})", self.last_imported_block, self.last_imported_hash);
+						}
+						None => {
+							debug!(target: "sync", "Could not revert to previous block, last: {} ({})", self.last_imported_block, self.last_imported_hash);
+						}
+					}
+				}
+			},
+			_ => (),
+		}
+		self.imported_this_round = None;
+	}
+
+	/// Find some headers or blocks to download for a peer.
+	pub fn request_blocks(&mut self, io: &mut SyncIo) -> Option<BlockRequest> {
+		match self.state {
+			State::Idle => {
+				self.start_sync_round(io);
+				return self.request_blocks(io);
+			},
+			State::ChainHead => {
+				// Request subchain headers
+				trace!(target: "sync", "Starting sync with better chain");
+				// Request MAX_HEADERS_TO_REQUEST - 2 headers apart so that
+				// MAX_HEADERS_TO_REQUEST would include headers for neighbouring subchains
+				return Some(BlockRequest::Headers {
+					start: self.last_imported_hash.clone(),
+					count: SUBCHAIN_SIZE,
+					skip: (MAX_HEADERS_TO_REQUEST - 2) as u64,
+				});
+			},
+			State::Blocks => {
+				// check to see if we need to download any block bodies first
+				let needed_bodies = self.blocks.needed_bodies(MAX_BODIES_TO_REQUEST, false);
+				if !needed_bodies.is_empty() {
+					return Some(BlockRequest::Bodies {
+						hashes: needed_bodies,
+					});
+				}
+
+				if self.download_receipts {
+					let needed_receipts = self.blocks.needed_receipts(MAX_RECEIPTS_TO_REQUEST, false);
+					if !needed_receipts.is_empty() {
+						return Some(BlockRequest::Receipts {
+							hashes: needed_receipts,
+						});
+					}
+				}
+
+				// find subchain to download
+				if let Some((h, count)) = self.blocks.needed_headers(MAX_HEADERS_TO_REQUEST, false) {
+					return Some(BlockRequest::Headers {
+						start: h,
+						count: count as u64,
+						skip: 0,
+					});
+				}
+			},
+			State::Complete => (),
+		}
+		None
+	}
+
+	/// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import.
+	pub fn collect_blocks(&mut self, io: &mut SyncIo, allow_out_of_order: bool) -> Result<(), BlockDownloaderImportError> {
+		let mut bad = false;
+		let mut imported = HashSet::new();
+		let blocks = self.blocks.drain();
+		let count = blocks.len();
+		for block_and_receipts in blocks {
+			let block = block_and_receipts.block;
+			let receipts = block_and_receipts.receipts;
+			let (h, number, parent) = {
+				let header = BlockView::new(&block).header_view();
+				(header.sha3(), header.number(), header.parent_hash())
+			};
+
+			// Perform basic block verification
+			if !Block::is_good(&block) {
+				debug!(target: "sync", "Bad block rlp {:?} : {:?}", h, block);
+				bad = true;
+				break;
+			}
+
+			if self.target_hash.as_ref().map_or(false, |t| t == &h) {
+				self.state = State::Complete;
+				trace!(target: "sync", "Sync target reached");
+				return Ok(());
+			}
+
+			let result = if let Some(receipts) = receipts {
+				io.chain().import_block_with_receipts(block, receipts)
+			} else {
+				io.chain().import_block(block)
+			};
+
+			match result {
+				Err(BlockImportError::Import(ImportError::AlreadyInChain)) => {
+					trace!(target: "sync", "Block already in chain {:?}", h);
+					self.block_imported(&h, number, &parent);
+				},
+				Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {
+					trace!(target: "sync", "Block already queued {:?}", h);
+					self.block_imported(&h, number, &parent);
+				},
+				Ok(_) => {
+					trace!(target: "sync", "Block queued {:?}", h);
+					imported.insert(h.clone());
+					self.block_imported(&h, number, &parent);
+				},
+				Err(BlockImportError::Block(BlockError::UnknownParent(_))) if allow_out_of_order => {
+					trace!(target: "sync", "Unknown new block parent, restarting sync");
+					break;
+				},
+				Err(e) => {
+					debug!(target: "sync", "Bad block {:?} : {:?}", h, e);
+					bad = true;
+					break;
+				}
+			}
+		}
+		trace!(target: "sync", "Imported {} of {}", imported.len(), count);
+		self.imported_this_round = Some(self.imported_this_round.unwrap_or(0) + imported.len());
+
+		if bad {
+			return Err(BlockDownloaderImportError::Invalid);
+		}
+
+		if self.blocks.is_empty() {
+			// complete sync round
+			trace!(target: "sync", "Sync round complete");
+			self.reset();
+		}
+		Ok(())
+	}
+
+	fn block_imported(&mut self, hash: &H256, number: BlockNumber, parent: &H256) {
+		self.last_imported_block = number;
+		self.last_imported_hash = hash.clone();
+		self.round_parents.push_back((hash.clone(), parent.clone()));
+		if self.round_parents.len() > MAX_ROUND_PARENTS {
+			self.round_parents.pop_front();
+		}
+	}
+}
+
+//TODO: module tests
diff --git a/sync/src/blocks.rs b/sync/src/blocks.rs
index ae2092f25..db385b9d5 100644
--- a/sync/src/blocks.rs
+++ b/sync/src/blocks.rs
@@ -25,6 +25,15 @@ known_heap_size!(0, HeaderId);
 struct SyncBlock {
 	header: Bytes,
 	body: Option<Bytes>,
+	receipts: Option<Bytes>,
+}
+
+/// Block with optional receipt
+pub struct BlockAndReceipts {
+	/// Block data.
+	pub block: Bytes,
+	/// Block receipts RLP list.
+	pub receipts: Option<Bytes>,
 }

 impl HeapSizeOf for SyncBlock {
@@ -45,6 +54,8 @@ struct HeaderId {
 /// the downloaded blocks.
 #[derive(Default)]
 pub struct BlockCollection {
+	/// Does this collection need block receipts.
+	need_receipts: bool,
 	/// Heads of subchains to download
 	heads: Vec<H256>,
 	/// Downloaded blocks.
 	blocks: HashMap<H256, SyncBlock>,
 	/// Downloaded blocks by parent.
 	parents: HashMap<H256, H256>,
 	/// Used to map body to header.
 	header_ids: HashMap<HeaderId, H256>,
+	/// Used to map receipts root to header.
+	receipt_ids: HashMap<H256, H256>,
 	/// First block in `blocks`.
head: Option, /// Set of block header hashes being downloaded downloading_headers: HashSet, /// Set of block bodies being downloaded identified by block hash. downloading_bodies: HashSet, + /// Set of block receipts being downloaded identified by block hash. + downloading_receipts: HashSet, } impl BlockCollection { /// Create a new instance. - pub fn new() -> BlockCollection { + pub fn new(download_receipts: bool) -> BlockCollection { BlockCollection { + need_receipts: download_receipts, blocks: HashMap::new(), header_ids: HashMap::new(), + receipt_ids: HashMap::new(), heads: Vec::new(), parents: HashMap::new(), head: None, downloading_headers: HashSet::new(), downloading_bodies: HashSet::new(), + downloading_receipts: HashSet::new(), } } @@ -80,10 +98,12 @@ impl BlockCollection { self.blocks.clear(); self.parents.clear(); self.header_ids.clear(); + self.receipt_ids.clear(); self.heads.clear(); self.head = None; self.downloading_headers.clear(); self.downloading_bodies.clear(); + self.downloading_receipts.clear(); } /// Reset collection for a new sync round with given subchain block hashes. @@ -108,8 +128,23 @@ impl BlockCollection { for b in bodies.into_iter() { if let Err(e) = self.insert_body(b) { trace!(target: "sync", "Ignored invalid body: {:?}", e); + } else { + inserted += 1; } - else { + } + inserted + } + + /// Insert a collection of block receipts for previously downloaded headers. + pub fn insert_receipts(&mut self, receipts: Vec) -> usize { + if !self.need_receipts { + return 0; + } + let mut inserted = 0; + for r in receipts.into_iter() { + if let Err(e) = self.insert_receipt(r) { + trace!(target: "sync", "Ignored invalid receipt: {:?}", e); + } else { inserted += 1; } } @@ -147,6 +182,38 @@ impl BlockCollection { needed_bodies } + + /// Returns a set of block hashes that require a receipt download. The returned set is marked as being downloaded. + pub fn needed_receipts(&mut self, count: usize, _ignore_downloading: bool) -> Vec { + if self.head.is_none() || !self.need_receipts { + return Vec::new(); + } + let mut needed_receipts: Vec = Vec::new(); + let mut head = self.head; + while head.is_some() && needed_receipts.len() < count { + head = self.parents.get(&head.unwrap()).cloned(); + if let Some(head) = head { + match self.blocks.get(&head) { + Some(block) if block.receipts.is_none() && !self.downloading_receipts.contains(&head) => { + self.downloading_receipts.insert(head.clone()); + needed_receipts.push(head.clone()); + } + _ => (), + } + } + } + for h in self.receipt_ids.values() { + if needed_receipts.len() >= count { + break; + } + if !self.downloading_receipts.contains(h) { + needed_receipts.push(h.clone()); + self.downloading_receipts.insert(h.clone()); + } + } + needed_receipts + } + /// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded. pub fn needed_headers(&mut self, count: usize, ignore_downloading: bool) -> Option<(H256, usize)> { // find subchain to download @@ -163,18 +230,27 @@ impl BlockCollection { download.map(|h| (h, count)) } - /// Unmark a header as being downloaded. + /// Unmark header as being downloaded. pub fn clear_header_download(&mut self, hash: &H256) { self.downloading_headers.remove(hash); } - /// Unmark a block body as being downloaded. - pub fn clear_body_download(&mut self, hash: &H256) { - self.downloading_bodies.remove(hash); + /// Unmark block body as being downloaded. 
+ pub fn clear_body_download(&mut self, hashes: &[H256]) { + for h in hashes { + self.downloading_bodies.remove(h); + } + } + + /// Unmark block receipt as being downloaded. + pub fn clear_receipt_download(&mut self, hashes: &[H256]) { + for h in hashes { + self.downloading_receipts.remove(h); + } } /// Get a valid chain of blocks ordered in descending order and ready for importing into blockchain. - pub fn drain(&mut self) -> Vec { + pub fn drain(&mut self) -> Vec { if self.blocks.is_empty() || self.head.is_none() { return Vec::new(); } @@ -188,7 +264,7 @@ impl BlockCollection { head = self.parents.get(&h).cloned(); if let Some(head) = head { match self.blocks.get(&head) { - Some(block) if block.body.is_some() => { + Some(block) if block.body.is_some() && (!self.need_receipts || block.receipts.is_some()) => { blocks.push(block); hashes.push(head); self.head = Some(head); @@ -198,19 +274,24 @@ impl BlockCollection { } } - for block in blocks.drain(..) { + for block in blocks { let mut block_rlp = RlpStream::new_list(3); block_rlp.append_raw(&block.header, 1); - let body = Rlp::new(block.body.as_ref().expect("blocks contains only full blocks; qed")); - block_rlp.append_raw(body.at(0).as_raw(), 1); - block_rlp.append_raw(body.at(1).as_raw(), 1); - drained.push(block_rlp.out()); + { + let body = Rlp::new(block.body.as_ref().expect("blocks contains only full blocks; qed")); + block_rlp.append_raw(body.at(0).as_raw(), 1); + block_rlp.append_raw(body.at(1).as_raw(), 1); + } + drained.push(BlockAndReceipts { + block: block_rlp.out(), + receipts: block.receipts.clone(), + }); } } for h in hashes { self.blocks.remove(&h); } - trace!("Drained {} blocks, new head :{:?}", drained.len(), self.head); + trace!(target: "sync", "Drained {} blocks, new head :{:?}", drained.len(), self.head); drained } @@ -241,14 +322,17 @@ impl BlockCollection { } fn insert_body(&mut self, b: Bytes) -> Result<(), NetworkError> { - let body = UntrustedRlp::new(&b); - let tx = try!(body.at(0)); - let tx_root = ordered_trie_root(tx.iter().map(|r| r.as_raw().to_vec())); //TODO: get rid of vectors here - let uncles = try!(body.at(1)).as_raw().sha3(); - let header_id = HeaderId { - transactions_root: tx_root, - uncles: uncles + let header_id = { + let body = UntrustedRlp::new(&b); + let tx = try!(body.at(0)); + let tx_root = ordered_trie_root(tx.iter().map(|r| r.as_raw().to_vec())); //TODO: get rid of vectors here + let uncles = try!(body.at(1)).as_raw().sha3(); + HeaderId { + transactions_root: tx_root, + uncles: uncles + } }; + match self.header_ids.get(&header_id).cloned() { Some(h) => { self.header_ids.remove(&header_id); @@ -256,7 +340,7 @@ impl BlockCollection { match self.blocks.get_mut(&h) { Some(ref mut block) => { trace!(target: "sync", "Got body {}", h); - block.body = Some(body.as_raw().to_vec()); + block.body = Some(b); Ok(()) }, None => { @@ -266,7 +350,35 @@ impl BlockCollection { } } None => { - trace!(target: "sync", "Ignored unknown/stale block body"); + trace!(target: "sync", "Ignored unknown/stale block body. 
tx_root = {:?}, uncles = {:?}", header_id.transactions_root, header_id.uncles); + Err(NetworkError::BadProtocol) + } + } + } + + fn insert_receipt(&mut self, r: Bytes) -> Result<(), NetworkError> { + let receipt_root = { + let receipts = UntrustedRlp::new(&r); + ordered_trie_root(receipts.iter().map(|r| r.as_raw().to_vec())) //TODO: get rid of vectors here + }; + match self.receipt_ids.get(&receipt_root).cloned() { + Some(h) => { + self.receipt_ids.remove(&receipt_root); + self.downloading_receipts.remove(&h); + match self.blocks.get_mut(&h) { + Some(ref mut block) => { + trace!(target: "sync", "Got receipt {}", h); + block.receipts = Some(r); + Ok(()) + }, + None => { + warn!("Got receipt with no header {}", h); + Err(NetworkError::BadProtocol) + } + } + } + None => { + trace!(target: "sync", "Ignored unknown/stale block receipt {:?}", receipt_root); Err(NetworkError::BadProtocol) } } @@ -280,7 +392,7 @@ impl BlockCollection { } match self.head { None if hash == self.heads[0] => { - trace!("New head {}", hash); + trace!(target: "sync", "New head {}", hash); self.head = Some(info.parent_hash().clone()); }, _ => () @@ -289,6 +401,7 @@ impl BlockCollection { let mut block = SyncBlock { header: header, body: None, + receipts: None, }; let header_id = HeaderId { transactions_root: info.transactions_root().clone(), @@ -302,8 +415,21 @@ impl BlockCollection { block.body = Some(body_stream.out()); } else { + trace!("Queueing body tx_root = {:?}, uncles = {:?}, block = {:?}, number = {}", header_id.transactions_root, header_id.uncles, hash, info.number()); self.header_ids.insert(header_id, hash.clone()); } + if self.need_receipts { + let receipt_root = info.receipts_root().clone(); + if receipt_root == sha3::SHA3_NULL_RLP { + let receipts_stream = RlpStream::new_list(0); + block.receipts = Some(receipts_stream.out()); + } else { + if self.receipt_ids.contains_key(&receipt_root) { + warn!(target: "sync", "Duplicate receipt root {:?}, block: {:?}", receipt_root, hash); + } + self.receipt_ids.insert(receipt_root, hash.clone()); + } + } self.parents.insert(info.parent_hash().clone(), hash.clone()); self.blocks.insert(hash.clone(), block); @@ -326,7 +452,7 @@ impl BlockCollection { Some(next) => { h = next.clone(); if old_subchains.contains(&h) { - trace!("Completed subchain {:?}", s); + trace!(target: "sync", "Completed subchain {:?}", s); break; // reached head of the other subchain, merge by not adding } }, @@ -362,7 +488,7 @@ mod test { #[test] fn create_clear() { - let mut bc = BlockCollection::new(); + let mut bc = BlockCollection::new(false); assert!(is_empty(&bc)); let client = TestBlockChainClient::new(); client.add_blocks(100, EachBlockWith::Nothing); @@ -375,7 +501,7 @@ mod test { #[test] fn insert_headers() { - let mut bc = BlockCollection::new(); + let mut bc = BlockCollection::new(false); assert!(is_empty(&bc)); let client = TestBlockChainClient::new(); let nblocks = 200; @@ -407,7 +533,7 @@ mod test { assert!(!bc.is_downloading(&hashes[0])); assert!(bc.contains(&hashes[0])); - assert_eq!(&bc.drain()[..], &blocks[0..6]); + assert_eq!(&bc.drain().into_iter().map(|b| b.block).collect::>()[..], &blocks[0..6]); assert!(!bc.contains(&hashes[0])); assert_eq!(hashes[5], bc.head.unwrap()); @@ -418,7 +544,7 @@ mod test { bc.insert_headers(headers[10..16].to_vec()); assert!(bc.drain().is_empty()); bc.insert_headers(headers[5..10].to_vec()); - assert_eq!(&bc.drain()[..], &blocks[6..16]); + assert_eq!(&bc.drain().into_iter().map(|b| b.block).collect::>()[..], &blocks[6..16]); assert_eq!(hashes[15], 
bc.heads[0]); bc.insert_headers(headers[15..].to_vec()); @@ -428,7 +554,7 @@ mod test { #[test] fn insert_headers_with_gap() { - let mut bc = BlockCollection::new(); + let mut bc = BlockCollection::new(false); assert!(is_empty(&bc)); let client = TestBlockChainClient::new(); let nblocks = 200; @@ -450,7 +576,7 @@ mod test { #[test] fn insert_headers_no_gap() { - let mut bc = BlockCollection::new(); + let mut bc = BlockCollection::new(false); assert!(is_empty(&bc)); let client = TestBlockChainClient::new(); let nblocks = 200; diff --git a/sync/src/chain.rs b/sync/src/chain.rs index ee2e90800..4c857edd9 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -90,16 +90,15 @@ use util::*; use rlp::*; use network::*; -use ethcore::views::{HeaderView, BlockView}; +use ethcore::views::{HeaderView}; use ethcore::header::{BlockNumber, Header as BlockHeader}; use ethcore::client::{BlockChainClient, BlockStatus, BlockID, BlockChainInfo, BlockImportError}; use ethcore::error::*; -use ethcore::block::Block; use ethcore::snapshot::{ManifestData, RestorationStatus}; use sync_io::SyncIo; use time; use super::SyncConfig; -use blocks::BlockCollection; +use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as DownloaderImportError}; use snapshot::{Snapshot, ChunkType}; use rand::{thread_rng, Rng}; use api::PeerInfo as PeerInfoDigest; @@ -108,22 +107,22 @@ known_heap_size!(0, PeerInfo); type PacketDecodeError = DecoderError; +const PROTOCOL_VERSION_63: u8 = 63; +const PROTOCOL_VERSION_64: u8 = 64; const MAX_BODIES_TO_SEND: usize = 256; const MAX_HEADERS_TO_SEND: usize = 512; const MAX_NODE_DATA_TO_SEND: usize = 1024; const MAX_RECEIPTS_TO_SEND: usize = 1024; const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256; -const MAX_HEADERS_TO_REQUEST: usize = 128; -const MAX_BODIES_TO_REQUEST: usize = 128; const MIN_PEERS_PROPAGATION: usize = 4; const MAX_PEERS_PROPAGATION: usize = 128; const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20; -const SUBCHAIN_SIZE: usize = 256; -const MAX_ROUND_PARENTS: usize = 32; const MAX_NEW_HASHES: usize = 64; const MAX_TX_TO_IMPORT: usize = 512; const MAX_NEW_BLOCK_AGE: BlockNumber = 20; const MAX_TRANSACTION_SIZE: usize = 300*1024; +// Min number of blocks to be behind for a snapshot sync +const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 100000; const STATUS_PACKET: u8 = 0x00; const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; @@ -145,9 +144,10 @@ const SNAPSHOT_DATA_PACKET: u8 = 0x14; const HEADERS_TIMEOUT_SEC: f64 = 15f64; const BODIES_TIMEOUT_SEC: f64 = 10f64; +const RECEIPTS_TIMEOUT_SEC: f64 = 10f64; const FORK_HEADER_TIMEOUT_SEC: f64 = 3f64; const SNAPSHOT_MANIFEST_TIMEOUT_SEC: f64 = 3f64; -const SNAPSHOT_DATA_TIMEOUT_SEC: f64 = 10f64; +const SNAPSHOT_DATA_TIMEOUT_SEC: f64 = 60f64; #[derive(Copy, Clone, Eq, PartialEq, Debug)] /// Sync state @@ -158,14 +158,12 @@ pub enum SyncState { SnapshotData, /// Waiting for snapshot restoration to complete SnapshotWaiting, - /// Downloading subchain heads - ChainHead, + /// Downloading new blocks + Blocks, /// Initial chain sync complete. Waiting for new packets Idle, /// Block downloading paused. Waiting for block queue to process blocks and free some space Waiting, - /// Downloading blocks - Blocks, /// Downloading blocks learned from `NewHashes` packet NewBlocks, } @@ -199,6 +197,8 @@ pub struct SyncStatus { pub num_snapshot_chunks: usize, /// Snapshot chunks downloaded pub snapshot_chunks_done: usize, + /// Last fully downloaded and imported ancient block number (if any). 
+	pub last_imported_old_block_number: Option<BlockNumber>,
 }
 
 impl SyncStatus {
@@ -207,6 +207,13 @@ impl SyncStatus {
 		self.state != SyncState::Idle && self.state != SyncState::NewBlocks
 	}
 
+	/// Indicates if snapshot download is in progress
+	pub fn is_snapshot_syncing(&self) -> bool {
+		self.state == SyncState::SnapshotManifest
+			|| self.state == SyncState::SnapshotData
+			|| self.state == SyncState::SnapshotWaiting
+	}
+
 	/// Returns max no of peers to display in informants
 	pub fn current_max_peers(&self, min_peers: u32, max_peers: u32) -> u32 {
 		if self.num_peers as u32 > min_peers {
@@ -224,11 +231,19 @@ enum PeerAsking {
 	ForkHeader,
 	BlockHeaders,
 	BlockBodies,
-	Heads,
+	BlockReceipts,
 	SnapshotManifest,
 	SnapshotData,
 }
 
+#[derive(PartialEq, Eq, Debug, Clone, Copy)]
+/// Block downloader channel.
+enum BlockSet {
+	/// New blocks better than our best blocks
+	NewBlocks,
+	/// Missing old blocks
+	OldBlocks,
+}
 #[derive(Clone, Eq, PartialEq)]
 enum ForkConfirmation {
 	/// Fork block confirmation pending.
@@ -243,7 +258,7 @@ enum ForkConfirmation {
 /// Syncing peer information
 struct PeerInfo {
 	/// eth protocol version
-	protocol_version: u32,
+	protocol_version: u8,
 	/// Peer chain genesis hash
 	genesis: H256,
 	/// Peer network id
@@ -272,6 +287,8 @@ struct PeerInfo {
 	snapshot_hash: Option<H256>,
 	/// Best snapshot block number
 	snapshot_number: Option<BlockNumber>,
+	/// Block set requested
+	block_set: Option<BlockSet>,
 }
 
 impl PeerInfo {
@@ -297,26 +314,18 @@ pub struct ChainSync {
 	peers: HashMap<PeerId, PeerInfo>,
 	/// Peers active for current sync round
 	active_peers: HashSet<PeerId>,
-	/// Downloaded blocks, holds `H`, `B` and `S`
-	blocks: BlockCollection,
-	/// Last impoted block number
-	last_imported_block: BlockNumber,
-	/// Last impoted block hash
-	last_imported_hash: H256,
-	/// Syncing total difficulty
-	syncing_difficulty: U256,
+	/// Block download process for new blocks
+	new_blocks: BlockDownloader,
+	/// Block download process for ancient blocks
+	old_blocks: Option<BlockDownloader>,
 	/// Last propagated block number
 	last_sent_block_number: BlockNumber,
-	/// Max blocks to download ahead
-	_max_download_ahead_blocks: usize,
-	/// Number of blocks imported this round
-	imported_this_round: Option<usize>,
-	/// Block parents imported this round (hash, parent)
-	round_parents: VecDeque<(H256, H256)>,
 	/// Network ID
 	network_id: U256,
 	/// Optional fork block to check
 	fork_block: Option<(BlockNumber, H256)>,
+	/// Snapshot sync allowed.
+	snapshot_sync_enabled: bool,
 	/// Snapshot downloader.
 	snapshot: Snapshot,
 }
@@ -326,46 +335,46 @@ type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
 
 impl ChainSync {
 	/// Create a new instance of syncing strategy.
pub fn new(config: SyncConfig, chain: &BlockChainClient) -> ChainSync { - let chain = chain.chain_info(); - ChainSync { + let chain_info = chain.chain_info(); + let mut sync = ChainSync { state: SyncState::Idle, - starting_block: chain.best_block_number, + starting_block: chain.chain_info().best_block_number, highest_block: None, - last_imported_block: chain.best_block_number, - last_imported_hash: chain.best_block_hash, peers: HashMap::new(), active_peers: HashSet::new(), - blocks: BlockCollection::new(), - syncing_difficulty: U256::from(0u64), + new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number), + old_blocks: None, last_sent_block_number: 0, - imported_this_round: None, - round_parents: VecDeque::new(), - _max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), network_id: config.network_id, fork_block: config.fork_block, + snapshot_sync_enabled: config.warp_sync, snapshot: Snapshot::new(), - } + }; + sync.init_downloaders(chain); + sync } /// Returns synchonization status pub fn status(&self) -> SyncStatus { + let last_imported_number = self.new_blocks.last_imported_block_number(); SyncStatus { state: self.state.clone(), - protocol_version: if self.state == SyncState::SnapshotData { 64 } else { 63 }, + protocol_version: if self.state == SyncState::SnapshotData { PROTOCOL_VERSION_64 } else { PROTOCOL_VERSION_63 }, network_id: self.network_id, start_block_number: self.starting_block, - last_imported_block_number: Some(self.last_imported_block), - highest_block_number: self.highest_block.map(|n| max(n, self.last_imported_block)), - blocks_received: if self.last_imported_block > self.starting_block { self.last_imported_block - self.starting_block } else { 0 }, + last_imported_block_number: Some(last_imported_number), + last_imported_old_block_number: self.old_blocks.as_ref().map(|d| d.last_imported_block_number()), + highest_block_number: self.highest_block.map(|n| max(n, last_imported_number)), + blocks_received: if last_imported_number > self.starting_block { last_imported_number - self.starting_block } else { 0 }, blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 }, num_peers: self.peers.values().filter(|p| p.is_allowed()).count(), num_active_peers: self.peers.values().filter(|p| p.is_allowed() && p.asking != PeerAsking::Nothing).count(), num_snapshot_chunks: self.snapshot.total_chunks(), snapshot_chunks_done: self.snapshot.done_chunks(), mem_used: - self.blocks.heap_size() - + self.peers.heap_size_of_children() - + self.round_parents.heap_size_of_children(), + self.new_blocks.heap_size() + + self.old_blocks.as_ref().map_or(0, |d| d.heap_size()) + + self.peers.heap_size_of_children(), } } @@ -380,7 +389,7 @@ impl ChainSync { capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(), remote_address: session_info.remote_address, local_address: session_info.local_address, - eth_version: peer_data.protocol_version, + eth_version: peer_data.protocol_version as u32, eth_difficulty: peer_data.difficulty, eth_head: peer_data.latest_hash, }) @@ -390,28 +399,29 @@ impl ChainSync { /// Abort all sync activity pub fn abort(&mut self, io: &mut SyncIo) { - self.restart(io); + self.reset_and_continue(io); self.peers.clear(); } #[cfg_attr(feature="dev", allow(for_kv_map))] // Because it's not possible to get `values_mut()` /// Reset sync. 
Clear all downloaded data but keep the queue
 	fn reset(&mut self, io: &mut SyncIo) {
-		self.blocks.clear();
+		self.new_blocks.reset();
 		self.snapshot.clear();
 		if self.state == SyncState::SnapshotData {
 			debug!(target:"sync", "Aborting snapshot restore");
 			io.snapshot_service().abort_restore();
 		}
 		for (_, ref mut p) in &mut self.peers {
-			p.asking_blocks.clear();
-			p.asking_hash = None;
-			// mark any pending requests as expired
-			if p.asking != PeerAsking::Nothing && p.is_allowed() {
-				p.expired = true;
+			if p.block_set != Some(BlockSet::OldBlocks) {
+				p.asking_blocks.clear();
+				p.asking_hash = None;
+				// mark any pending requests as expired
+				if p.asking != PeerAsking::Nothing && p.is_allowed() {
+					p.expired = true;
+				}
 			}
 		}
-		self.syncing_difficulty = From::from(0u64);
 		self.state = SyncState::Idle;
 		// Reactivate peers only if some progress has been made
 		// since the last sync round or if starting fresh.
@@ -419,26 +429,17 @@ impl ChainSync {
 	}
 
 	/// Restart sync
-	pub fn restart(&mut self, io: &mut SyncIo) {
+	pub fn reset_and_continue(&mut self, io: &mut SyncIo) {
 		trace!(target: "sync", "Restarting");
 		self.reset(io);
-		self.start_sync_round(io);
 		self.continue_sync(io);
 	}
 
 	/// Remove peer from active peer set. Peer will be reactivated on the next sync
 	/// round.
-	fn deactivate_peer(&mut self, io: &mut SyncIo, peer_id: PeerId) {
+	fn deactivate_peer(&mut self, _io: &mut SyncIo, peer_id: PeerId) {
 		trace!(target: "sync", "Deactivating peer {}", peer_id);
 		self.active_peers.remove(&peer_id);
-		if self.active_peers.is_empty() {
-			trace!(target: "sync", "No more active peers");
-			if self.state == SyncState::ChainHead {
-				self.complete_sync(io);
-			} else {
-				self.restart(io);
-			}
-		}
 	}
 
 	fn start_snapshot_sync(&mut self, io: &mut SyncIo, peer_id: PeerId) {
@@ -447,18 +448,34 @@
 		self.state = SyncState::SnapshotManifest;
 	}
 
+	/// Restart sync disregarding the block queue status. May end up re-downloading up to QUEUE_SIZE blocks
+	pub fn restart(&mut self, io: &mut SyncIo) {
+		self.init_downloaders(io.chain());
+		self.reset_and_continue(io);
+	}
+
+	/// Restart sync after bad block has been detected.
May end up re-downloading up to QUEUE_SIZE blocks - fn restart_on_bad_block(&mut self, io: &mut SyncIo) { + fn init_downloaders(&mut self, chain: &BlockChainClient) { // Do not assume that the block queue/chain still has our last_imported_block - let chain = io.chain().chain_info(); - self.last_imported_block = chain.best_block_number; - self.last_imported_hash = chain.best_block_hash; - self.restart(io); + let chain = chain.chain_info(); + self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number); + if let (Some(ancient_block_hash), Some(ancient_block_number)) = (chain.ancient_block_hash, chain.ancient_block_number) { + + trace!(target: "sync", "Downloading old blocks from {:?} (#{}) till {:?} (#{:?})", ancient_block_hash, ancient_block_number, chain.first_block_hash, chain.first_block_number); + let mut downloader = BlockDownloader::new(true, &ancient_block_hash, ancient_block_number); + if let Some(hash) = chain.first_block_hash { + trace!(target: "sync", "Downloader target set to {:?}", hash); + downloader.set_target(&hash); + } + self.old_blocks = Some(downloader); + } else { + self.old_blocks = None; + } } /// Called by peer to report status fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - let protocol_version: u32 = try!(r.val_at(0)); + let protocol_version: u8 = try!(r.val_at(0)); let peer = PeerInfo { protocol_version: protocol_version, network_id: try!(r.val_at(1)), @@ -473,8 +490,9 @@ impl ChainSync { expired: false, confirmation: if self.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed }, asking_snapshot_data: None, - snapshot_hash: if protocol_version == 64 { Some(try!(r.val_at(5))) } else { None }, - snapshot_number: if protocol_version == 64 { Some(try!(r.val_at(6))) } else { None }, + snapshot_hash: if protocol_version == PROTOCOL_VERSION_64 { Some(try!(r.val_at(5))) } else { None }, + snapshot_number: if protocol_version == PROTOCOL_VERSION_64 { Some(try!(r.val_at(6))) } else { None }, + block_set: None, }; trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis); @@ -498,16 +516,17 @@ impl ChainSync { trace!(target: "sync", "Peer {} network id mismatch (ours: {}, theirs: {})", peer_id, self.network_id, peer.network_id); return Ok(()); } + if peer.protocol_version != PROTOCOL_VERSION_64 && peer.protocol_version != PROTOCOL_VERSION_63 { + io.disable_peer(peer_id); + trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version); + return Ok(()); + } self.peers.insert(peer_id.clone(), peer); - // Don't activate peer immediatelly when searching for common block. - // Let the current sync round complete first. 
- if self.state != SyncState::ChainHead { - self.active_peers.insert(peer_id.clone()); - } + self.active_peers.insert(peer_id.clone()); debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id)); if let Some((fork_block, _)) = self.fork_block { - self.request_headers_by_number(io, peer_id, fork_block, 1, 0, false, PeerAsking::ForkHeader); + self.request_fork_header_by_number(io, peer_id, fork_block); } else { self.sync_peer(io, peer_id, false); } @@ -519,23 +538,27 @@ impl ChainSync { fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { let confirmed = match self.peers.get_mut(&peer_id) { Some(ref mut peer) if peer.asking == PeerAsking::ForkHeader => { + peer.asking = PeerAsking::Nothing; let item_count = r.item_count(); - if item_count == 0 || (item_count == 1 && - try!(r.at(0)).as_raw().sha3() == self.fork_block.expect("ForkHeader state is only entered when fork_block is some; qed").1) { - peer.asking = PeerAsking::Nothing; - if item_count == 0 { - trace!(target: "sync", "{}: Chain is too short to confirm the block", peer_id); - peer.confirmation = ForkConfirmation::TooShort; - } else { + let (fork_number, fork_hash) = self.fork_block.expect("ForkHeader request is sent only fork block is Some; qed").clone(); + if item_count == 0 || item_count != 1 { + trace!(target: "sync", "{}: Chain is too short to confirm the block", peer_id); + peer.confirmation = ForkConfirmation::TooShort; + } else { + let header = try!(r.at(0)).as_raw(); + if header.sha3() == fork_hash { trace!(target: "sync", "{}: Confirmed peer", peer_id); peer.confirmation = ForkConfirmation::Confirmed; + if !io.chain_overlay().read().contains_key(&fork_number) { + io.chain_overlay().write().insert(fork_number, header.to_vec()); + } + } else { + trace!(target: "sync", "{}: Fork mismatch", peer_id); + io.disconnect_peer(peer_id); + return Ok(()); } - true - } else { - trace!(target: "sync", "{}: Fork mismatch", peer_id); - io.disconnect_peer(peer_id); - return Ok(()); } + true }, _ => false, }; @@ -545,16 +568,16 @@ impl ChainSync { } self.clear_peer_download(peer_id); - let expected_asking = if self.state == SyncState::ChainHead { PeerAsking::Heads } else { PeerAsking::BlockHeaders }; let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash); - if !self.reset_peer_asking(peer_id, expected_asking) || expected_hash.is_none() { - trace!(target: "sync", "{}: Ignored unexpected headers", peer_id); + let block_set = self.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks); + if !self.reset_peer_asking(peer_id, PeerAsking::BlockHeaders) || expected_hash.is_none() { + trace!(target: "sync", "{}: Ignored unexpected headers, expected_hash = {:?}", peer_id, expected_hash); self.continue_sync(io); return Ok(()); } let item_count = r.item_count(); - trace!(target: "sync", "{} -> BlockHeaders ({} entries), state = {:?}", peer_id, item_count, self.state); - if self.state == SyncState::Idle { + trace!(target: "sync", "{} -> BlockHeaders ({} entries), state = {:?}, set = {:?}", peer_id, item_count, self.state, block_set); + if self.state == SyncState::Idle && self.old_blocks.is_none() { trace!(target: "sync", "Ignored unexpected block headers"); self.continue_sync(io); return Ok(()); @@ -564,87 +587,38 @@ impl ChainSync { self.continue_sync(io); return Ok(()); } - if item_count == 0 && (self.state == SyncState::Blocks || self.state == SyncState::NewBlocks) { - self.deactivate_peer(io, peer_id); //TODO: is this too harsh? 
- self.continue_sync(io); - return Ok(()); - } - let mut headers = Vec::new(); - let mut hashes = Vec::new(); - let mut valid_response = item_count == 0; //empty response is valid - for i in 0..item_count { - let info: BlockHeader = try!(r.val_at(i)); - let number = BlockNumber::from(info.number()); - // Check if any of the headers matches the hash we requested - if !valid_response { - if let Some(expected) = expected_hash { - valid_response = expected == info.hash() - } - } - if self.blocks.contains(&info.hash()) { - trace!(target: "sync", "Skipping existing block header {} ({:?})", number, info.hash()); - continue; - } - - if self.highest_block.as_ref().map_or(true, |n| number > *n) { - self.highest_block = Some(number); - } - let hash = info.hash(); - match io.chain().block_status(BlockID::Hash(hash.clone())) { - BlockStatus::InChain | BlockStatus::Queued => { - match self.state { - SyncState::Blocks | SyncState::NewBlocks => trace!(target: "sync", "Header already in chain {} ({})", number, hash), - _ => trace!(target: "sync", "Unexpected header already in chain {} ({}), state = {:?}", number, hash, self.state), + let result = { + let mut downloader = match block_set { + BlockSet::NewBlocks => &mut self.new_blocks, + BlockSet::OldBlocks => { + match self.old_blocks { + None => { + trace!(target: "sync", "Ignored block headers while block download is inactive"); + self.continue_sync(io); + return Ok(()); + }, + Some(ref mut blocks) => blocks, } - headers.push(try!(r.at(i)).as_raw().to_vec()); - hashes.push(hash); - }, - BlockStatus::Bad => { - warn!(target: "sync", "Bad header {} ({}) from {}: {}, state = {:?}", number, hash, peer_id, io.peer_info(peer_id), self.state); - io.disable_peer(peer_id); - return Ok(()); - }, - BlockStatus::Unknown => { - headers.push(try!(r.at(i)).as_raw().to_vec()); - hashes.push(hash); } - } - } + }; + downloader.import_headers(io, r, expected_hash) + }; - // Disable the peer for this syncing round if it gives invalid chain - if !valid_response { - trace!(target: "sync", "{} Disabled for invalid headers response", peer_id); - io.disable_peer(peer_id); - } - - if headers.is_empty() { - // Peer does not have any new subchain heads, deactivate it and try with another. - trace!(target: "sync", "{} Disabled for no data", peer_id); - self.deactivate_peer(io, peer_id); - } - match self.state { - SyncState::ChainHead => { - if headers.is_empty() { - // peer is not on our chain - // track back and try again - self.imported_this_round = Some(0); - self.start_sync_round(io); - } else { - // TODO: validate heads better. E.g. check that there is enough distance between blocks. - trace!(target: "sync", "Received {} subchain heads, proceeding to download", headers.len()); - self.blocks.reset_to(hashes); - self.state = SyncState::Blocks; - } + match result { + Err(DownloaderImportError::Useless) => { + self.deactivate_peer(io, peer_id); }, - SyncState::Blocks | SyncState::NewBlocks | SyncState::Waiting => { - trace!(target: "sync", "Inserted {} headers", headers.len()); - self.blocks.insert_headers(headers); + Err(DownloaderImportError::Invalid) => { + io.disable_peer(peer_id); + self.deactivate_peer(io, peer_id); + self.continue_sync(io); + return Ok(()); }, - _ => trace!(target: "sync", "Unexpected headers({}) from {} ({}), state = {:?}", headers.len(), peer_id, io.peer_info(peer_id), self.state) + Ok(()) => (), } - self.collect_blocks(io); + self.collect_blocks(io, block_set); // give a task to the same peer first if received valuable headers. 
self.sync_peer(io, peer_id, false); // give tasks to other peers @@ -655,29 +629,106 @@ impl ChainSync { /// Called by peer once it has new block bodies fn on_peer_block_bodies(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { self.clear_peer_download(peer_id); - self.reset_peer_asking(peer_id, PeerAsking::BlockBodies); + let block_set = self.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks); + if !self.reset_peer_asking(peer_id, PeerAsking::BlockBodies) { + trace!(target: "sync", "{}: Ignored unexpected bodies", peer_id); + self.continue_sync(io); + return Ok(()); + } let item_count = r.item_count(); - trace!(target: "sync", "{} -> BlockBodies ({} entries)", peer_id, item_count); + trace!(target: "sync", "{} -> BlockBodies ({} entries), set = {:?}", peer_id, item_count, block_set); if item_count == 0 { self.deactivate_peer(io, peer_id); } - else if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting { - trace!(target: "sync", "Ignored unexpected block bodies"); - } else if self.state == SyncState::Waiting { trace!(target: "sync", "Ignored block bodies while waiting"); } else { - let mut bodies = Vec::with_capacity(item_count); - for i in 0..item_count { - bodies.push(try!(r.at(i)).as_raw().to_vec()); + let result = { + let mut downloader = match block_set { + BlockSet::NewBlocks => &mut self.new_blocks, + BlockSet::OldBlocks => match self.old_blocks { + None => { + trace!(target: "sync", "Ignored block headers while block download is inactive"); + self.continue_sync(io); + return Ok(()); + }, + Some(ref mut blocks) => blocks, + } + }; + downloader.import_bodies(io, r) + }; + + match result { + Err(DownloaderImportError::Invalid) => { + io.disable_peer(peer_id); + self.deactivate_peer(io, peer_id); + self.continue_sync(io); + return Ok(()); + }, + Err(DownloaderImportError::Useless) => { + self.deactivate_peer(io, peer_id); + }, + Ok(()) => (), } - if self.blocks.insert_bodies(bodies) != item_count { - trace!(target: "sync", "Deactivating peer for giving invalid block bodies"); - self.deactivate_peer(io, peer_id); + + self.collect_blocks(io, block_set); + self.sync_peer(io, peer_id, false); + } + self.continue_sync(io); + Ok(()) + } + + /// Called by peer once it has new block receipts + fn on_peer_block_receipts(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + self.clear_peer_download(peer_id); + let block_set = self.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks); + if !self.reset_peer_asking(peer_id, PeerAsking::BlockReceipts) { + trace!(target: "sync", "{}: Ignored unexpected receipts", peer_id); + self.continue_sync(io); + return Ok(()); + } + let item_count = r.item_count(); + trace!(target: "sync", "{} -> BlockReceipts ({} entries)", peer_id, item_count); + if item_count == 0 { + self.deactivate_peer(io, peer_id); + } + else if self.state == SyncState::Waiting { + trace!(target: "sync", "Ignored block receipts while waiting"); + } + else + { + let result = { + let mut downloader = match block_set { + BlockSet::NewBlocks => &mut self.new_blocks, + BlockSet::OldBlocks => match self.old_blocks { + None => { + trace!(target: "sync", "Ignored block headers while block download is inactive"); + self.continue_sync(io); + return Ok(()); + }, + Some(ref mut blocks) => blocks, + } + }; + downloader.import_receipts(io, r) + }; + + match result { + Err(DownloaderImportError::Invalid) => { 
+ io.disable_peer(peer_id); + self.deactivate_peer(io, peer_id); + self.continue_sync(io); + return Ok(()); + }, + Err(DownloaderImportError::Useless) => { + self.deactivate_peer(io, peer_id); + }, + Ok(()) => (), } - self.collect_blocks(io); + + self.collect_blocks(io, block_set); + self.sync_peer(io, peer_id, false); } self.continue_sync(io); Ok(()) @@ -704,7 +755,8 @@ impl ChainSync { peer.latest_hash = header.hash(); } } - if self.last_imported_block > header.number() && self.last_imported_block - header.number() > MAX_NEW_BLOCK_AGE { + let last_imported_number = self.new_blocks.last_imported_block_number(); + if last_imported_number > header.number() && last_imported_number - header.number() > MAX_NEW_BLOCK_AGE { trace!(target: "sync", "Ignored ancient new block {:?}", h); io.disable_peer(peer_id); return Ok(()); @@ -717,10 +769,7 @@ impl ChainSync { trace!(target: "sync", "New block already queued {:?}", h); }, Ok(_) => { - if header.number() == self.last_imported_block + 1 { - self.last_imported_block = header.number(); - self.last_imported_hash = header.hash(); - } + self.new_blocks.mark_as_known(&header.hash(), header.number()); trace!(target: "sync", "New block queued {:?} ({})", h, header.number()); }, Err(BlockImportError::Block(BlockError::UnknownParent(p))) => { @@ -741,7 +790,6 @@ impl ChainSync { let difficulty: U256 = try!(r.val_at(1)); if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { if peer.difficulty.map_or(true, |pd| difficulty > pd) { - //self.state = SyncState::ChainHead; peer.difficulty = Some(difficulty); trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); } @@ -770,16 +818,17 @@ impl ChainSync { let hashes = r.iter().take(MAX_NEW_HASHES).map(|item| (item.val_at::(0), item.val_at::(1))); let mut max_height: BlockNumber = 0; let mut new_hashes = Vec::new(); + let last_imported_number = self.new_blocks.last_imported_block_number(); for (rh, rn) in hashes { let hash = try!(rh); let number = try!(rn); if number > self.highest_block.unwrap_or(0) { self.highest_block = Some(number); } - if self.blocks.is_downloading(&hash) { + if self.new_blocks.is_downloading(&hash) { continue; } - if self.last_imported_block > number && self.last_imported_block - number > MAX_NEW_BLOCK_AGE { + if last_imported_number > number && last_imported_number - number > MAX_NEW_BLOCK_AGE { trace!(target: "sync", "Ignored ancient new block hash {:?}", hash); io.disable_peer(peer_id); continue; @@ -810,7 +859,7 @@ impl ChainSync { }; if max_height != 0 { trace!(target: "sync", "Downloading blocks for new hashes"); - self.blocks.reset_to(new_hashes); + self.new_blocks.reset_to(new_hashes); self.state = SyncState::NewBlocks; self.sync_peer(io, peer_id, true); } @@ -930,7 +979,11 @@ impl ChainSync { /// Resume downloading fn continue_sync(&mut self, io: &mut SyncIo) { - let mut peers: Vec<(PeerId, U256, u32)> = self.peers.iter().filter_map(|(k, p)| + if self.state != SyncState::Waiting && self.state != SyncState::SnapshotWaiting + && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.block_set != Some(BlockSet::OldBlocks) && p.can_sync()) { + self.complete_sync(io); + } + let mut peers: Vec<(PeerId, U256, u8)> = self.peers.iter().filter_map(|(k, p)| if p.can_sync() { Some((*k, p.difficulty.unwrap_or_else(U256::zero), p.protocol_version)) } else { None }).collect(); thread_rng().shuffle(&mut peers); //TODO: sort by rating // prefer peers with higher protocol version @@ -941,10 +994,6 @@ impl ChainSync { self.sync_peer(io, p, false); } 
} - if self.state != SyncState::Waiting && self.state != SyncState::SnapshotWaiting - && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.can_sync()) { - self.complete_sync(io); - } } /// Called after all blocks have been downloaded @@ -979,42 +1028,44 @@ impl ChainSync { trace!(target: "sync", "Waiting for the snapshot restoration"); return; } - (peer.latest_hash.clone(), peer.difficulty.clone(), peer.snapshot_number.as_ref().cloned(), peer.snapshot_hash.as_ref().cloned()) + (peer.latest_hash.clone(), peer.difficulty.clone(), peer.snapshot_number.as_ref().cloned().unwrap_or(0), peer.snapshot_hash.as_ref().cloned()) } else { return; } }; let chain_info = io.chain().chain_info(); - let td = chain_info.pending_total_difficulty; - let syncing_difficulty = max(self.syncing_difficulty, td); + let syncing_difficulty = chain_info.pending_total_difficulty; - if force || self.state == SyncState::NewBlocks || peer_difficulty.map_or(true, |pd| pd > syncing_difficulty) { + let higher_difficulty = peer_difficulty.map_or(true, |pd| pd > syncing_difficulty); + if force || self.state == SyncState::NewBlocks || higher_difficulty || self.old_blocks.is_some() { match self.state { - SyncState::Idle => { - // check if we can start snapshot sync with this peer - if peer_snapshot_number.unwrap_or(0) > 0 && chain_info.best_block_number == 0 { - self.start_snapshot_sync(io, peer_id); - } else { - if self.last_imported_block < chain_info.best_block_number { - self.last_imported_block = chain_info.best_block_number; - self.last_imported_hash = chain_info.best_block_hash; - } - trace!(target: "sync", "Starting sync with {}", peer_id); - self.start_sync_round(io); - self.sync_peer(io, peer_id, force); + SyncState::Idle if self.snapshot_sync_enabled + && chain_info.best_block_number < peer_snapshot_number + && (peer_snapshot_number - chain_info.best_block_number) > SNAPSHOT_RESTORE_THRESHOLD => { + trace!(target: "sync", "Starting snapshot sync: {} vs {}", peer_snapshot_number, chain_info.best_block_number); + self.start_snapshot_sync(io, peer_id); + }, + SyncState::Idle | SyncState::Blocks | SyncState::NewBlocks => { + if io.chain().queue_info().is_full() { + self.pause_sync(); + return; } - }, - SyncState::ChainHead => { - // Request subchain headers - trace!(target: "sync", "Starting sync with better chain"); - let last = self.last_imported_hash.clone(); - // Request MAX_HEADERS_TO_REQUEST - 2 headers apart so that - // MAX_HEADERS_TO_REQUEST would include headers for neighbouring subchains - self.request_headers_by_hash(io, peer_id, &last, SUBCHAIN_SIZE, MAX_HEADERS_TO_REQUEST - 2, false, PeerAsking::Heads); - }, - SyncState::Blocks | SyncState::NewBlocks => { - if io.chain().block_status(BlockID::Hash(peer_latest)) == BlockStatus::Unknown { - self.request_blocks(io, peer_id, false); + + let have_latest = io.chain().block_status(BlockID::Hash(peer_latest)) != BlockStatus::Unknown; + if !have_latest && higher_difficulty { + // check if got new blocks to download + if let Some(request) = self.new_blocks.request_blocks(io) { + self.request_blocks(io, peer_id, request, BlockSet::NewBlocks); + if self.state == SyncState::Idle { + self.state = SyncState::Blocks; + } + return; + } + } + + if let Some(request) = self.old_blocks.as_mut().and_then(|d| d.request_blocks(io)) { + self.request_blocks(io, peer_id, request, BlockSet::OldBlocks); + return; } }, SyncState::SnapshotData => { @@ -1028,61 +1079,18 @@ impl ChainSync { } } - fn start_sync_round(&mut self, io: &mut SyncIo) { - self.state = 
SyncState::ChainHead;
-		trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block);
-		// Check if need to retract to find the common block. The problem is that the peers still return headers by hash even
-		// from the non-canonical part of the tree. So we also retract if nothing has been imported last round.
-		match self.imported_this_round {
-			Some(n) if n == 0 && self.last_imported_block > 0 => {
-				// nothing was imported last round, step back to a previous block
-				// search parent in last round known parents first
-				if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == self.last_imported_hash) {
-					self.last_imported_block -= 1;
-					self.last_imported_hash = p.clone();
-					trace!(target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash);
-				} else {
-					match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) {
-						Some(h) => {
-							self.last_imported_block -= 1;
-							self.last_imported_hash = h;
-							trace!(target: "sync", "Searching common header in the blockchain {} ({})", self.last_imported_block, self.last_imported_hash);
-						}
-						None => {
-							debug!(target: "sync", "Could not revert to previous block, last: {} ({})", self.last_imported_block, self.last_imported_hash);
-						}
-					}
-				}
+	/// Perform block download request
+	fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId, request: BlockRequest, block_set: BlockSet) {
+		match request {
+			BlockRequest::Headers { start, count, skip } => {
+				self.request_headers_by_hash(io, peer_id, &start, count, skip, false, block_set);
+			},
+			BlockRequest::Bodies { hashes } => {
+				self.request_bodies(io, peer_id, hashes, block_set);
+			},
+			BlockRequest::Receipts { hashes } => {
+				self.request_receipts(io, peer_id, hashes, block_set);
+			},
-			_ => (),
-		}
-		self.imported_this_round = None;
-	}
-
-	/// Find some headers or blocks to download for a peer.
- fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId, ignore_others: bool) { - self.clear_peer_download(peer_id); - if io.chain().queue_info().is_full() { - self.pause_sync(); - return; - } - - // check to see if we need to download any block bodies first - let needed_bodies = self.blocks.needed_bodies(MAX_BODIES_TO_REQUEST, ignore_others); - if !needed_bodies.is_empty() { - if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { - peer.asking_blocks = needed_bodies.clone(); - } - self.request_bodies(io, peer_id, needed_bodies); - return; - } - - // find subchain to download - if let Some((h, count)) = self.blocks.needed_headers(MAX_HEADERS_TO_REQUEST, ignore_others) { - if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { - peer.asking_blocks = vec![h.clone()]; - } - self.request_headers_by_hash(io, peer_id, &h, count, 0, false, PeerAsking::BlockHeaders); } } @@ -1102,14 +1110,24 @@ impl ChainSync { fn clear_peer_download(&mut self, peer_id: PeerId) { if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { match peer.asking { - PeerAsking::BlockHeaders | PeerAsking::Heads => { - for b in &peer.asking_blocks { - self.blocks.clear_header_download(b); + PeerAsking::BlockHeaders => { + if let Some(ref hash) = peer.asking_hash { + self.new_blocks.clear_header_download(hash); + if let Some(ref mut old) = self.old_blocks { + old.clear_header_download(hash); + } } }, PeerAsking::BlockBodies => { - for b in &peer.asking_blocks { - self.blocks.clear_body_download(b); + self.new_blocks.clear_body_download(&peer.asking_blocks); + if let Some(ref mut old) = self.old_blocks { + old.clear_body_download(&peer.asking_blocks); + } + }, + PeerAsking::BlockReceipts => { + self.new_blocks.clear_receipt_download(&peer.asking_blocks); + if let Some(ref mut old) = self.old_blocks { + old.clear_receipt_download(&peer.asking_blocks); } }, PeerAsking::SnapshotData => { @@ -1119,104 +1137,53 @@ impl ChainSync { }, _ => (), } - peer.asking_blocks.clear(); - peer.asking_snapshot_data = None; - } - } - - fn block_imported(&mut self, hash: &H256, number: BlockNumber, parent: &H256) { - self.last_imported_block = number; - self.last_imported_hash = hash.clone(); - self.round_parents.push_back((hash.clone(), parent.clone())); - if self.round_parents.len() > MAX_ROUND_PARENTS { - self.round_parents.pop_front(); } } /// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import. 
- fn collect_blocks(&mut self, io: &mut SyncIo) { - let mut restart = false; - let mut imported = HashSet::new(); - let blocks = self.blocks.drain(); - let count = blocks.len(); - for block in blocks { - let (h, number, parent) = { - let header = BlockView::new(&block).header_view(); - (header.sha3(), header.number(), header.parent_hash()) - }; - - // Perform basic block verification - if !Block::is_good(&block) { - debug!(target: "sync", "Bad block rlp {:?} : {:?}", h, block); - restart = true; - break; - } - - match io.chain().import_block(block) { - Err(BlockImportError::Import(ImportError::AlreadyInChain)) => { - trace!(target: "sync", "Block already in chain {:?}", h); - self.block_imported(&h, number, &parent); - }, - Err(BlockImportError::Import(ImportError::AlreadyQueued)) => { - trace!(target: "sync", "Block already queued {:?}", h); - self.block_imported(&h, number, &parent); - }, - Ok(_) => { - trace!(target: "sync", "Block queued {:?}", h); - imported.insert(h.clone()); - self.block_imported(&h, number, &parent); - }, - Err(BlockImportError::Block(BlockError::UnknownParent(_))) if self.state == SyncState::NewBlocks => { - trace!(target: "sync", "Unknown new block parent, restarting sync"); - break; - }, - Err(e) => { - debug!(target: "sync", "Bad block {:?} : {:?}", h, e); - restart = true; - break; + fn collect_blocks(&mut self, io: &mut SyncIo, block_set: BlockSet) { + match block_set { + BlockSet::NewBlocks => { + if self.new_blocks.collect_blocks(io, self.state == SyncState::NewBlocks) == Err(DownloaderImportError::Invalid) { + self.restart(io); + } + }, + BlockSet::OldBlocks => { + if self.old_blocks.as_mut().map_or(false, |downloader| { downloader.collect_blocks(io, false) == Err(DownloaderImportError::Invalid) }) { + self.restart(io); + } else if self.old_blocks.as_ref().map_or(false, |downloader| { downloader.is_complete() }) { + trace!(target: "sync", "Background block download is complete"); + self.old_blocks = None; } } } - trace!(target: "sync", "Imported {} of {}", imported.len(), count); - self.imported_this_round = Some(self.imported_this_round.unwrap_or(0) + imported.len()); - - if restart { - self.restart_on_bad_block(io); - return; - } - - if self.blocks.is_empty() { - // complete sync round - trace!(target: "sync", "Sync round complete"); - self.restart(io); - } } /// Request headers from a peer by block hash #[cfg_attr(feature="dev", allow(too_many_arguments))] - fn request_headers_by_hash(&mut self, sync: &mut SyncIo, peer_id: PeerId, h: &H256, count: usize, skip: usize, reverse: bool, asking: PeerAsking) { - trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, h); + fn request_headers_by_hash(&mut self, sync: &mut SyncIo, peer_id: PeerId, h: &H256, count: u64, skip: u64, reverse: bool, set: BlockSet) { + trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}, set = {:?}", peer_id, count, h, set); let mut rlp = RlpStream::new_list(4); rlp.append(h); rlp.append(&count); rlp.append(&skip); rlp.append(&if reverse {1u32} else {0u32}); - self.send_request(sync, peer_id, asking, GET_BLOCK_HEADERS_PACKET, rlp.out()); - self.peers.get_mut(&peer_id) - .expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. 
qed") - .asking_hash = Some(h.clone()); + self.send_request(sync, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out()); + let peer = self.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); + peer.asking_hash = Some(h.clone()); + peer.block_set = Some(set); } /// Request headers from a peer by block number #[cfg_attr(feature="dev", allow(too_many_arguments))] - fn request_headers_by_number(&mut self, sync: &mut SyncIo, peer_id: PeerId, n: BlockNumber, count: usize, skip: usize, reverse: bool, asking: PeerAsking) { - trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, n); + fn request_fork_header_by_number(&mut self, sync: &mut SyncIo, peer_id: PeerId, n: BlockNumber) { + trace!(target: "sync", "{} <- GetForkHeader: at {}", peer_id, n); let mut rlp = RlpStream::new_list(4); rlp.append(&n); - rlp.append(&count); - rlp.append(&skip); - rlp.append(&if reverse {1u32} else {0u32}); - self.send_request(sync, peer_id, asking, GET_BLOCK_HEADERS_PACKET, rlp.out()); + rlp.append(&1u32); + rlp.append(&0u32); + rlp.append(&0u32); + self.send_request(sync, peer_id, PeerAsking::ForkHeader, GET_BLOCK_HEADERS_PACKET, rlp.out()); } /// Request snapshot manifest from a peer. @@ -1235,31 +1202,46 @@ impl ChainSync { } /// Request block bodies from a peer - fn request_bodies(&mut self, sync: &mut SyncIo, peer_id: PeerId, hashes: Vec) { + fn request_bodies(&mut self, sync: &mut SyncIo, peer_id: PeerId, hashes: Vec, set: BlockSet) { let mut rlp = RlpStream::new_list(hashes.len()); - trace!(target: "sync", "{} <- GetBlockBodies: {} entries starting from {:?}", peer_id, hashes.len(), hashes.first()); - for h in hashes { - rlp.append(&h); + trace!(target: "sync", "{} <- GetBlockBodies: {} entries starting from {:?}, set = {:?}", peer_id, hashes.len(), hashes.first(), set); + for h in &hashes { + rlp.append(&h.clone()); } self.send_request(sync, peer_id, PeerAsking::BlockBodies, GET_BLOCK_BODIES_PACKET, rlp.out()); + let peer = self.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); + peer.asking_blocks = hashes; + peer.block_set = Some(set); + } + + /// Request block receipts from a peer + fn request_receipts(&mut self, sync: &mut SyncIo, peer_id: PeerId, hashes: Vec, set: BlockSet) { + let mut rlp = RlpStream::new_list(hashes.len()); + trace!(target: "sync", "{} <- GetBlockReceipts: {} entries starting from {:?}, set = {:?}", peer_id, hashes.len(), hashes.first(), set); + for h in &hashes { + rlp.append(&h.clone()); + } + self.send_request(sync, peer_id, PeerAsking::BlockReceipts, GET_RECEIPTS_PACKET, rlp.out()); + let peer = self.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); + peer.asking_blocks = hashes; + peer.block_set = Some(set); } /// Reset peer status after request is complete. 
fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) -> bool { if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { peer.expired = false; + peer.block_set = None; if peer.asking != asking { trace!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking); peer.asking = PeerAsking::Nothing; - false - } - else { + return false; + } else { peer.asking = PeerAsking::Nothing; - true + return true; } - } else { - false } + return false; } /// Generic request sender @@ -1314,10 +1296,12 @@ impl ChainSync { /// Send Status message fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), NetworkError> { - let pv64 = io.eth_protocol_version(peer) >= 64; + let protocol = io.eth_protocol_version(peer); + trace!(target: "sync", "Sending status to {}, protocol version {}", peer, protocol); + let pv64 = protocol >= PROTOCOL_VERSION_64; let mut packet = RlpStream::new_list(if pv64 { 7 } else { 5 }); let chain = io.chain().chain_info(); - packet.append(&(io.eth_protocol_version(peer) as u32)); + packet.append(&(protocol as u32)); packet.append(&self.network_id); packet.append(&chain.total_difficulty); packet.append(&chain.best_block_hash); @@ -1374,8 +1358,13 @@ impl ChainSync { let mut count = 0; let mut data = Bytes::new(); let inc = (skip + 1) as BlockNumber; + let overlay = io.chain_overlay().read(); while number <= last && count < max_count { - if let Some(mut hdr) = io.chain().block_header(BlockID::Number(number)) { + if let Some(hdr) = overlay.get(&number) { + trace!(target: "sync", "{}: Returning cached fork header", peer_id); + data.extend(hdr); + count += 1; + } else if let Some(mut hdr) = io.chain().block_header(BlockID::Number(number)) { data.append(&mut hdr); count += 1; } @@ -1391,7 +1380,7 @@ impl ChainSync { } let mut rlp = RlpStream::new_list(count as usize); rlp.append_raw(&data, count as usize); - trace!(target: "sync", "-> GetBlockHeaders: returned {} entries", count); + trace!(target: "sync", "{} -> GetBlockHeaders: returned {} entries", peer_id, count); Ok(Some((BLOCK_HEADERS_PACKET, rlp))) } @@ -1402,7 +1391,6 @@ impl ChainSync { debug!(target: "sync", "Empty GetBlockBodies request, ignoring."); return Ok(None); } - trace!(target: "sync", "{} -> GetBlockBodies: {} entries", peer_id, count); count = min(count, MAX_BODIES_TO_SEND); let mut added = 0usize; let mut data = Bytes::new(); @@ -1414,7 +1402,7 @@ impl ChainSync { } let mut rlp = RlpStream::new_list(added); rlp.append_raw(&data, added); - trace!(target: "sync", "-> GetBlockBodies: returned {} entries", added); + trace!(target: "sync", "{} -> GetBlockBodies: returned {} entries", peer_id, added); Ok(Some((BLOCK_BODIES_PACKET, rlp))) } @@ -1495,6 +1483,7 @@ impl ChainSync { let rlp = match io.snapshot_service().chunk(hash) { Some(data) => { let mut rlp = RlpStream::new_list(1); + trace!(target: "sync", "{} <- SnapshotData", peer_id); rlp.append(&data); rlp }, @@ -1570,6 +1559,7 @@ impl ChainSync { TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp), BLOCK_HEADERS_PACKET => self.on_peer_block_headers(io, peer, &rlp), BLOCK_BODIES_PACKET => self.on_peer_block_bodies(io, peer, &rlp), + RECEIPTS_PACKET => self.on_peer_block_receipts(io, peer, &rlp), NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp), NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp), SNAPSHOT_MANIFEST_PACKET => self.on_snapshot_manifest(io, peer, &rlp), @@ -1590,8 +1580,9 @@ impl ChainSync { let mut aborting = Vec::new(); for (peer_id, peer) in &self.peers { let timeout = match 
peer.asking { - PeerAsking::BlockHeaders | PeerAsking::Heads => (tick - peer.ask_time) > HEADERS_TIMEOUT_SEC, + PeerAsking::BlockHeaders => (tick - peer.ask_time) > HEADERS_TIMEOUT_SEC, PeerAsking::BlockBodies => (tick - peer.ask_time) > BODIES_TIMEOUT_SEC, + PeerAsking::BlockReceipts => (tick - peer.ask_time) > RECEIPTS_TIMEOUT_SEC, PeerAsking::Nothing => false, PeerAsking::ForkHeader => (tick - peer.ask_time) > FORK_HEADER_TIMEOUT_SEC, PeerAsking::SnapshotManifest => (tick - peer.ask_time) > SNAPSHOT_MANIFEST_TIMEOUT_SEC, @@ -1613,7 +1604,8 @@ impl ChainSync { self.state = SyncState::Blocks; self.continue_sync(io); } else if self.state == SyncState::SnapshotWaiting && io.snapshot_service().status() == RestorationStatus::Inactive { - self.state = SyncState::Idle; + trace!(target:"sync", "Snapshot restoration is complete"); + self.restart(io); self.continue_sync(io); } } @@ -1737,7 +1729,6 @@ impl ChainSync { /// propagates new transactions to all peers pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize { - // Early out of nobody to send to. if self.peers.is_empty() { return 0; @@ -1833,7 +1824,7 @@ impl ChainSync { } if !invalid.is_empty() { trace!(target: "sync", "Bad blocks in the queue, restarting"); - self.restart_on_bad_block(io); + self.restart(io); } for peer_info in self.peers.values_mut() { peer_info.last_sent_transactions.clear(); @@ -2050,6 +2041,7 @@ mod tests { snapshot_number: None, snapshot_hash: None, asking_snapshot_data: None, + block_set: None, }); sync } diff --git a/sync/src/lib.rs b/sync/src/lib.rs index a4c29f166..6cfe2a26c 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -48,6 +48,7 @@ extern crate ethcore_ipc as ipc; mod chain; mod blocks; +mod block_sync; mod sync_io; mod snapshot; diff --git a/sync/src/sync_io.rs b/sync/src/sync_io.rs index 52118b710..24a73437b 100644 --- a/sync/src/sync_io.rs +++ b/sync/src/sync_io.rs @@ -14,9 +14,13 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +use std::collections::HashMap; use network::{NetworkContext, PeerId, PacketId, NetworkError, SessionInfo}; +use util::Bytes; use ethcore::client::BlockChainClient; +use ethcore::header::BlockNumber; use ethcore::snapshot::SnapshotService; +use parking_lot::RwLock; /// IO interface for the syning handler. /// Provides peer connection management and an interface to the blockchain client. @@ -48,6 +52,8 @@ pub trait SyncIo { } /// Check if the session is expired fn is_expired(&self) -> bool; + /// Return sync overlay + fn chain_overlay(&self) -> &RwLock>; } /// Wraps `NetworkContext` and the blockchain client @@ -55,15 +61,20 @@ pub struct NetSyncIo<'s, 'h> where 'h: 's { network: &'s NetworkContext<'h>, chain: &'s BlockChainClient, snapshot_service: &'s SnapshotService, + chain_overlay: &'s RwLock>, } impl<'s, 'h> NetSyncIo<'s, 'h> { /// Creates a new instance from the `NetworkContext` and the blockchain client reference. 
@@ -55,15 +61,20 @@ pub struct NetSyncIo<'s, 'h> where 'h: 's {
 	network: &'s NetworkContext<'h>,
 	chain: &'s BlockChainClient,
 	snapshot_service: &'s SnapshotService,
+	chain_overlay: &'s RwLock<HashMap<BlockNumber, Bytes>>,
 }

 impl<'s, 'h> NetSyncIo<'s, 'h> {
 	/// Creates a new instance from the `NetworkContext` and the blockchain client reference.
-	pub fn new(network: &'s NetworkContext<'h>, chain: &'s BlockChainClient, snapshot_service: &'s SnapshotService) -> NetSyncIo<'s, 'h> {
+	pub fn new(network: &'s NetworkContext<'h>,
+		chain: &'s BlockChainClient,
+		snapshot_service: &'s SnapshotService,
+		chain_overlay: &'s RwLock<HashMap<BlockNumber, Bytes>>) -> NetSyncIo<'s, 'h> {
 		NetSyncIo {
 			network: network,
 			chain: chain,
 			snapshot_service: snapshot_service,
+			chain_overlay: chain_overlay,
 		}
 	}
 }
@@ -89,6 +100,10 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
 		self.chain
 	}

+	fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> {
+		self.chain_overlay
+	}
+
 	fn snapshot_service(&self) -> &SnapshotService {
 		self.snapshot_service
 	}
diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs
index fcc8f002e..7fb2319c7 100644
--- a/sync/src/tests/helpers.rs
+++ b/sync/src/tests/helpers.rs
@@ -30,6 +30,7 @@ pub struct TestIo<'p> {
 	pub queue: &'p mut VecDeque<TestPacket>,
 	pub sender: Option<PeerId>,
 	pub to_disconnect: HashSet<PeerId>,
+	overlay: RwLock<HashMap<BlockNumber, Bytes>>,
 }

 impl<'p> TestIo<'p> {
@@ -40,6 +41,7 @@ impl<'p> TestIo<'p> {
 			queue: queue,
 			sender: sender,
 			to_disconnect: HashSet::new(),
+			overlay: RwLock::new(HashMap::new()),
 		}
 	}
 }
@@ -90,6 +92,10 @@ impl<'p> SyncIo for TestIo<'p> {
 	fn eth_protocol_version(&self, _peer: PeerId) -> u8 {
 		64
 	}
+
+	fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> {
+		&self.overlay
+	}
 }

 pub struct TestPacket {
@@ -149,6 +155,7 @@ impl TestNet {
 		for client in 0..self.peers.len() {
 			if peer != client {
 				let mut p = self.peers.get_mut(peer).unwrap();
+				p.sync.write().restart(&mut TestIo::new(&mut p.chain, &p.snapshot_service, &mut p.queue, Some(client as PeerId)));
 				p.sync.write().on_peer_connected(&mut TestIo::new(&mut p.chain, &p.snapshot_service, &mut p.queue, Some(client as PeerId)), client as PeerId);
 			}
 		}
diff --git a/sync/src/tests/snapshot.rs b/sync/src/tests/snapshot.rs
index adbb3ce48..58b7ec786 100644
--- a/sync/src/tests/snapshot.rs
+++ b/sync/src/tests/snapshot.rs
@@ -77,7 +77,9 @@ impl SnapshotService for TestSnapshotService {
 		match *self.restoration_manifest.lock() {
 			Some(ref manifest) if self.state_restoration_chunks.lock().len() == manifest.state_hashes.len() && self.block_restoration_chunks.lock().len() == manifest.block_hashes.len() => RestorationStatus::Inactive,
-			Some(_) => RestorationStatus::Ongoing {
+			Some(ref manifest) => RestorationStatus::Ongoing {
+				state_chunks: manifest.state_hashes.len() as u32,
+				block_chunks: manifest.block_hashes.len() as u32,
 				state_chunks_done: self.state_restoration_chunks.lock().len() as u32,
 				block_chunks_done: self.block_restoration_chunks.lock().len() as u32,
 			},
@@ -114,7 +116,7 @@ impl SnapshotService for TestSnapshotService {
 fn snapshot_sync() {
 	::env_logger::init().ok();
 	let mut net = TestNet::new(2);
-	net.peer_mut(0).snapshot_service = Arc::new(TestSnapshotService::new_with_snapshot(16, H256::new(), 1));
+	net.peer_mut(0).snapshot_service = Arc::new(TestSnapshotService::new_with_snapshot(16, H256::new(), 500000));
 	net.peer_mut(0).chain.add_blocks(1, EachBlockWith::Nothing);
 	net.sync_steps(19); // status + manifest + chunks
 	assert_eq!(net.peer(1).snapshot_service.state_restoration_chunks.lock().len(), net.peer(0).snapshot_service.manifest.as_ref().unwrap().state_hashes.len());
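With the totals (`state_chunks`, `block_chunks`) now carried in `RestorationStatus::Ongoing` next to the done-counters, a consumer such as the informant can derive an overall progress figure. A hedged sketch of that arithmetic; the struct below merely mirrors the variant's fields and is not the actual type:

    /// Mirrors the fields of RestorationStatus::Ongoing, for illustration only.
    struct Ongoing {
        state_chunks: u32,
        block_chunks: u32,
        state_chunks_done: u32,
        block_chunks_done: u32,
    }

    /// Overall restoration progress in percent, guarding against division by zero.
    fn progress_percent(s: &Ongoing) -> u32 {
        let total = s.state_chunks + s.block_chunks;
        if total == 0 { 100 } else { (s.state_chunks_done + s.block_chunks_done) * 100 / total }
    }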
diff --git a/util/io/src/worker.rs b/util/io/src/worker.rs
index f4f63919f..fd2040468 100644
--- a/util/io/src/worker.rs
+++ b/util/io/src/worker.rs
@@ -101,11 +101,11 @@ impl Worker {
 				let _ = wait.wait(lock);
 			}
-			if deleting.load(AtomicOrdering::Acquire) {
-				return;
-			}
-			while let chase_lev::Steal::Data(work) = stealer.steal() {
-				Worker::do_work(work, channel.clone());
+			while !deleting.load(AtomicOrdering::Acquire) {
+				match stealer.steal() {
+					chase_lev::Steal::Data(work) => Worker::do_work(work, channel.clone()),
+					_ => break,
+				}
 			}
 		}
 	}
diff --git a/util/network/src/connection.rs b/util/network/src/connection.rs
index 456c35e69..e12434a19 100644
--- a/util/network/src/connection.rs
+++ b/util/network/src/connection.rs
@@ -106,10 +106,11 @@ impl GenericConnection {
 	/// Add a packet to send queue.
 	pub fn send<Message>(&mut self, io: &IoContext<Message>, data: Bytes) where Message: Send + Clone {
 		if !data.is_empty() {
+			trace!(target:"network", "{}: Sending {} bytes", self.token, data.len());
 			self.send_queue.push_back(Cursor::new(data));
-		}
-		if !self.interest.is_writable() {
-			self.interest.insert(EventSet::writable());
+			if !self.interest.is_writable() {
+				self.interest.insert(EventSet::writable());
+			}
 			io.update_registration(self.token).ok();
 		}
 	}
diff --git a/util/network/src/session.rs b/util/network/src/session.rs
index 845f98bec..50aa92168 100644
--- a/util/network/src/session.rs
+++ b/util/network/src/session.rs
@@ -30,7 +30,7 @@ use node_table::NodeId;
 use stats::NetworkStats;
 use time;

-const PING_TIMEOUT_SEC: u64 = 30;
+const PING_TIMEOUT_SEC: u64 = 65;
 const PING_INTERVAL_SEC: u64 = 30;

 /// Peer session over encrypted connection.
diff --git a/util/src/standard.rs b/util/src/standard.rs
index 0693dcd23..3d6c93e1a 100644
--- a/util/src/standard.rs
+++ b/util/src/standard.rs
@@ -46,4 +46,4 @@ pub use rustc_serialize::hex::{FromHex, FromHexError};
 pub use heapsize::HeapSizeOf;
 pub use itertools::Itertools;

-pub use parking_lot::{Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
+pub use parking_lot::{Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
\ No newline at end of file
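The reworked worker loop re-tests the `deleting` flag before every steal instead of draining the deque unconditionally, which is what lets shutdown skip pending IO tasks. The same shape in isolation, with a plain `VecDeque` standing in for the chase_lev stealer:

    use std::collections::VecDeque;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Sketch: run queued work until the queue is empty or shutdown is requested.
    fn drain(deleting: &AtomicBool, queue: &mut VecDeque<u32>, run: &mut dyn FnMut(u32)) {
        while !deleting.load(Ordering::Acquire) {
            match queue.pop_front() {
                Some(work) => run(work),
                None => break,
            }
        }
    }

Checking the flag between items bounds shutdown latency by a single task rather than by the whole backlog.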