Merge branch 'master' into lightcli
This commit is contained in:
@@ -14,6 +14,8 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
use smallvec::SmallVec;
|
||||
use util::*;
|
||||
use rlp::*;
|
||||
use network::NetworkError;
|
||||
@@ -21,11 +23,14 @@ use ethcore::header::Header as BlockHeader;
|
||||
|
||||
known_heap_size!(0, HeaderId);
|
||||
|
||||
type SmallHashVec = SmallVec<[H256; 1]>;
|
||||
|
||||
/// Block data with optional body.
|
||||
struct SyncBlock {
|
||||
header: Bytes,
|
||||
body: Option<Bytes>,
|
||||
receipts: Option<Bytes>,
|
||||
receipts_root: H256,
|
||||
}
|
||||
|
||||
/// Block with optional receipt
|
||||
@@ -64,15 +69,15 @@ pub struct BlockCollection {
|
||||
parents: HashMap<H256, H256>,
|
||||
/// Used to map body to header.
|
||||
header_ids: HashMap<HeaderId, H256>,
|
||||
/// Used to map receipts root to header.
|
||||
receipt_ids: HashMap<H256, H256>,
|
||||
/// Used to map receipts root to headers.
|
||||
receipt_ids: HashMap<H256, SmallHashVec>,
|
||||
/// First block in `blocks`.
|
||||
head: Option<H256>,
|
||||
/// Set of block header hashes being downloaded
|
||||
downloading_headers: HashSet<H256>,
|
||||
/// Set of block bodies being downloaded identified by block hash.
|
||||
downloading_bodies: HashSet<H256>,
|
||||
/// Set of block receipts being downloaded identified by block hash.
|
||||
/// Set of block receipts being downloaded identified by receipt root.
|
||||
downloading_receipts: HashSet<H256>,
|
||||
}
|
||||
|
||||
@@ -194,21 +199,24 @@ impl BlockCollection {
|
||||
head = self.parents.get(&head.unwrap()).cloned();
|
||||
if let Some(head) = head {
|
||||
match self.blocks.get(&head) {
|
||||
Some(block) if block.receipts.is_none() && !self.downloading_receipts.contains(&head) => {
|
||||
self.downloading_receipts.insert(head.clone());
|
||||
needed_receipts.push(head.clone());
|
||||
Some(block) => {
|
||||
if block.receipts.is_none() && !self.downloading_receipts.contains(&block.receipts_root) {
|
||||
self.downloading_receipts.insert(block.receipts_root);
|
||||
needed_receipts.push(head.clone());
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
for h in self.receipt_ids.values() {
|
||||
// If there are multiple blocks per receipt, only request one of them.
|
||||
for (root, h) in self.receipt_ids.iter().map(|(root, hashes)| (root, hashes[0])) {
|
||||
if needed_receipts.len() >= count {
|
||||
break;
|
||||
}
|
||||
if !self.downloading_receipts.contains(h) {
|
||||
if !self.downloading_receipts.contains(root) {
|
||||
needed_receipts.push(h.clone());
|
||||
self.downloading_receipts.insert(h.clone());
|
||||
self.downloading_receipts.insert(*root);
|
||||
}
|
||||
}
|
||||
needed_receipts
|
||||
@@ -245,7 +253,9 @@ impl BlockCollection {
|
||||
/// Unmark block receipt as being downloaded.
|
||||
pub fn clear_receipt_download(&mut self, hashes: &[H256]) {
|
||||
for h in hashes {
|
||||
self.downloading_receipts.remove(h);
|
||||
if let Some(ref block) = self.blocks.get(h) {
|
||||
self.downloading_receipts.remove(&block.receipts_root);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -366,23 +376,24 @@ impl BlockCollection {
|
||||
let receipts = UntrustedRlp::new(&r);
|
||||
ordered_trie_root(receipts.iter().map(|r| r.as_raw().to_vec())) //TODO: get rid of vectors here
|
||||
};
|
||||
match self.receipt_ids.get(&receipt_root).cloned() {
|
||||
Some(h) => {
|
||||
self.receipt_ids.remove(&receipt_root);
|
||||
self.downloading_receipts.remove(&h);
|
||||
match self.blocks.get_mut(&h) {
|
||||
Some(ref mut block) => {
|
||||
trace!(target: "sync", "Got receipt {}", h);
|
||||
block.receipts = Some(r);
|
||||
Ok(())
|
||||
},
|
||||
None => {
|
||||
warn!("Got receipt with no header {}", h);
|
||||
Err(NetworkError::BadProtocol)
|
||||
self.downloading_receipts.remove(&receipt_root);
|
||||
match self.receipt_ids.entry(receipt_root) {
|
||||
Entry::Occupied(entry) => {
|
||||
for h in entry.remove() {
|
||||
match self.blocks.get_mut(&h) {
|
||||
Some(ref mut block) => {
|
||||
trace!(target: "sync", "Got receipt {}", h);
|
||||
block.receipts = Some(r.clone());
|
||||
},
|
||||
None => {
|
||||
warn!("Got receipt with no header {}", h);
|
||||
return Err(NetworkError::BadProtocol)
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
None => {
|
||||
_ => {
|
||||
trace!(target: "sync", "Ignored unknown/stale block receipt {:?}", receipt_root);
|
||||
Err(NetworkError::BadProtocol)
|
||||
}
|
||||
@@ -407,6 +418,7 @@ impl BlockCollection {
|
||||
header: header,
|
||||
body: None,
|
||||
receipts: None,
|
||||
receipts_root: H256::new(),
|
||||
};
|
||||
let header_id = HeaderId {
|
||||
transactions_root: info.transactions_root().clone(),
|
||||
@@ -429,11 +441,9 @@ impl BlockCollection {
|
||||
let receipts_stream = RlpStream::new_list(0);
|
||||
block.receipts = Some(receipts_stream.out());
|
||||
} else {
|
||||
if self.receipt_ids.contains_key(&receipt_root) {
|
||||
warn!(target: "sync", "Duplicate receipt root {:?}, block: {:?}", receipt_root, hash);
|
||||
}
|
||||
self.receipt_ids.insert(receipt_root, hash.clone());
|
||||
self.receipt_ids.entry(receipt_root).or_insert_with(|| SmallHashVec::new()).push(hash.clone());
|
||||
}
|
||||
block.receipts_root = receipt_root;
|
||||
}
|
||||
|
||||
self.parents.insert(info.parent_hash().clone(), hash.clone());
|
||||
|
||||
@@ -158,14 +158,16 @@ pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x16;
|
||||
|
||||
const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;
|
||||
|
||||
const MIN_SUPPORTED_SNAPSHOT_MANIFEST_VERSION: u64 = 1;
|
||||
|
||||
const WAIT_PEERS_TIMEOUT_SEC: u64 = 5;
|
||||
const STATUS_TIMEOUT_SEC: u64 = 5;
|
||||
const HEADERS_TIMEOUT_SEC: u64 = 15;
|
||||
const BODIES_TIMEOUT_SEC: u64 = 10;
|
||||
const RECEIPTS_TIMEOUT_SEC: u64 = 10;
|
||||
const FORK_HEADER_TIMEOUT_SEC: u64 = 3;
|
||||
const SNAPSHOT_MANIFEST_TIMEOUT_SEC: u64 = 3;
|
||||
const SNAPSHOT_DATA_TIMEOUT_SEC: u64 = 60;
|
||||
const SNAPSHOT_MANIFEST_TIMEOUT_SEC: u64 = 5;
|
||||
const SNAPSHOT_DATA_TIMEOUT_SEC: u64 = 120;
|
||||
|
||||
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
|
||||
/// Sync state
|
||||
@@ -377,6 +379,8 @@ pub struct ChainSync {
|
||||
transactions_stats: TransactionsStats,
|
||||
/// Enable ancient block downloading
|
||||
download_old_blocks: bool,
|
||||
/// Enable warp sync.
|
||||
enable_warp_sync: bool,
|
||||
}
|
||||
|
||||
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
|
||||
@@ -401,6 +405,7 @@ impl ChainSync {
|
||||
snapshot: Snapshot::new(),
|
||||
sync_start_time: None,
|
||||
transactions_stats: TransactionsStats::default(),
|
||||
enable_warp_sync: config.warp_sync,
|
||||
};
|
||||
sync.update_targets(chain);
|
||||
sync
|
||||
@@ -463,12 +468,7 @@ impl ChainSync {
|
||||
/// Reset sync. Clear all downloaded data but keep the queue
|
||||
fn reset(&mut self, io: &mut SyncIo) {
|
||||
self.new_blocks.reset();
|
||||
self.snapshot.clear();
|
||||
let chain_info = io.chain().chain_info();
|
||||
if self.state == SyncState::SnapshotData {
|
||||
debug!(target:"sync", "Aborting snapshot restore");
|
||||
io.snapshot_service().abort_restore();
|
||||
}
|
||||
for (_, ref mut p) in &mut self.peers {
|
||||
if p.block_set != Some(BlockSet::OldBlocks) {
|
||||
p.reset_asking();
|
||||
@@ -487,6 +487,11 @@ impl ChainSync {
|
||||
/// Restart sync
|
||||
pub fn reset_and_continue(&mut self, io: &mut SyncIo) {
|
||||
trace!(target: "sync", "Restarting");
|
||||
if self.state == SyncState::SnapshotData {
|
||||
debug!(target:"sync", "Aborting snapshot restore");
|
||||
io.snapshot_service().abort_restore();
|
||||
}
|
||||
self.snapshot.clear();
|
||||
self.reset(io);
|
||||
self.continue_sync(io);
|
||||
}
|
||||
@@ -499,7 +504,10 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
fn maybe_start_snapshot_sync(&mut self, io: &mut SyncIo) {
|
||||
if self.state != SyncState::WaitingPeers {
|
||||
if !self.enable_warp_sync {
|
||||
return;
|
||||
}
|
||||
if self.state != SyncState::WaitingPeers && self.state != SyncState::Blocks && self.state != SyncState::Waiting {
|
||||
return;
|
||||
}
|
||||
// Make sure the snapshot block is not too far away from best block and network best block and
|
||||
@@ -531,7 +539,7 @@ impl ChainSync {
|
||||
(best_hash, max_peers, snapshot_peers)
|
||||
};
|
||||
|
||||
let timeout = self.sync_start_time.map_or(false, |t| ((time::precise_time_ns() - t) / 1_000_000_000) > WAIT_PEERS_TIMEOUT_SEC);
|
||||
let timeout = (self.state == SyncState::WaitingPeers) && self.sync_start_time.map_or(false, |t| ((time::precise_time_ns() - t) / 1_000_000_000) > WAIT_PEERS_TIMEOUT_SEC);
|
||||
|
||||
if let (Some(hash), Some(peers)) = (best_hash, best_hash.map_or(None, |h| snapshot_peers.get(&h))) {
|
||||
if max_peers >= SNAPSHOT_MIN_PEERS {
|
||||
@@ -549,13 +557,18 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
fn start_snapshot_sync(&mut self, io: &mut SyncIo, peers: &[PeerId]) {
|
||||
self.snapshot.clear();
|
||||
for p in peers {
|
||||
if self.peers.get(p).map_or(false, |p| p.asking == PeerAsking::Nothing) {
|
||||
self.request_snapshot_manifest(io, *p);
|
||||
if !self.snapshot.have_manifest() {
|
||||
for p in peers {
|
||||
if self.peers.get(p).map_or(false, |p| p.asking == PeerAsking::Nothing) {
|
||||
self.request_snapshot_manifest(io, *p);
|
||||
}
|
||||
}
|
||||
self.state = SyncState::SnapshotManifest;
|
||||
trace!(target: "sync", "New snapshot sync with {:?}", peers);
|
||||
} else {
|
||||
self.state = SyncState::SnapshotData;
|
||||
trace!(target: "sync", "Resumed snapshot sync with {:?}", peers);
|
||||
}
|
||||
self.state = SyncState::SnapshotManifest;
|
||||
}
|
||||
|
||||
/// Restart sync disregarding the block queue status. May end up re-downloading up to QUEUE_SIZE blocks
|
||||
@@ -1023,12 +1036,18 @@ impl ChainSync {
|
||||
let manifest = match ManifestData::from_rlp(manifest_rlp.as_raw()) {
|
||||
Err(e) => {
|
||||
trace!(target: "sync", "{}: Ignored bad manifest: {:?}", peer_id, e);
|
||||
io.disconnect_peer(peer_id);
|
||||
io.disable_peer(peer_id);
|
||||
self.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
Ok(manifest) => manifest,
|
||||
};
|
||||
if manifest.version < MIN_SUPPORTED_SNAPSHOT_MANIFEST_VERSION {
|
||||
trace!(target: "sync", "{}: Snapshot manifest version too low: {}", peer_id, manifest.version);
|
||||
io.disable_peer(peer_id);
|
||||
self.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
self.snapshot.reset_to(&manifest, &manifest_rlp.as_raw().sha3());
|
||||
io.snapshot_service().begin_restore(manifest);
|
||||
self.state = SyncState::SnapshotData;
|
||||
|
||||
@@ -35,6 +35,7 @@ extern crate time;
|
||||
extern crate rand;
|
||||
extern crate semver;
|
||||
extern crate parking_lot;
|
||||
extern crate smallvec;
|
||||
extern crate rlp;
|
||||
|
||||
extern crate ethcore_light as light;
|
||||
|
||||
@@ -54,6 +54,11 @@ impl Snapshot {
|
||||
self.snapshot_hash = None;
|
||||
}
|
||||
|
||||
/// Check if currently downloading a snapshot.
|
||||
pub fn have_manifest(&self) -> bool {
|
||||
self.snapshot_hash.is_some()
|
||||
}
|
||||
|
||||
/// Reset collection for a manifest RLP
|
||||
pub fn reset_to(&mut self, manifest: &ManifestData, hash: &H256) {
|
||||
self.clear();
|
||||
@@ -139,6 +144,7 @@ mod test {
|
||||
let state_chunks: Vec<Bytes> = (0..20).map(|_| H256::random().to_vec()).collect();
|
||||
let block_chunks: Vec<Bytes> = (0..20).map(|_| H256::random().to_vec()).collect();
|
||||
let manifest = ManifestData {
|
||||
version: 2,
|
||||
state_hashes: state_chunks.iter().map(|data| data.sha3()).collect(),
|
||||
block_hashes: block_chunks.iter().map(|data| data.sha3()).collect(),
|
||||
state_root: H256::new(),
|
||||
|
||||
@@ -49,6 +49,7 @@ impl TestSnapshotService {
|
||||
let state_chunks: Vec<Bytes> = (0..num_state_chunks).map(|_| H256::random().to_vec()).collect();
|
||||
let block_chunks: Vec<Bytes> = (0..num_block_chunks).map(|_| H256::random().to_vec()).collect();
|
||||
let manifest = ManifestData {
|
||||
version: 2,
|
||||
state_hashes: state_chunks.iter().map(|data| data.sha3()).collect(),
|
||||
block_hashes: block_chunks.iter().map(|data| data.sha3()).collect(),
|
||||
state_root: H256::new(),
|
||||
|
||||
Reference in New Issue
Block a user