From 9087cc798b5b99cd7439ce4a8e7ae9f9cb73a068 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 25 Dec 2015 14:55:55 +0100 Subject: [PATCH] sync refactoring; range tests --- .gitignore | 3 + src/bin/client.rs | 2 +- src/eth.rs | 9 +- src/sync/chain.rs | 334 +++++++++++------------------------ src/sync/mod.rs | 40 ++++- src/sync/range_collection | 136 ++++++++++++++ src/sync/range_collection.rs | 244 +++++++++++++++++++++++++ src/sync/tests.rs | 13 ++ 8 files changed, 543 insertions(+), 238 deletions(-) create mode 100644 src/sync/range_collection create mode 100644 src/sync/range_collection.rs create mode 100644 src/sync/tests.rs diff --git a/.gitignore b/.gitignore index 5fc1b92c5..8b4a3b588 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,6 @@ Cargo.lock # mac stuff .DS_Store + +# gdb files +.gdb_history diff --git a/src/bin/client.rs b/src/bin/client.rs index 5bc2adaab..0af81e47f 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -5,7 +5,7 @@ use util::network::{NetworkService}; fn main() { - let mut service = NetworkService::start().unwrap(); + let _service = NetworkService::start().unwrap(); loop { let mut cmd = String::new(); stdin().read_line(&mut cmd).unwrap(); diff --git a/src/eth.rs b/src/eth.rs index 3a8853bac..e82a899a0 100644 --- a/src/eth.rs +++ b/src/eth.rs @@ -31,7 +31,10 @@ pub struct BlockChainInfo { pub last_block_number: BlockNumber } -pub struct BlockQueueStats; +pub struct BlockQueueStatus { + pub full: bool, +} + pub struct TreeRoute; pub type BlockNumber = u32; @@ -50,7 +53,7 @@ pub trait BlockChainClient : Sync { fn state_data(&self, h: &H256) -> Option; fn block_receipts(&self, h: &H256) -> Option; fn import_block(&mut self, b: &[u8]) -> ImportResult; - fn queue_stats(&self) -> BlockQueueStats; - fn clear_queue(&mut self) -> BlockQueueStats; + fn queue_status(&self) -> BlockQueueStatus; + fn clear_queue(&mut self); fn info(&self) -> BlockChainInfo; } diff --git a/src/sync/chain.rs b/src/sync/chain.rs index c14abb89a..7f6b99ccb 100644 --- a/src/sync/chain.rs +++ b/src/sync/chain.rs @@ -1,6 +1,5 @@ use std::collections::{HashSet, HashMap}; use std::cmp::{min, max}; -use std::ops::{Add, Sub, Range}; use std::mem::{replace}; use util::network::{PeerId, HandlerIo, PacketId}; use util::hash::{H256}; @@ -10,6 +9,7 @@ use util::rlp::{Rlp, RlpStream, self}; //TODO: use UntrustedRlp use util::rlp::rlptraits::{Stream, View}; use util::sha3::Hashable; use eth::{BlockNumber, BlockChainClient, BlockHeader, BlockStatus, QueueStatus, ImportResult}; +use sync::range_collection::{RangeCollection, ToUsize, FromUsize}; pub struct SyncIo<'s, 'h> where 'h:'s { network: &'s mut HandlerIo<'h>, @@ -28,11 +28,25 @@ impl<'s, 'h> SyncIo<'s, 'h> { } } +impl ToUsize for BlockNumber { + fn to_usize(&self) -> usize { + *self as usize + } +} + +impl FromUsize for BlockNumber { + fn from_usize(s: usize) -> BlockNumber { + s as BlockNumber + } +} + const PROTOCOL_VERSION: u8 = 63u8; const MAX_BODIES_TO_SEND: usize = 256; const MAX_HEADERS_TO_SEND: usize = 1024; const MAX_NODE_DATA_TO_SEND: usize = 1024; const MAX_RECEIPTS_TO_SEND: usize = 1024; +const MAX_HEADERS_TO_REQUEST: usize = 1024; +const MAX_BODIES_TO_REQUEST: usize = 256; const STATUS_PACKET: u8 = 0x00; const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; @@ -79,13 +93,13 @@ pub enum SyncState { } pub struct SyncStatus { - state: SyncState, - protocol_version: u8, - start_block_number: BlockNumber, - current_block_number: BlockNumber, - highest_block_number: BlockNumber, - blocks_total: usize, - blocks_received: usize + pub state: SyncState, + pub protocol_version: u8, + pub start_block_number: BlockNumber, + pub current_block_number: BlockNumber, + pub highest_block_number: BlockNumber, + pub blocks_total: usize, + pub blocks_received: usize } #[derive(PartialEq, Eq, Debug)] @@ -138,8 +152,8 @@ pub struct ChainSync { impl ChainSync { - pub fn new(io: &mut SyncIo) -> ChainSync { - let mut sync = ChainSync { + pub fn new() -> ChainSync { + ChainSync { state: SyncState::NotSynced, starting_block: 0, highest_block: 0, @@ -152,9 +166,7 @@ impl ChainSync { last_imported_block: 0, syncing_difficulty: U256::from(0u64), have_common_block: false - }; - sync.restart(io); - sync + } } /// @returns Synchonization status @@ -177,8 +189,8 @@ impl ChainSync { } /// @returns true is Sync is in progress - pub fn is_syncing(&self) { - self.state != SyncState::Idle; + pub fn is_syncing(&self) -> bool { + self.state != SyncState::Idle } fn reset(&mut self) { @@ -217,6 +229,9 @@ impl ChainSync { asking: PeerAsking::Nothing, asking_blocks: Vec::new(), }; + + trace!(target: "sync", "New peer (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis); + let old = self.peers.insert(peer_id.clone(), peer); if old.is_some() { panic!("ChainSync: new peer already exists"); @@ -423,6 +438,7 @@ impl ChainSync { /// Enter waiting state fn pause_sync(&mut self) { + trace!(target: "sync", "Block queue full, pausing sync"); self.state = SyncState::Waiting; } @@ -431,12 +447,12 @@ impl ChainSync { let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); if peer.asking != PeerAsking::Nothing { - debug!(target: "sync", "Can't sync with this peer - outstanding asks."); + trace!(target: "sync", "Can't sync with this peer - outstanding asks."); return; } if self.state == SyncState::Waiting { - debug!(target: "sync", "Waiting for block queue"); + trace!(target: "sync", "Waiting for block queue"); return; } (peer.latest.clone(), peer.difficulty.clone()) @@ -459,24 +475,30 @@ impl ChainSync { fn request_blocks(&mut self, io: &mut SyncIo, peer_id: &PeerId) { self.clear_peer_download(peer_id); + + if io.chain.queue_status().full { + self.pause_sync(); + return; + } + // check to see if we need to download any block bodies first let mut needed_bodies: Vec = Vec::new(); let mut needed_numbers: Vec = Vec::new(); - let mut index = 0usize; - if self.have_common_block && !self.headers.is_empty() && self.headers.last().unwrap().0 == self.last_imported_block + 1 { - let mut header = self.headers.len() - 1; - while header != 0 && needed_bodies.len() < 1024 && index < self.headers[header].1.len() { - let block = self.headers[header].0 + index as BlockNumber; - if !self.downloading_bodies.contains(&block) && !self.bodies.have_item(&block) { - needed_bodies.push(self.headers[header].1[index].hash.clone()); - needed_numbers.push(block); - self.downloading_bodies.insert(block); + if self.have_common_block && !self.headers.is_empty() && self.headers.range_iter().next().unwrap().0 == self.last_imported_block + 1 { + for (start, ref items) in self.headers.range_iter() { + if needed_bodies.len() >= MAX_BODIES_TO_REQUEST { + break; } - index += 1; - if index >= self.headers[header].1.len() { - index = 0; - header -= 1; + let mut index: BlockNumber = 0; + while index != items.len() as BlockNumber && needed_bodies.len() < MAX_BODIES_TO_REQUEST { + let block = start + index; + if !self.downloading_bodies.contains(&block) && !self.bodies.have_item(&block) { + needed_bodies.push(items[index as usize].hash.clone()); + needed_numbers.push(block); + self.downloading_bodies.insert(block); + } + index += 1; } } } @@ -491,44 +513,43 @@ impl ChainSync { // download backwards until common block is found 1 header at a time start = io.chain.info().last_block_number as usize; if !self.headers.is_empty() { - start = min(start, self.headers.last().unwrap().0 as usize - 1); + start = min(start, self.headers.range_iter().next().unwrap().0 as usize - 1); } if start <= 1 { self.have_common_block = true; //reached genesis } } if self.have_common_block { - start = self.last_imported_block as usize + 1; - let mut next = self.headers.len() - 1; - let mut count = 0usize; - if !self.headers.is_empty() && start >= self.headers.last().unwrap().0 as usize { - start = self.headers.last().unwrap().0 as usize + self.headers.last().unwrap().1.len(); - next -=1; - } - while count == 0 && next != 0 { - count = min(1024, self.headers[next].0 as usize - start); - while count > 0 && self.downloading_headers.contains(&(start as BlockNumber)) { - start +=1; - count -=1; - } - } + + + let mut headers: Vec = Vec::new(); - for block in start..(start + count) { - if !self.downloading_headers.contains(&(block as BlockNumber)) { - headers.push(block as BlockNumber); - self.downloading_headers.insert(block as BlockNumber); + let mut prev = self.last_imported_block + 1; + for (start, ref items) in self.headers.range_iter() { + if headers.len() >= MAX_HEADERS_TO_REQUEST { + break; } + if start > prev { + continue; + } + let mut index = 0; + while index != items.len() as BlockNumber && headers.len() < MAX_BODIES_TO_REQUEST { + let block = prev + index; + if !self.downloading_headers.contains(&(block as BlockNumber)) { + headers.push(block as BlockNumber); + self.downloading_headers.insert(block as BlockNumber); + } + index += 1; + } + prev = start + items.len() as BlockNumber; } - count = self.headers.len(); - if count > 0 { + + if !headers.is_empty() { + let count = headers.len(); replace(&mut self.peers.get_mut(peer_id).unwrap().asking_blocks, headers); assert!(!self.headers.have_item(&(start as BlockNumber))); self.request_headers_by_number(io, peer_id, start as BlockNumber, count, 0, false); } - else if start >= (self.headers[next].0 as usize) { - start = self.headers[next].0 as usize + self.headers[next].1.len(); - next -=1; - } } else { self.request_headers_by_number(io, peer_id, start as BlockNumber, 1, 0, false); @@ -553,8 +574,8 @@ impl ChainSync { let mut restart = false; // merge headers and bodies { - let mut headers = self.headers.last().unwrap(); - let mut bodies = self.bodies.last().unwrap(); + let headers = self.headers.range_iter().next().unwrap(); + let bodies = self.bodies.range_iter().next().unwrap(); if headers.0 != bodies.0 || headers.0 != self.last_imported_block + 1 { return; } @@ -593,8 +614,8 @@ impl ChainSync { return; } - self.headers.remove_head(&self.last_imported_block); - self.bodies.remove_head(&self.last_imported_block); + self.headers.remove_head(&(self.last_imported_block + 1)); + self.bodies.remove_head(&(self.last_imported_block + 1)); if self.headers.is_empty() { assert!(self.bodies.is_empty()); @@ -645,12 +666,12 @@ impl ChainSync { for h in hashes { rlp.append(&h); } - self.send_request(sync, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_BODIES_PACKET, rlp.out()); + self.send_request(sync, peer_id, PeerAsking::BlockBodies, GET_BLOCK_BODIES_PACKET, rlp.out()); } fn send_request(&mut self, sync: &mut SyncIo, peer_id: &PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { { - let mut peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); if peer.asking != PeerAsking::Nothing { warn!(target:"sync", "Asking {:?} while requesting {:?}", asking, peer.asking); } @@ -668,7 +689,7 @@ impl ChainSync { } } - fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) { + fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: &PeerId, _r: &Rlp) { } fn send_status(&mut self, io: &mut SyncIo, peer_id: &PeerId) { @@ -682,13 +703,12 @@ impl ChainSync { self.send_request(io, peer_id, PeerAsking::State, STATUS_PACKET, packet.out()); } - fn return_block_headers(&self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) { + fn return_block_headers(&self, io: &mut SyncIo, r: &Rlp) { // Packet layout: // [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ] let max_headers: usize = r.val_at(1); let skip: usize = r.val_at(2); let reverse: bool = r.val_at(3); - let mut packet = RlpStream::new(); let last = io.chain.info().last_block_number; let mut number = if r.at(0).size() == 32 { // id is a hash @@ -716,14 +736,20 @@ impl ChainSync { } None => {} } - number += (if reverse { -(skip + 1) } else { skip + 1 }) as BlockNumber; + if reverse { + number -= (skip + 1) as BlockNumber; + } + else { + number += (skip + 1) as BlockNumber; + } } let mut rlp = RlpStream::new_list(count as usize); rlp.append_raw(&data, count as usize); - io.network.respond(BLOCK_HEADERS_PACKET, rlp.out()); + io.network.respond(BLOCK_HEADERS_PACKET, rlp.out()).unwrap_or_else(|e| + debug!(target: "sync", "Error sending headers: {:?}", e)); } - fn return_block_bodies(&self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) { + fn return_block_bodies(&self, io: &mut SyncIo, r: &Rlp) { let mut count = r.item_count(); if count == 0 { debug!(target: "sync", "Empty GetBlockBodies request, ignoring."); @@ -743,10 +769,11 @@ impl ChainSync { } let mut rlp = RlpStream::new_list(added); rlp.append_raw(&data, added); - io.network.respond(BLOCK_BODIES_PACKET, rlp.out()); + io.network.respond(BLOCK_BODIES_PACKET, rlp.out()).unwrap_or_else(|e| + debug!(target: "sync", "Error sending headers: {:?}", e)); } - fn return_node_data(&self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) { + fn return_node_data(&self, io: &mut SyncIo, r: &Rlp) { let mut count = r.item_count(); if count == 0 { debug!(target: "sync", "Empty GetNodeData request, ignoring."); @@ -766,10 +793,11 @@ impl ChainSync { } let mut rlp = RlpStream::new_list(added); rlp.append_raw(&data, added); - io.network.respond(NODE_DATA_PACKET, rlp.out()); + io.network.respond(NODE_DATA_PACKET, rlp.out()).unwrap_or_else(|e| + debug!(target: "sync", "Error sending headers: {:?}", e)); } - fn return_receipts(&self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) { + fn return_receipts(&self, io: &mut SyncIo, r: &Rlp) { let mut count = r.item_count(); if count == 0 { debug!(target: "sync", "Empty GetReceipts request, ignoring."); @@ -789,7 +817,8 @@ impl ChainSync { } let mut rlp = RlpStream::new_list(added); rlp.append_raw(&data, added); - io.network.respond(RECEIPTS_PACKET, rlp.out()); + io.network.respond(RECEIPTS_PACKET, rlp.out()).unwrap_or_else(|e| + debug!(target: "sync", "Error sending headers: {:?}", e)); } pub fn on_packet(&mut self, io: &mut SyncIo, peer: &PeerId, packet_id: u8, data: &[u8]) { @@ -797,170 +826,19 @@ impl ChainSync { match packet_id { STATUS_PACKET => self.on_peer_status(io, peer, &rlp), TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp), - GET_BLOCK_HEADERS_PACKET => self.return_block_headers(io, peer, &rlp), + GET_BLOCK_HEADERS_PACKET => self.return_block_headers(io, &rlp), BLOCK_HEADERS_PACKET => self.on_peer_block_headers(io, peer, &rlp), - GET_BLOCK_BODIES_PACKET => self.return_block_bodies(io, peer, &rlp), + GET_BLOCK_BODIES_PACKET => self.return_block_bodies(io, &rlp), BLOCK_BODIES_PACKET => self.on_peer_block_bodies(io, peer, &rlp), NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp), NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp), - GET_NODE_DATA_PACKET => self.return_node_data(io, peer, &rlp), - GET_RECEIPTS_PACKET => self.return_receipts(io, peer, &rlp), + GET_NODE_DATA_PACKET => self.return_node_data(io, &rlp), + GET_RECEIPTS_PACKET => self.return_receipts(io, &rlp), _ => debug!(target: "sync", "Unkown packet {}", packet_id) } } -} -pub trait ToUsize { - fn to_usize(&self) -> usize; -} - -pub trait FromUsize { - fn from_usize(s: usize) -> Self; -} - -impl ToUsize for BlockNumber { - fn to_usize(&self) -> usize { - *self as usize + pub fn maintain_sync(&mut self, _io: &mut SyncIo) { } } -impl FromUsize for BlockNumber { - fn from_usize(s: usize) -> BlockNumber { - s as BlockNumber - } -} - -pub trait RangeCollection { - fn have_item(&self, key: &K) -> bool; - fn find_item(&self, key: &K) -> Option<&V>; - fn get_tail(&mut self, key: &K) -> Range; - fn remove_head(&mut self, start: &K); - fn remove_tail(&mut self, start: &K); - fn insert_item(&mut self, key: K, value: V); -} - -impl RangeCollection for Vec<(K, Vec)> where K: Ord + PartialEq + Add + Sub + Copy + FromUsize + ToUsize { - fn have_item(&self, key: &K) -> bool { - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(_) => true, - Err(index) => match self.get(index + 1) { - Some(&(ref k, ref v)) => k <= key && (*k + FromUsize::from_usize(v.len())) > *key, - _ => false - }, - } - } - - fn find_item(&self, key: &K) -> Option<&V> { - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(index) => self.get(index).unwrap().1.get(0), - Err(index) => match self.get(index + 1) { - Some(&(ref k, ref v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => v.get((*key - *k).to_usize()), - _ => None - }, - } - } - - /// Get a range of elements from start till the end of the range - fn get_tail(&mut self, key: &K) -> Range { - let kv = *key; - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(index) => kv..(kv + FromUsize::from_usize(self[index].1.len())), - Err(index) => { - let mut empty = false; - match self.get_mut(index + 1) { - Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { - kv..(*k + FromUsize::from_usize(v.len())) - } - _ => kv..kv - } - }, - } - } - /// Remove element key and following elements in the same range - fn remove_tail(&mut self, key: &K) { - match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { - Ok(index) => { self.remove(index); }, - Err(index) =>{ - let mut empty = false; - match self.get_mut(index + 1) { - Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { - v.truncate((*key - *k).to_usize()); - empty = v.is_empty(); - } - _ => {} - } - if empty { - self.remove(index + 1); - } - }, - } - } - - /// Remove range elements up to key - fn remove_head(&mut self, key: &K) { - if *key == FromUsize::from_usize(0) { - return - } - - let prev = *key - FromUsize::from_usize(1); - match self.binary_search_by(|&(k, _)| k.cmp(&prev).reverse()) { - Ok(index) => { self.remove(index); }, - Err(index) => { - let mut empty = false; - match self.get_mut(index + 1) { - Some(&mut (ref mut k, ref mut v)) if *k <= prev && (*k + FromUsize::from_usize(v.len())) > *key => { - let head = v.split_off((*key - *k).to_usize()); - empty = head.is_empty(); - let removed = ::std::mem::replace(v, head); - let new_k = *k - FromUsize::from_usize(removed.len()); - ::std::mem::replace(k, new_k); - } - _ => {} - } - if empty { - self.remove(index + 1); - } - }, - } - } - - fn insert_item(&mut self, key: K, value: V) { - assert!(!self.have_item(&key)); - - let mut lower = match self.binary_search_by(|&(k, _)| k.cmp(&key).reverse()) { - Ok(index) => index, - Err(index) => index, - }; - - lower += 1; - - let mut to_remove: Option = None; - if lower < self.len() && self[lower].0 + FromUsize::from_usize(self[lower].1.len()) == key { - // extend into existing chunk - self[lower].1.push(value); - } - else { - // insert a new chunk - let mut range: Vec = vec![value]; - self.insert(lower, (key, range)); - }; - let next = lower - 1; - if next < self.len() - { - { - let (mut next, mut inserted) = self.split_at_mut(lower); - let mut next = next.last_mut().unwrap(); - let mut inserted = inserted.first_mut().unwrap(); - if next.0 == key + FromUsize::from_usize(1) - { - inserted.1.append(&mut next.1); - to_remove = Some(lower - 1); - } - } - - if let Some(r) = to_remove { - self.remove(r); - } - } - } -} diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 3ae2e7bc5..3ec9c26ce 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -4,21 +4,48 @@ use util::network::{ProtocolHandler, NetworkService, HandlerIo, TimerToken, Peer use sync::chain::{ChainSync, SyncIo}; mod chain; +mod range_collection; + +#[cfg(test)] +mod tests; -pub fn new(service: &mut NetworkService, eth_cleint: Arc) { - +pub fn new(_service: &mut NetworkService, eth_client: Arc) -> EthSync { + EthSync { + chain: eth_client, + sync: ChainSync::new(), + } } -struct EthSync { - idle: bool, +pub struct EthSync { chain: Arc, sync: ChainSync } +pub use self::chain::SyncStatus; + +impl EthSync { + pub fn is_syncing(&self) -> bool { + self.sync.is_syncing() + } + + pub fn status(&self) -> SyncStatus { + self.sync.status() + } + + pub fn stop_network(&mut self, io: &mut HandlerIo) { + self.sync.abort(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap())); + } + + pub fn start_network(&mut self, io: &mut HandlerIo) { + self.sync.restart(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap())); + } +} + impl ProtocolHandler for EthSync { fn initialize(&mut self, io: &mut HandlerIo) { - io.register_timer(1000); + self.sync.restart(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap())); + io.register_timer(1000).unwrap(); } fn read(&mut self, io: &mut HandlerIo, peer: &PeerId, packet_id: u8, data: &[u8]) { @@ -33,7 +60,8 @@ impl ProtocolHandler for EthSync { self.sync.on_peer_aborting(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap()), peer); } - fn timeout(&mut self, io: &mut HandlerIo, timer: TimerToken) { + fn timeout(&mut self, io: &mut HandlerIo, _timer: TimerToken) { + self.sync.maintain_sync(&mut SyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap())); } } diff --git a/src/sync/range_collection b/src/sync/range_collection new file mode 100644 index 000000000..cc1ccdf01 --- /dev/null +++ b/src/sync/range_collection @@ -0,0 +1,136 @@ + +pub trait RangeCollection { + fn have_item(&self, key: &K) -> bool; + fn find_item(&self, key: &K) -> Option<&V>; + fn get_tail(&mut self, key: &K) -> Range; + fn remove_head(&mut self, start: &K); + fn remove_tail(&mut self, start: &K); + fn insert_item(&mut self, key: K, value: V); +} + +impl RangeCollection for Vec<(K, Vec)> where K: Ord + PartialEq + Add + Sub + Copy + FromUsize + ToUsize { + fn have_item(&self, key: &K) -> bool { + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(_) => true, + Err(index) => match self.get(index + 1) { + Some(&(ref k, ref v)) => k <= key && (*k + FromUsize::from_usize(v.len())) > *key, + _ => false + }, + } + } + + fn find_item(&self, key: &K) -> Option<&V> { + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(index) => self.get(index).unwrap().1.get(0), + Err(index) => match self.get(index + 1) { + Some(&(ref k, ref v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => v.get((*key - *k).to_usize()), + _ => None + }, + } + } + + /// Get a range of elements from start till the end of the range + fn get_tail(&mut self, key: &K) -> Range { + let kv = *key; + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(index) => kv..(kv + FromUsize::from_usize(self[index].1.len())), + Err(index) => { + let mut empty = false; + match self.get_mut(index + 1) { + Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { + kv..(*k + FromUsize::from_usize(v.len())) + } + _ => kv..kv + } + }, + } + } + /// Remove element key and following elements in the same range + fn remove_tail(&mut self, key: &K) { + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(index) => { self.remove(index); }, + Err(index) =>{ + let mut empty = false; + match self.get_mut(index + 1) { + Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { + v.truncate((*key - *k).to_usize()); + empty = v.is_empty(); + } + _ => {} + } + if empty { + self.remove(index + 1); + } + }, + } + } + + /// Remove range elements up to key + fn remove_head(&mut self, key: &K) { + if *key == FromUsize::from_usize(0) { + return + } + + let prev = *key - FromUsize::from_usize(1); + match self.binary_search_by(|&(k, _)| k.cmp(&prev).reverse()) { + Ok(index) => { self.remove(index); }, + Err(index) => { + let mut empty = false; + match self.get_mut(index + 1) { + Some(&mut (ref mut k, ref mut v)) if *k <= prev && (*k + FromUsize::from_usize(v.len())) > *key => { + let head = v.split_off((*key - *k).to_usize()); + empty = head.is_empty(); + let removed = ::std::mem::replace(v, head); + let new_k = *k - FromUsize::from_usize(removed.len()); + ::std::mem::replace(k, new_k); + } + _ => {} + } + if empty { + self.remove(index + 1); + } + }, + } + } + + fn insert_item(&mut self, key: K, value: V) { + assert!(!self.have_item(&key)); + + let mut lower = match self.binary_search_by(|&(k, _)| k.cmp(&key).reverse()) { + Ok(index) => index, + Err(index) => index, + }; + + lower += 1; + + let mut to_remove: Option = None; + if lower < self.len() && self[lower].0 + FromUsize::from_usize(self[lower].1.len()) == key { + // extend into existing chunk + self[lower].1.push(value); + } + else { + // insert a new chunk + let mut range: Vec = vec![value]; + self.insert(lower, (key, range)); + }; + let next = lower - 1; + if next < self.len() + { + { + let (mut next, mut inserted) = self.split_at_mut(lower); + let mut next = next.last_mut().unwrap(); + let mut inserted = inserted.first_mut().unwrap(); + if next.0 == key + FromUsize::from_usize(1) + { + inserted.1.append(&mut next.1); + to_remove = Some(lower - 1); + } + } + + if let Some(r) = to_remove { + self.remove(r); + } + } + } +} + diff --git a/src/sync/range_collection.rs b/src/sync/range_collection.rs new file mode 100644 index 000000000..5d19d0fd8 --- /dev/null +++ b/src/sync/range_collection.rs @@ -0,0 +1,244 @@ +use std::ops::{Add, Sub, Range}; + +pub trait ToUsize { + fn to_usize(&self) -> usize; +} + +pub trait FromUsize { + fn from_usize(s: usize) -> Self; +} + +pub trait RangeCollection { + fn have_item(&self, key: &K) -> bool; + fn find_item(&self, key: &K) -> Option<&V>; + fn get_tail(&mut self, key: &K) -> Range; + fn remove_head(&mut self, start: &K); + fn remove_tail(&mut self, start: &K); + fn insert_item(&mut self, key: K, value: V); + fn range_iter<'c>(&'c self) -> RangeIterator<'c, K, V>; +} + +pub struct RangeIterator<'c, K:'c, V:'c> { + range: usize, + collection: &'c Vec<(K, Vec)> +} + +impl<'c, K:'c, V:'c> Iterator for RangeIterator<'c, K, V> where K: Add + FromUsize + ToUsize + Copy { + type Item = (K, &'c [V]); + // The 'Iterator' trait only requires the 'next' method to be defined. The + // return type is 'Option', 'None' is returned when the 'Iterator' is + // over, otherwise the next value is returned wrapped in 'Some' + fn next(&mut self) -> Option<(K, &'c [V])> { + if self.range > 0 { + self.range -= 1; + } + else { + return None; + } + match self.collection.get(self.range) { + Some(&(ref k, ref vec)) => { + Some((*k, &vec)) + }, + None => None + } + } +} + +impl RangeCollection for Vec<(K, Vec)> where K: Ord + PartialEq + Add + Sub + Copy + FromUsize + ToUsize { + fn range_iter<'c>(&'c self) -> RangeIterator<'c, K, V> { + RangeIterator { + range: self.len(), + collection: self + } + } + + fn have_item(&self, key: &K) -> bool { + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(_) => true, + Err(index) => match self.get(index) { + Some(&(ref k, ref v)) => k <= key && (*k + FromUsize::from_usize(v.len())) > *key, + _ => false + }, + } + } + + fn find_item(&self, key: &K) -> Option<&V> { + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(index) => self.get(index).unwrap().1.get(0), + Err(index) => match self.get(index) { + Some(&(ref k, ref v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => v.get((*key - *k).to_usize()), + _ => None + }, + } + } + + /// Get a range of elements from start till the end of the range + fn get_tail(&mut self, key: &K) -> Range { + let kv = *key; + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(index) => kv..(kv + FromUsize::from_usize(self[index].1.len())), + Err(index) => { + match self.get_mut(index) { + Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { + kv..(*k + FromUsize::from_usize(v.len())) + } + _ => kv..kv + } + }, + } + } + /// Remove element key and following elements in the same range + fn remove_tail(&mut self, key: &K) { + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(index) => { self.remove(index); }, + Err(index) =>{ + let mut empty = false; + match self.get_mut(index) { + Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { + v.truncate((*key - *k).to_usize()); + empty = v.is_empty(); + } + _ => {} + } + if empty { + self.remove(index); + } + }, + } + } + + /// Remove range elements up to key + fn remove_head(&mut self, key: &K) { + if *key == FromUsize::from_usize(0) { + return + } + + let prev = *key - FromUsize::from_usize(1); + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(_) => { }, //start of range, do nothing. + Err(index) => { + let mut empty = false; + match self.get_mut(index) { + Some(&mut (ref mut k, ref mut v)) if *k <= prev && (*k + FromUsize::from_usize(v.len())) > prev => { + let tail = v.split_off((*key - *k).to_usize()); + empty = tail.is_empty(); + let removed = ::std::mem::replace(v, tail); + let new_k = *k + FromUsize::from_usize(removed.len()); + ::std::mem::replace(k, new_k); + } + _ => {} + } + if empty { + self.remove(index); + } + }, + } + } + + fn insert_item(&mut self, key: K, value: V) { + assert!(!self.have_item(&key)); + + let lower = match self.binary_search_by(|&(k, _)| k.cmp(&key).reverse()) { + Ok(index) => index, + Err(index) => index, + }; + + let mut to_remove: Option = None; + if lower < self.len() && self[lower].0 + FromUsize::from_usize(self[lower].1.len()) == key { + // extend into existing chunk + self[lower].1.push(value); + } + else { + // insert a new chunk + let range: Vec = vec![value]; + self.insert(lower, (key, range)); + }; + if lower > 0 { + let next = lower - 1; + if next < self.len() + { + { + let (mut next, mut inserted) = self.split_at_mut(lower); + let mut next = next.last_mut().unwrap(); + let mut inserted = inserted.first_mut().unwrap(); + if next.0 == key + FromUsize::from_usize(1) + { + inserted.1.append(&mut next.1); + to_remove = Some(lower - 1); + } + } + + if let Some(r) = to_remove { + self.remove(r); + } + } + } + } +} + +#[test] +fn test_range() { + use std::cmp::{Ordering}; + + let mut ranges: Vec<(u32, Vec)> = Vec::new(); + assert_eq!(ranges.range_iter().next(), None); + assert_eq!(ranges.find_item(&1), None); + assert!(!ranges.have_item(&1)); + assert_eq!(ranges.get_tail(&0), 0..0); + + ranges.insert_item(17, 'q'); + assert_eq!(ranges.range_iter().cmp(vec![(17, &['q'][..])]), Ordering::Equal); + assert_eq!(ranges.find_item(&17), Some(&'q')); + assert!(ranges.have_item(&17)); + assert_eq!(ranges.get_tail(&17), 17..18); + + ranges.insert_item(18, 'r'); + assert_eq!(ranges.range_iter().cmp(vec![(17, &['q', 'r'][..])]), Ordering::Equal); + assert_eq!(ranges.find_item(&18), Some(&'r')); + assert!(ranges.have_item(&18)); + assert_eq!(ranges.get_tail(&17), 17..19); + + ranges.insert_item(16, 'p'); + assert_eq!(ranges.range_iter().cmp(vec![(16, &['p', 'q', 'r'][..])]), Ordering::Equal); + assert_eq!(ranges.find_item(&16), Some(&'p')); + assert_eq!(ranges.find_item(&17), Some(&'q')); + assert_eq!(ranges.find_item(&18), Some(&'r')); + assert!(ranges.have_item(&16)); + assert_eq!(ranges.get_tail(&17), 17..19); + + ranges.insert_item(2, 'b'); + assert_eq!(ranges.range_iter().cmp(vec![(2, &['b'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + assert_eq!(ranges.find_item(&2), Some(&'b')); + + ranges.insert_item(3, 'c'); + ranges.insert_item(4, 'd'); + assert_eq!(ranges.get_tail(&3), 3..5); + assert_eq!(ranges.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + + let mut r = ranges.clone(); + r.remove_head(&1); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_head(&2); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_head(&3); + assert_eq!(r.range_iter().cmp(vec![(3, &['c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_head(&10); + assert_eq!(r.range_iter().cmp(vec![(3, &['c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_head(&5); + assert_eq!(r.range_iter().cmp(vec![(16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_head(&19); + assert_eq!(r.range_iter().next(), None); + + let mut r = ranges.clone(); + r.remove_tail(&20); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_tail(&17); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p'][..])]), Ordering::Equal); + r.remove_tail(&16); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal); + r.remove_tail(&3); + assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal); + r.remove_tail(&2); + assert_eq!(r.range_iter().next(), None); +} + diff --git a/src/sync/tests.rs b/src/sync/tests.rs new file mode 100644 index 000000000..a51bf1543 --- /dev/null +++ b/src/sync/tests.rs @@ -0,0 +1,13 @@ +use std::collections::HashMap; +use util::bytes::Bytes; +use util::hash::H256; + +struct TestBlockChainClient { + blocks: Vec, + hashes: HashMap, +} + + +#[test] +fn full_sync() { +}