From 0578712a26356591e6ccb81c1675363384da03fb Mon Sep 17 00:00:00 2001
From: arkpar
Date: Thu, 24 Dec 2015 17:18:47 +0100
Subject: [PATCH] syncing code done

---
 src/eth.rs        |  18 +-
 src/header.rs     |   6 +-
 src/sync/chain.rs | 464 +++++++++++++++++++++++++++++++++++++++++++---
 src/sync/mod.rs   |   9 +-
 4 files changed, 461 insertions(+), 36 deletions(-)

diff --git a/src/eth.rs b/src/eth.rs
index 7b3de0d9d..3a8853bac 100644
--- a/src/eth.rs
+++ b/src/eth.rs
@@ -13,6 +13,14 @@ pub enum BlockStatus {
     InChain,
     Queued(QueueStatus),
     Bad,
+    Unknown,
+}
+
+pub enum ImportResult {
+    Queued(QueueStatus),
+    AlreadyInChain,
+    AlreadyQueued(QueueStatus),
+    Bad,
 }
 
 pub struct BlockChainInfo {
@@ -30,16 +38,18 @@ pub type BlockNumber = u32;
 pub type BlockHeader = ::header::Header;
 
 pub trait BlockChainClient : Sync {
-    fn block_header(&self, h: &H256) -> Option<BlockHeader>;
+    fn block_header(&self, h: &H256) -> Option<Bytes>;
     fn block_body(&self, h: &H256) -> Option<Bytes>;
     fn block(&self, h: &H256) -> Option<Bytes>;
     fn block_status(&self, h: &H256) -> BlockStatus;
-    fn block_header_at(&self, n: BlockNumber) -> Option<BlockHeader>;
+    fn block_header_at(&self, n: BlockNumber) -> Option<Bytes>;
     fn block_body_at(&self, h: BlockNumber) -> Option<Bytes>;
     fn block_at(&self, h: BlockNumber) -> Option<Bytes>;
-    fn block_status_at(&self, h: BlockNumber) -> Option<BlockStatus>;
+    fn block_status_at(&self, h: BlockNumber) -> BlockStatus;
     fn tree_route(&self, from: &H256, to: &H256) -> TreeRoute;
-    fn import_block(&mut self, b: Bytes) -> BlockStatus;
+    fn state_data(&self, h: &H256) -> Option<Bytes>;
+    fn block_receipts(&self, h: &H256) -> Option<Bytes>;
+    fn import_block(&mut self, b: &[u8]) -> ImportResult;
     fn queue_stats(&self) -> BlockQueueStats;
     fn clear_queue(&mut self) -> BlockQueueStats;
     fn info(&self) -> BlockChainInfo;
diff --git a/src/header.rs b/src/header.rs
index 28e568e8e..e161ac280 100644
--- a/src/header.rs
+++ b/src/header.rs
@@ -52,6 +52,10 @@ impl Header {
             seal: vec![],
         }
     }
+
+    pub fn hash(&self) -> H256 {
+        unimplemented!();
+    }
 }
 
 impl Decodable for Header {
@@ -99,7 +103,7 @@ impl Encodable for Header {
         self.gas_used.encode(e);
         self.timestamp.encode(e);
         self.extra_data.encode(e);
-        
+
         for b in self.seal.iter() {
             b.encode(e);
         }
diff --git a/src/sync/chain.rs b/src/sync/chain.rs
index 51b210ae9..c14abb89a 100644
--- a/src/sync/chain.rs
+++ b/src/sync/chain.rs
@@ -1,14 +1,15 @@
 use std::collections::{HashSet, HashMap};
 use std::cmp::{min, max};
-use std::ops::{Add, Sub};
+use std::ops::{Add, Sub, Range};
 use std::mem::{replace};
 use util::network::{PeerId, HandlerIo, PacketId};
 use util::hash::{H256};
 use util::bytes::{Bytes};
 use util::uint::{U256};
-use util::rlp::{Rlp, RlpStream};
+use util::rlp::{self, Rlp, RlpStream}; //TODO: use UntrustedRlp
 use util::rlp::rlptraits::{Stream, View};
-use eth::{BlockNumber, BlockChainClient};
+use util::sha3::Hashable;
+use eth::{BlockNumber, BlockChainClient, BlockHeader, BlockStatus, QueueStatus, ImportResult};
 
 pub struct SyncIo<'s, 'h> where 'h: 's {
     network: &'s mut HandlerIo<'h>,
@@ -16,11 +17,23 @@ pub struct SyncIo<'s, 'h> where 'h: 's {
 }
 
 impl<'s, 'h> SyncIo<'s, 'h> {
+    pub fn new(network: &'s mut HandlerIo<'h>, chain: &'s mut BlockChainClient) -> SyncIo<'s, 'h> {
+        SyncIo {
+            network: network,
+            chain: chain,
+        }
+    }
     fn disable_peer(&mut self, peer_id: &PeerId) {
         self.network.disable_peer(*peer_id);
     }
 }
 
+const PROTOCOL_VERSION: u8 = 63u8;
+const MAX_BODIES_TO_SEND: usize = 256;
+const MAX_HEADERS_TO_SEND: usize = 1024;
+const MAX_NODE_DATA_TO_SEND: usize = 1024;
+const MAX_RECEIPTS_TO_SEND: usize = 1024;
+
 const STATUS_PACKET: u8 = 0x00;
 const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
 const TRANSACTIONS_PACKET: u8 = 0x02;
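The `Header::hash()` stub added in src/header.rs above is left as `unimplemented!()`. For reference, a minimal sketch of what it will eventually need to compute, assuming util::rlp exposes an `encode` helper for `Encodable` values and using the `util::sha3::Hashable` trait imported in chain.rs; both names are assumptions inferred from how these modules are used elsewhere in this patch, not a confirmed API:

```rust
// Sketch only, not part of the patch: the canonical Ethereum header hash is
// the Keccak-256 (sha3 in this codebase) of the header's RLP encoding, so
// hash() can lean on the Encodable impl for Header defined in this file.
pub fn hash(&self) -> H256 {
    use util::sha3::Hashable;
    ::util::rlp::encode(self).sha3() // `encode` is an assumed helper name
}
```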
@@ -125,7 +138,6 @@ pub struct ChainSync {
 
 impl ChainSync {
-
     pub fn new(io: &mut SyncIo) -> ChainSync {
         let mut sync = ChainSync {
             state: SyncState::NotSynced,
@@ -195,7 +207,7 @@ impl ChainSync {
     }
 
     /// Called by peer to report status
-    pub fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) {
+    fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) {
         let peer = PeerInfo {
             protocol_version: r.val_at(0),
             network_id: r.val_at(1),
@@ -213,7 +225,7 @@ impl ChainSync {
     }
 
     /// Called by peer once it has new block headers during sync
-    pub fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) {
+    fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) {
         let item_count = r.item_count();
         trace!(target: "sync", "BlockHeaders ({} entries)", item_count);
         self.clear_peer_download(peer_id);
@@ -225,36 +237,193 @@ impl ChainSync {
             trace!(target: "sync", "Ignored block headers while waiting");
             return;
         }
+
+        for i in 0..item_count {
+            let info: BlockHeader = r.val_at(i);
+            let number = BlockNumber::from(info.number);
+            if number <= self.last_imported_block || self.headers.have_item(&number) {
+                trace!(target: "sync", "Skipping existing block header");
+                continue;
+            }
+            if number > self.highest_block {
+                self.highest_block = number;
+            }
+            match io.chain.block_status(&info.hash()) {
+                BlockStatus::InChain => {
+                    self.have_common_block = true;
+                    self.last_imported_block = number;
+                },
+                _ => {
+                    if self.have_common_block {
+                        // validate chain
+                        if self.headers.find_item(&(number - 1)).map_or(false, |p| p.hash != info.parent_hash) {
+                            // mismatching parent hash: delete the previous block and don't add this one
+                            // TODO: lower peer rating
+                            debug!(target: "sync", "Mismatched block header {} {}", number, info.hash());
+                            self.remove_downloaded_blocks(number - 1);
+                            continue;
+                        }
+                        if self.headers.find_item(&(number + 1)).map_or(false, |p| p.parent != info.hash()) {
+                            // mismatching parent hash for the next block: clear following headers
+                            debug!(target: "sync", "Mismatched block header {}", number + 1);
+                            self.remove_downloaded_blocks(number + 1);
+                        }
+                    }
+                    let hdr = Header {
+                        data: r.at(i).data().to_vec(),
+                        hash: info.hash(),
+                        parent: info.parent_hash,
+                    };
+                    self.headers.insert_item(number, hdr);
+                    let header_id = HeaderId {
+                        transactions_root: info.transactions_root,
+                        uncles: info.uncles_hash
+                    };
+                    if header_id.transactions_root == rlp::SHA3_NULL_RLP && header_id.uncles == rlp::SHA3_EMPTY_LIST_RLP {
+                        // empty body, just mark as downloaded
+                        let mut body_stream = RlpStream::new_list(2);
+                        body_stream.append_raw(&rlp::EMPTY_LIST_RLP, 1);
+                        body_stream.append_raw(&rlp::EMPTY_LIST_RLP, 1);
+                        self.bodies.insert_item(number, body_stream.out());
+                    }
+                    else {
+                        self.header_ids.insert(header_id, number);
+                    }
+                }
+            }
+        }
+        self.collect_blocks(io);
+        self.continue_sync(io);
     }
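The empty-body shortcut in on_peer_block_headers above works because `SHA3_NULL_RLP` is the root hash of an empty trie and `SHA3_EMPTY_LIST_RLP` is the hash of an empty RLP list: a header committing to both describes a block with no transactions and no uncles, so its body is known without a network round trip. Restated as a standalone predicate (an illustration using the patch's names, not patch code):

```rust
// Does this header still need its body downloaded? A block body is the pair
// [transactions, uncles]; when the header commits to an empty transaction
// trie and an empty uncle list, the body can be synthesized locally, which
// is exactly what the insert into self.bodies above does.
fn needs_body(info: &BlockHeader) -> bool {
    info.transactions_root != rlp::SHA3_NULL_RLP || info.uncles_hash != rlp::SHA3_EMPTY_LIST_RLP
}
```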
 
     /// Called by peer once it has new block bodies
-    pub fn on_peer_block_bodies(&mut self, io: &mut SyncIo, peer: &PeerId, r: &Rlp) {
+    fn on_peer_block_bodies(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) {
+        let item_count = r.item_count();
+        trace!(target: "sync", "BlockBodies ({} entries)", item_count);
+        self.clear_peer_download(peer_id);
+        if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting {
+            trace!(target: "sync", "Ignored unexpected block bodies");
+            return;
+        }
+        if self.state == SyncState::Waiting {
+            trace!(target: "sync", "Ignored block bodies while waiting");
+            return;
+        }
+        for i in 0..item_count {
+            let body: Rlp = r.at(i);
+            let tx = body.at(0);
+            let tx_root = ::util::triehash::ordered_trie_root(tx.iter().map(|r| r.data().to_vec()).collect()); //TODO: get rid of vectors here
+            let uncles = body.at(1).data().sha3();
+            let header_id = HeaderId {
+                transactions_root: tx_root,
+                uncles: uncles
+            };
+            match self.header_ids.get(&header_id).map(|n| *n) {
+                Some(n) => {
+                    self.header_ids.remove(&header_id);
+                    self.bodies.insert_item(n, body.data().to_vec());
+                }
+                None => {
+                    debug!(target: "sync", "Ignored unknown block body");
+                }
+            }
+        }
+        self.collect_blocks(io);
+        self.continue_sync(io);
     }
 
     /// Called by peer once it has a new block
-    pub fn on_peer_new_block(&mut self, io: &mut SyncIo, peer: &PeerId, r: &Rlp) {
+    fn on_peer_new_block(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) {
+        let block_rlp = r.at(0);
+        let header_rlp = block_rlp.at(0);
+        let h = header_rlp.data().sha3();
+
+        match io.chain.import_block(block_rlp.data()) {
+            ImportResult::AlreadyInChain => {
+                trace!(target: "sync", "New block already in chain {:?}", h);
+            },
+            ImportResult::AlreadyQueued(_) => {
+                trace!(target: "sync", "New block already queued {:?}", h);
+            },
+            ImportResult::Queued(QueueStatus::Known) => {
+                trace!(target: "sync", "New block queued {:?}", h);
+            },
+            ImportResult::Queued(QueueStatus::Unknown) => {
+                trace!(target: "sync", "New block unknown {:?}", h);
+                //TODO: handle too many unknown blocks
+                let difficulty: U256 = r.val_at(1);
+                let peer_difficulty = self.peers.get_mut(peer_id).expect("ChainSync: unknown peer").difficulty;
+                if difficulty > peer_difficulty {
+                    trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h);
+                    self.sync_peer(io, peer_id, true);
+                }
+            },
+            ImportResult::Bad => {
+                debug!(target: "sync", "Bad new block {:?}", h);
+                io.disable_peer(peer_id);
+            }
+        }
     }
 
-    pub fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer: &PeerId, r: &Rlp) {
+    fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) {
+        if self.peers.get_mut(peer_id).expect("ChainSync: unknown peer").asking != PeerAsking::Nothing {
+            trace!(target: "sync", "Ignoring new hashes since we're already downloading.");
+            return;
+        }
+        let hashes = r.iter().map(|item| (item.val_at::<H256>(0), item.val_at::<U256>(1)));
+        let mut max_height: U256 = From::from(0);
+        for (h, d) in hashes {
+            match io.chain.block_status(&h) {
+                BlockStatus::InChain => {
+                    trace!(target: "sync", "New block hash already in chain {:?}", h);
+                },
+                BlockStatus::Queued(_) => {
+                    trace!(target: "sync", "New block hash already queued {:?}", h);
+                },
+                BlockStatus::Unknown => {
+                    trace!(target: "sync", "New unknown block hash {:?}", h);
+                    if d > max_height {
+                        let peer = self.peers.get_mut(peer_id).expect("ChainSync: unknown peer");
+                        peer.latest = h.clone();
+                        max_height = d;
+                    }
+                },
+                BlockStatus::Bad => {
+                    debug!(target: "sync", "Bad new block hash {:?}", h);
+                    io.disable_peer(peer_id);
+                    return;
+                }
+            }
+        }
     }
 
     /// Called by peer when it is disconnecting
-    pub fn on_peer_aborting(&mut self, peer: &PeerId) {
+    pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: &PeerId) {
+        self.clear_peer_download(peer);
+        self.continue_sync(io);
+    }
+
+    pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: &PeerId) {
+        self.send_status(io, peer);
     }
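on_peer_new_hashes above remembers, per peer, only the advertised hash with the highest total difficulty, storing it as the peer's `latest` block. A self-contained toy version of that selection rule, with plain arrays standing in for H256/U256 so it runs outside the tree:

```rust
// Toy model of the selection in on_peer_new_hashes: among hashes whose
// blocks are locally unknown, the one advertised with the highest total
// difficulty becomes the peer's `latest` block.
fn pick_latest(unknown: &[([u8; 32], u64)]) -> Option<[u8; 32]> {
    unknown.iter().max_by_key(|&&(_, difficulty)| difficulty).map(|&(hash, _)| hash)
}

fn main() {
    let advertised = [([1u8; 32], 10), ([2u8; 32], 42), ([3u8; 32], 7)];
    assert_eq!(pick_latest(&advertised), Some([2u8; 32]));
}
```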
 
     /// Resume downloading after waiting state
-    fn continue_sync(&mut self) {
+    fn continue_sync(&mut self, io: &mut SyncIo) {
+        let peers: Vec<PeerId> = self.peers.keys().map(|k| *k).collect();
+        for p in peers {
+            self.sync_peer(io, &p, false);
+        }
     }
 
     /// Called after all blocks have been downloaded
     fn complete_sync(&mut self) {
+        self.reset();
+        self.state = SyncState::Idle;
     }
 
     /// Enter waiting state
     fn pause_sync(&mut self) {
-    }
-
-    fn reset_sync(&mut self) {
+        self.state = SyncState::Waiting;
     }
 
     fn sync_peer(&mut self, io: &mut SyncIo, peer_id: &PeerId, force: bool) {
@@ -367,11 +536,90 @@ impl ChainSync {
         }
     }
 
-    fn clear_peer_download(&mut self, peer: &PeerId) {
+    fn clear_peer_download(&mut self, peer_id: &PeerId) {
+        let peer = self.peers.get_mut(peer_id).expect("ChainSync: unknown peer");
+        for b in &peer.asking_blocks {
+            self.downloading_headers.remove(b);
+            self.downloading_bodies.remove(b);
+        }
+        peer.asking_blocks.clear();
     }
 
-    fn clear_all_download(&mut self) {
+    fn collect_blocks(&mut self, io: &mut SyncIo) {
+        if !self.have_common_block || self.headers.is_empty() || self.bodies.is_empty() {
+            return;
+        }
+
+        let mut restart = false;
+        // merge headers and bodies
+        {
+            let headers = self.headers.last().unwrap();
+            let bodies = self.bodies.last().unwrap();
+            if headers.0 != bodies.0 || headers.0 != self.last_imported_block + 1 {
+                return;
+            }
+
+            for i in 0..min(headers.1.len(), bodies.1.len()) {
+                let mut block_rlp = RlpStream::new_list(3);
+                block_rlp.append_raw(&headers.1[i].data, 1);
+                block_rlp.append_raw(&bodies.1[i], 2);
+                let h = &headers.1[i].hash;
+                match io.chain.import_block(&block_rlp.out()) {
+                    ImportResult::AlreadyInChain => {
+                        trace!(target: "sync", "Block already in chain {:?}", h);
+                        self.last_imported_block = headers.0 + i as BlockNumber;
+                    },
+                    ImportResult::AlreadyQueued(_) => {
+                        trace!(target: "sync", "Block already queued {:?}", h);
+                        self.last_imported_block = headers.0 + i as BlockNumber;
+                    },
+                    ImportResult::Queued(QueueStatus::Known) => {
+                        trace!(target: "sync", "Block queued {:?}", h);
+                        self.last_imported_block = headers.0 + i as BlockNumber;
+                    },
+                    ImportResult::Queued(QueueStatus::Unknown) => {
+                        panic!("Queued out of order block");
+                    },
+                    ImportResult::Bad => {
+                        debug!(target: "sync", "Bad block {:?}", h);
+                        restart = true;
+                    }
+                }
+            }
+        }
+
+        if restart {
+            self.restart(io);
+            return;
+        }
+
+        self.headers.remove_head(&self.last_imported_block);
+        self.bodies.remove_head(&self.last_imported_block);
+
+        if self.headers.is_empty() {
+            assert!(self.bodies.is_empty());
+            self.complete_sync();
+        }
     }
 
-    fn collect_blocks(&mut self) {
+    fn remove_downloaded_blocks(&mut self, start: BlockNumber) {
+        for n in self.headers.get_tail(&start) {
+            if let Some(header_data) = self.headers.find_item(&n) {
+                let header_to_delete: BlockHeader = rlp::decode(&header_data.data);
+                let header_id = HeaderId {
+                    transactions_root: header_to_delete.transactions_root,
+                    uncles: header_to_delete.uncles_hash
+                };
+                self.header_ids.remove(&header_id);
+            }
+            self.downloading_bodies.remove(&n);
+            self.downloading_headers.remove(&n);
+        }
+        self.headers.remove_tail(&start);
+        self.bodies.remove_tail(&start);
     }
 
     fn request_headers_by_hash(&mut self, sync: &mut SyncIo, peer_id: &PeerId, h: &H256, count: usize, skip: usize, reverse: bool) {
@@ -411,7 +659,7 @@ impl ChainSync {
             Err(e) => {
                 warn!(target: "sync", "Error sending request: {:?}", e);
                 sync.disable_peer(peer_id);
-                self.on_peer_aborting(peer_id);
+                self.on_peer_aborting(sync, peer_id);
             }
             Ok(_) => {
                 let mut peer = self.peers.get_mut(&peer_id).unwrap();
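collect_blocks above refuses to assemble anything unless the oldest downloaded header range starts exactly one block past the last imported block and the bodies range is aligned with it; otherwise it simply returns and waits for the gap to fill. That alignment invariant, restated as a runnable check:

```rust
// The collect_blocks alignment invariant: headers and bodies must start at
// the same block number, and that number must be last_imported + 1.
fn importable(last_imported: u32, headers_start: u32, bodies_start: u32) -> bool {
    headers_start == bodies_start && headers_start == last_imported + 1
}

fn main() {
    assert!(importable(99, 100, 100));
    assert!(!importable(99, 101, 101)); // header gap: keep downloading
    assert!(!importable(99, 100, 102)); // bodies lag behind headers
}
```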
@@ -419,6 +667,147 @@ impl ChainSync {
             }
         }
     }
+
+    fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) {
+    }
+
+    fn send_status(&mut self, io: &mut SyncIo, peer_id: &PeerId) {
+        let mut packet = RlpStream::new_list(5);
+        let chain = io.chain.info();
+        packet.append(&(PROTOCOL_VERSION as u32));
+        packet.append(&0u32); //TODO: network id
+        packet.append(&chain.total_difficulty);
+        packet.append(&chain.last_block_hash);
+        packet.append(&chain.genesis_hash);
+        self.send_request(io, peer_id, PeerAsking::State, STATUS_PACKET, packet.out());
+    }
+
+    fn return_block_headers(&self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) {
+        // Packet layout:
+        // [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ]
+        let max_headers: usize = r.val_at(1);
+        let skip: usize = r.val_at(2);
+        let reverse: bool = r.val_at(3);
+        let last = io.chain.info().last_block_number;
+        let mut number = if r.at(0).size() == 32 {
+            // id is a hash
+            let hash: H256 = r.val_at(0);
+            trace!(target: "sync", "GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse: {})", hash, max_headers, skip, reverse);
+            match io.chain.block_header(&hash) {
+                Some(hdr) => From::from(rlp::decode::<BlockHeader>(&hdr).number),
+                None => last
+            }
+        }
+        else {
+            r.val_at(0)
+        };
+
+        number = max(1, number);
+        number = min(last, number);
+        let max_count = min(MAX_HEADERS_TO_SEND, max_headers);
+        let mut count = 0;
+        let mut data = Bytes::new();
+        while number < last && number > 1 && count < max_count {
+            if let Some(mut hdr) = io.chain.block_header_at(number) {
+                data.append(&mut hdr);
+                count += 1;
+            }
+            if reverse {
+                number = number.saturating_sub(skip as BlockNumber + 1);
+            }
+            else {
+                number += skip as BlockNumber + 1;
+            }
+        }
+        let mut rlp = RlpStream::new_list(count as usize);
+        rlp.append_raw(&data, count as usize);
+        io.network.respond(BLOCK_HEADERS_PACKET, rlp.out());
+    }
+
+    fn return_block_bodies(&self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) {
+        let mut count = r.item_count();
+        if count == 0 {
+            debug!(target: "sync", "Empty GetBlockBodies request, ignoring.");
+            return;
+        }
+        count = min(count, MAX_BODIES_TO_SEND);
+        let mut added = 0usize;
+        let mut data = Bytes::new();
+        for i in 0..count {
+            if let Some(mut body) = io.chain.block_body(&r.val_at::<H256>(i)) {
+                data.append(&mut body);
+                added += 1;
+            }
+        }
+        let mut rlp = RlpStream::new_list(added);
+        rlp.append_raw(&data, added);
+        io.network.respond(BLOCK_BODIES_PACKET, rlp.out());
+    }
+
+    fn return_node_data(&self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) {
+        let mut count = r.item_count();
+        if count == 0 {
+            debug!(target: "sync", "Empty GetNodeData request, ignoring.");
+            return;
+        }
+        count = min(count, MAX_NODE_DATA_TO_SEND);
+        let mut added = 0usize;
+        let mut data = Bytes::new();
+        for i in 0..count {
+            if let Some(mut node) = io.chain.state_data(&r.val_at::<H256>(i)) {
+                data.append(&mut node);
+                added += 1;
+            }
+        }
+        let mut rlp = RlpStream::new_list(added);
+        rlp.append_raw(&data, added);
+        io.network.respond(NODE_DATA_PACKET, rlp.out());
+    }
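return_block_headers above serves headers starting at the requested block and stepping `skip + 1` block numbers per header, forwards or in reverse. A simplified, runnable model of the numbers such a walk visits (it assumes every header exists and clamps slightly differently than the loop above, so treat it as an illustration of the stepping only):

```rust
// GetBlockHeaders stepping: start block, then skip + 1 per returned header,
// optionally walking backwards, capped at `max` headers and clamped to the
// range [1, last].
fn header_numbers(start: u32, last: u32, max: usize, skip: u32, reverse: bool) -> Vec<u32> {
    let mut out = Vec::new();
    let mut n = start.max(1).min(last);
    while out.len() < max {
        out.push(n);
        if reverse {
            if n <= skip + 1 { break; }
            n -= skip + 1;
        } else {
            if n + skip + 1 > last { break; }
            n += skip + 1;
        }
    }
    out
}

fn main() {
    assert_eq!(header_numbers(100, 1000, 4, 0, false), vec![100, 101, 102, 103]);
    assert_eq!(header_numbers(100, 1000, 3, 9, true), vec![100, 90, 80]);
}
```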
+
+    fn return_receipts(&self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) {
+        let mut count = r.item_count();
+        if count == 0 {
+            debug!(target: "sync", "Empty GetReceipts request, ignoring.");
+            return;
+        }
+        count = min(count, MAX_RECEIPTS_TO_SEND);
+        let mut added = 0usize;
+        let mut data = Bytes::new();
+        for i in 0..count {
+            if let Some(mut receipts) = io.chain.block_receipts(&r.val_at::<H256>(i)) {
+                data.append(&mut receipts);
+                added += 1;
+            }
+        }
+        let mut rlp = RlpStream::new_list(added);
+        rlp.append_raw(&data, added);
+        io.network.respond(RECEIPTS_PACKET, rlp.out());
+    }
+
+    pub fn on_packet(&mut self, io: &mut SyncIo, peer: &PeerId, packet_id: u8, data: &[u8]) {
+        let rlp = Rlp::new(data);
+        match packet_id {
+            STATUS_PACKET => self.on_peer_status(io, peer, &rlp),
+            TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp),
+            GET_BLOCK_HEADERS_PACKET => self.return_block_headers(io, peer, &rlp),
+            BLOCK_HEADERS_PACKET => self.on_peer_block_headers(io, peer, &rlp),
+            GET_BLOCK_BODIES_PACKET => self.return_block_bodies(io, peer, &rlp),
+            BLOCK_BODIES_PACKET => self.on_peer_block_bodies(io, peer, &rlp),
+            NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp),
+            NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp),
+            GET_NODE_DATA_PACKET => self.return_node_data(io, peer, &rlp),
+            GET_RECEIPTS_PACKET => self.return_receipts(io, peer, &rlp),
+            _ => debug!(target: "sync", "Unknown packet {}", packet_id)
+        }
+    }
 }
 
 pub trait ToUsize {
@@ -426,7 +815,7 @@ pub trait ToUsize {
 }
 
 pub trait FromUsize {
-    fn from(s: usize) -> Self;
+    fn from_usize(s: usize) -> Self;
 }
 
 impl ToUsize for BlockNumber {
@@ -436,7 +825,7 @@ impl ToUsize for BlockNumber {
 }
 
 impl FromUsize for BlockNumber {
-    fn from(s: usize) -> BlockNumber {
+    fn from_usize(s: usize) -> BlockNumber {
         s as BlockNumber
     }
 }
@@ -444,6 +833,7 @@ impl FromUsize for BlockNumber {
 pub trait RangeCollection<K, V> {
     fn have_item(&self, key: &K) -> bool;
     fn find_item(&self, key: &K) -> Option<&V>;
+    fn get_tail(&mut self, key: &K) -> Range<K>;
     fn remove_head(&mut self, start: &K);
     fn remove_tail(&mut self, start: &K);
     fn insert_item(&mut self, key: K, value: V);
@@ -454,7 +844,7 @@ impl<K, V> RangeCollection<K, V> for Vec<(K, Vec<V>)> where K: Ord + PartialEq
         match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) {
             Ok(_) => true,
             Err(index) => match self.get(index + 1) {
-                Some(&(ref k, ref v)) => k <= key && (*k + FromUsize::from(v.len())) > *key,
+                Some(&(ref k, ref v)) => k <= key && (*k + FromUsize::from_usize(v.len())) > *key,
                 _ => false
             },
         }
     }
@@ -464,12 +854,28 @@ impl<K, V> RangeCollection<K, V> for Vec<(K, Vec<V>)> where K: Ord + PartialEq
         match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) {
             Ok(index) => self.get(index).unwrap().1.get(0),
             Err(index) => match self.get(index + 1) {
-                Some(&(ref k, ref v)) if k <= key && (*k + FromUsize::from(v.len())) > *key => v.get((*key - *k).to_usize()),
+                Some(&(ref k, ref v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => v.get((*key - *k).to_usize()),
                 _ => None
             },
         }
     }
 
+    /// Get a range of elements from start till the end of the range
+    fn get_tail(&mut self, key: &K) -> Range<K> {
+        let kv = *key;
+        match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) {
+            Ok(index) => kv..(kv + FromUsize::from_usize(self[index].1.len())),
+            Err(index) => match self.get(index + 1) {
+                Some(&(ref k, ref v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => {
+                    kv..(*k + FromUsize::from_usize(v.len()))
+                }
+                _ => kv..kv
+            },
+        }
+    }
 
     /// Remove element key and following elements in the same range
     fn remove_tail(&mut self, key: &K) {
         match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) {
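The RangeCollection impl above stores downloaded items as `(start, contiguous_chunk)` pairs kept sorted by descending start, which makes gaps explicit and lets adjacent chunks merge as they meet; lookups binary-search the starts with a reversed comparator. A self-contained miniature of the membership idea, using a linear scan for clarity instead of the binary search (u32 keys and &str values are simplifications, not the patch's types):

```rust
// An item is present if some chunk starts at or before the key and extends
// past it. The patch binary-searches the descending starts; a linear scan
// expresses the same containment test.
fn have_item(ranges: &[(u32, Vec<&str>)], key: u32) -> bool {
    ranges.iter().any(|&(start, ref chunk)| start <= key && start + chunk.len() as u32 > key)
}

fn main() {
    // chunks covering numbers 10..13 and 1..3, in descending start order
    let ranges = vec![(10u32, vec!["j", "k", "l"]), (1u32, vec!["a", "b"])];
    assert!(have_item(&ranges, 11));
    assert!(!have_item(&ranges, 5));
}
```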
@@ -477,7 +883,7 @@ impl<K, V> RangeCollection<K, V> for Vec<(K, Vec<V>)> where K: Ord + PartialEq
             Err(index) => {
                 let mut empty = false;
                 match self.get_mut(index + 1) {
-                    Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from(v.len())) > *key => {
+                    Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => {
                         v.truncate((*key - *k).to_usize());
                         empty = v.is_empty();
                     }
@@ -492,21 +898,21 @@ impl<K, V> RangeCollection<K, V> for Vec<(K, Vec<V>)> where K: Ord + PartialEq
 
     /// Remove range elements up to key
     fn remove_head(&mut self, key: &K) {
-        if *key == FromUsize::from(0) {
+        if *key == FromUsize::from_usize(0) {
             return
         }
 
-        let prev = *key - FromUsize::from(1);
+        let prev = *key - FromUsize::from_usize(1);
         match self.binary_search_by(|&(k, _)| k.cmp(&prev).reverse()) {
             Ok(index) => { self.remove(index); },
             Err(index) => {
                 let mut empty = false;
                 match self.get_mut(index + 1) {
-                    Some(&mut (ref mut k, ref mut v)) if *k <= prev && (*k + FromUsize::from(v.len())) > *key => {
+                    Some(&mut (ref mut k, ref mut v)) if *k <= prev && (*k + FromUsize::from_usize(v.len())) > *key => {
                         let head = v.split_off((*key - *k).to_usize());
                         empty = head.is_empty();
                         let removed = ::std::mem::replace(v, head);
-                        let new_k = *k - FromUsize::from(removed.len());
+                        let new_k = *k + FromUsize::from_usize(removed.len());
                         ::std::mem::replace(k, new_k);
                     }
                     _ => {}
@@ -529,7 +935,7 @@ impl<K, V> RangeCollection<K, V> for Vec<(K, Vec<V>)> where K: Ord + PartialEq
         lower += 1;
 
         let mut to_remove: Option<usize> = None;
-        if lower < self.len() && self[lower].0 + FromUsize::from(self[lower].1.len()) == key {
+        if lower < self.len() && self[lower].0 + FromUsize::from_usize(self[lower].1.len()) == key {
             // extend into existing chunk
             self[lower].1.push(value);
         }
@@ -545,7 +951,7 @@ impl<K, V> RangeCollection<K, V> for Vec<(K, Vec<V>)> where K: Ord + PartialEq
                 let (mut next, mut inserted) = self.split_at_mut(lower);
                 let mut next = next.last_mut().unwrap();
                 let mut inserted = inserted.first_mut().unwrap();
-                if next.0 == key + FromUsize::from(1)
+                if next.0 == key + FromUsize::from_usize(1)
                 {
                     inserted.1.append(&mut next.1);
                     to_remove = Some(lower - 1);
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 29e61b7d0..3ae2e7bc5 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1,6 +1,7 @@
 use std::sync::{Arc};
 use eth::{BlockChainClient};
-use util::network::{Error as NetworkError, ProtocolHandler, NetworkService, HandlerIo, TimerToken, PeerId};
+use util::network::{ProtocolHandler, NetworkService, HandlerIo, TimerToken, PeerId};
+use sync::chain::{ChainSync, SyncIo};
 
 mod chain;
 
@@ -11,7 +12,8 @@ pub fn new(service: &mut NetworkService, eth_client: Arc<BlockChainClient>) {
 
 struct EthSync {
     idle: bool,
-    chain: Arc<BlockChainClient>
+    chain: Arc<BlockChainClient>,
+    sync: ChainSync
 }
 
 impl ProtocolHandler for EthSync {
@@ -20,12 +22,15 @@ impl ProtocolHandler for EthSync {
     }
 
     fn read(&mut self, io: &mut HandlerIo, peer: &PeerId, packet_id: u8, data: &[u8]) {
+        self.sync.on_packet(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer, packet_id, data);
     }
 
     fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId) {
+        self.sync.on_peer_connected(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer);
     }
 
     fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId) {
+        self.sync.on_peer_aborting(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer);
     }
 
     fn timeout(&mut self, io: &mut HandlerIo, timer: TimerToken) {
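The ProtocolHandler methods above delegate every event to ChainSync through a freshly constructed SyncIo. For context, a hypothetical sketch of how such a handler could be built and registered with the NetworkService; `register_protocol`, its signature, and the no-argument ChainSync constructor are illustrative assumptions, not APIs confirmed by this patch:

```rust
// Hypothetical wiring sketch: install EthSync as the handler for the "eth"
// subprotocol. Names and signatures below are assumptions for illustration.
pub fn new(service: &mut NetworkService, eth_client: Arc<BlockChainClient>) {
    let handler = Box::new(EthSync {
        idle: true,
        chain: eth_client,
        sync: ChainSync::new(), // the patch's ChainSync::new actually takes a SyncIo
    });
    service.register_protocol(handler, "eth", &[62u8, 63u8]); // assumed API
}
```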