Merge branch 'master' into rpc_poll_ids

This commit is contained in:
debris
2016-03-02 05:15:21 +01:00
74 changed files with 2584 additions and 851 deletions

View File

@@ -15,6 +15,7 @@ log = "0.3"
env_logger = "0.3"
time = "0.1.34"
rand = "0.3.13"
heapsize = "0.3"
[features]
default = []

View File

@@ -39,7 +39,9 @@ use ethcore::error::*;
use ethcore::block::Block;
use io::SyncIo;
use time;
use std::option::Option;
use super::SyncConfig;
known_heap_size!(0, PeerInfo, Header, HeaderId);
impl ToUsize for BlockNumber {
fn to_usize(&self) -> usize {
@@ -80,9 +82,7 @@ const NODE_DATA_PACKET: u8 = 0x0e;
const GET_RECEIPTS_PACKET: u8 = 0x0f;
const RECEIPTS_PACKET: u8 = 0x10;
const NETWORK_ID: U256 = ONE_U256; //TODO: get this from parent
const CONNECTION_TIMEOUT_SEC: f64 = 10f64;
const CONNECTION_TIMEOUT_SEC: f64 = 5f64;
struct Header {
/// Header data
@@ -135,6 +135,8 @@ pub struct SyncStatus {
pub num_peers: usize,
/// Total number of active peers
pub num_active_peers: usize,
/// Heap memory used in bytes
pub mem_used: usize,
}
#[derive(PartialEq, Eq, Debug, Clone)]
@@ -203,13 +205,17 @@ pub struct ChainSync {
have_common_block: bool,
/// Last propagated block number
last_send_block_number: BlockNumber,
/// Max blocks to download ahead
max_download_ahead_blocks: usize,
/// Network ID
network_id: U256,
}
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
impl ChainSync {
/// Create a new instance of syncing strategy.
pub fn new() -> ChainSync {
pub fn new(config: SyncConfig) -> ChainSync {
ChainSync {
state: SyncState::NotSynced,
starting_block: 0,
@@ -226,6 +232,8 @@ impl ChainSync {
syncing_difficulty: U256::from(0u64),
have_common_block: false,
last_send_block_number: 0,
max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id,
}
}
@@ -237,10 +245,19 @@ impl ChainSync {
start_block_number: self.starting_block,
last_imported_block_number: self.last_imported_block,
highest_block_number: self.highest_block,
blocks_received: match self.last_imported_block { None => 0, Some(x) => x - self.starting_block },
blocks_total: match self.highest_block { None => 0, Some(x) => x - self.starting_block },
blocks_received: match self.last_imported_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 },
blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 },
num_peers: self.peers.len(),
num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(),
mem_used:
// TODO: https://github.com/servo/heapsize/pull/50
// self.downloading_hashes.heap_size_of_children()
//+ self.downloading_bodies.heap_size_of_children()
//+ self.downloading_hashes.heap_size_of_children()
self.headers.heap_size_of_children()
+ self.bodies.heap_size_of_children()
+ self.peers.heap_size_of_children()
+ self.header_ids.heap_size_of_children(),
}
}
@@ -275,7 +292,6 @@ impl ChainSync {
self.starting_block = 0;
self.highest_block = None;
self.have_common_block = false;
io.chain().clear_queue();
self.starting_block = io.chain().chain_info().best_block_number;
self.state = SyncState::NotSynced;
}
@@ -307,7 +323,7 @@ impl ChainSync {
trace!(target: "sync", "Peer {} genesis hash not matched", peer_id);
return Ok(());
}
if peer.network_id != NETWORK_ID {
if peer.network_id != self.network_id {
io.disable_peer(peer_id);
trace!(target: "sync", "Peer {} network id not matched", peer_id);
return Ok(());
@@ -436,7 +452,7 @@ impl ChainSync {
trace!(target: "sync", "Got body {}", n);
}
None => {
debug!(target: "sync", "Ignored unknown block body");
trace!(target: "sync", "Ignored unknown/stale block body");
}
}
}
@@ -611,7 +627,7 @@ impl ChainSync {
self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false);
}
else if self.state == SyncState::Blocks && io.chain().block_status(BlockId::Hash(peer_latest)) == BlockStatus::Unknown {
self.request_blocks(io, peer_id);
self.request_blocks(io, peer_id, false);
}
}
@@ -620,7 +636,7 @@ impl ChainSync {
}
/// Find some headers or blocks to download for a peer.
fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId) {
fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId, ignore_others: bool) {
self.clear_peer_download(peer_id);
if io.chain().queue_info().is_full() {
@@ -640,28 +656,34 @@ impl ChainSync {
let mut index: BlockNumber = 0;
while index != items.len() as BlockNumber && needed_bodies.len() < MAX_BODIES_TO_REQUEST {
let block = start + index;
if !self.downloading_bodies.contains(&block) && !self.bodies.have_item(&block) {
if ignore_others || (!self.downloading_bodies.contains(&block) && !self.bodies.have_item(&block)) {
needed_bodies.push(items[index as usize].hash.clone());
needed_numbers.push(block);
self.downloading_bodies.insert(block);
}
index += 1;
}
}
}
if !needed_bodies.is_empty() {
let (head, _) = self.headers.range_iter().next().unwrap();
if needed_numbers.first().unwrap() - head > self.max_download_ahead_blocks as BlockNumber {
trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading block bodies", peer_id, needed_numbers.first().unwrap(), head);
self.request_blocks(io, peer_id, true);
return;
}
self.downloading_bodies.extend(needed_numbers.iter());
replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_numbers);
self.request_bodies(io, peer_id, needed_bodies);
}
else {
// check if need to download headers
let mut start = 0usize;
let mut start = 0;
if !self.have_common_block {
// download backwards until common block is found 1 header at a time
let chain_info = io.chain().chain_info();
start = chain_info.best_block_number as usize;
start = chain_info.best_block_number;
if !self.headers.is_empty() {
start = min(start, self.headers.range_iter().next().unwrap().0 as usize - 1);
start = min(start, self.headers.range_iter().next().unwrap().0 - 1);
}
if start == 0 {
self.have_common_block = true; //reached genesis
@@ -672,6 +694,7 @@ impl ChainSync {
if self.have_common_block {
let mut headers: Vec<BlockNumber> = Vec::new();
let mut prev = self.current_base_block() + 1;
let head = self.headers.range_iter().next().map(|(h, _)| h);
for (next, ref items) in self.headers.range_iter() {
if !headers.is_empty() {
break;
@@ -682,9 +705,8 @@ impl ChainSync {
}
let mut block = prev;
while block < next && headers.len() < MAX_HEADERS_TO_REQUEST {
if !self.downloading_headers.contains(&(block as BlockNumber)) {
if ignore_others || !self.downloading_headers.contains(&(block as BlockNumber)) {
headers.push(block as BlockNumber);
self.downloading_headers.insert(block as BlockNumber);
}
block += 1;
}
@@ -692,17 +714,23 @@ impl ChainSync {
}
if !headers.is_empty() {
start = headers[0] as usize;
start = headers[0];
if head.is_some() && start > head.unwrap() && start - head.unwrap() > self.max_download_ahead_blocks as BlockNumber {
trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading headers", peer_id, start, head.unwrap());
self.request_blocks(io, peer_id, true);
return;
}
let count = headers.len();
self.downloading_headers.extend(headers.iter());
replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, headers);
assert!(!self.headers.have_item(&(start as BlockNumber)));
self.request_headers_by_number(io, peer_id, start as BlockNumber, count, 0, false);
assert!(!self.headers.have_item(&start));
self.request_headers_by_number(io, peer_id, start, count, 0, false);
}
}
else {
// continue search for common block
self.downloading_headers.insert(start as BlockNumber);
self.request_headers_by_number(io, peer_id, start as BlockNumber, 1, 0, false);
self.downloading_headers.insert(start);
self.request_headers_by_number(io, peer_id, start, 1, 0, false);
}
}
}
@@ -894,7 +922,7 @@ impl ChainSync {
let mut packet = RlpStream::new_list(5);
let chain = io.chain().chain_info();
packet.append(&(PROTOCOL_VERSION as u32));
packet.append(&NETWORK_ID); //TODO: network id
packet.append(&self.network_id);
packet.append(&chain.total_difficulty);
packet.append(&chain.best_block_hash);
packet.append(&chain.genesis_hash);
@@ -1143,8 +1171,8 @@ impl ChainSync {
.collect::<Vec<_>>()
}
/// propagades latest block to lagging peers
fn propagade_blocks(&mut self, local_best: &H256, best_number: BlockNumber, io: &mut SyncIo) -> usize {
/// propagates latest block to lagging peers
fn propagate_blocks(&mut self, local_best: &H256, best_number: BlockNumber, io: &mut SyncIo) -> usize {
let updated_peers = {
let lagging_peers = self.get_lagging_peers(io);
@@ -1170,8 +1198,8 @@ impl ChainSync {
sent
}
/// propagades new known hashes to all peers
fn propagade_new_hashes(&mut self, local_best: &H256, best_number: BlockNumber, io: &mut SyncIo) -> usize {
/// propagates new known hashes to all peers
fn propagate_new_hashes(&mut self, local_best: &H256, best_number: BlockNumber, io: &mut SyncIo) -> usize {
let updated_peers = self.get_lagging_peers(io);
let mut sent = 0;
let last_parent = HeaderView::new(&io.chain().block_header(BlockId::Hash(local_best.clone())).unwrap()).parent_hash();
@@ -1206,8 +1234,8 @@ impl ChainSync {
pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) {
let chain = io.chain().chain_info();
if (((chain.best_block_number as i64) - (self.last_send_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
let blocks = self.propagade_blocks(&chain.best_block_hash, chain.best_block_number, io);
let hashes = self.propagade_new_hashes(&chain.best_block_hash, chain.best_block_number, io);
let blocks = self.propagate_blocks(&chain.best_block_hash, chain.best_block_number, io);
let hashes = self.propagate_new_hashes(&chain.best_block_hash, chain.best_block_number, io);
if blocks != 0 || hashes != 0 {
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
}
@@ -1220,6 +1248,7 @@ impl ChainSync {
mod tests {
use tests::helpers::*;
use super::*;
use ::SyncConfig;
use util::*;
use super::{PeerInfo, PeerAsking};
use ethcore::header::*;
@@ -1333,7 +1362,7 @@ mod tests {
}
fn dummy_sync_with_peer(peer_latest_hash: H256) -> ChainSync {
let mut sync = ChainSync::new();
let mut sync = ChainSync::new(SyncConfig::default());
sync.peers.insert(0,
PeerInfo {
protocol_version: 0,
@@ -1390,7 +1419,7 @@ mod tests {
let best_number = client.chain_info().best_block_number;
let mut io = TestIo::new(&mut client, &mut queue, None);
let peer_count = sync.propagade_new_hashes(&best_hash, best_number, &mut io);
let peer_count = sync.propagate_new_hashes(&best_hash, best_number, &mut io);
// 1 message should be send
assert_eq!(1, io.queue.len());
@@ -1410,7 +1439,7 @@ mod tests {
let best_number = client.chain_info().best_block_number;
let mut io = TestIo::new(&mut client, &mut queue, None);
let peer_count = sync.propagade_blocks(&best_hash, best_number, &mut io);
let peer_count = sync.propagate_blocks(&best_hash, best_number, &mut io);
// 1 message should be send
assert_eq!(1, io.queue.len());
@@ -1516,7 +1545,7 @@ mod tests {
let best_number = client.chain_info().best_block_number;
let mut io = TestIo::new(&mut client, &mut queue, None);
sync.propagade_new_hashes(&best_hash, best_number, &mut io);
sync.propagate_new_hashes(&best_hash, best_number, &mut io);
let data = &io.queue[0].data.clone();
let result = sync.on_peer_new_hashes(&mut io, 0, &UntrustedRlp::new(&data));
@@ -1535,7 +1564,7 @@ mod tests {
let best_number = client.chain_info().best_block_number;
let mut io = TestIo::new(&mut client, &mut queue, None);
sync.propagade_blocks(&best_hash, best_number, &mut io);
sync.propagate_blocks(&best_hash, best_number, &mut io);
let data = &io.queue[0].data.clone();
let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(&data));

View File

@@ -34,15 +34,15 @@
//! use std::env;
//! use std::sync::Arc;
//! use util::network::{NetworkService, NetworkConfiguration};
//! use ethcore::client::Client;
//! use ethsync::EthSync;
//! use ethcore::client::{Client, ClientConfig};
//! use ethsync::{EthSync, SyncConfig};
//! use ethcore::ethereum;
//!
//! fn main() {
//! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap();
//! let dir = env::temp_dir();
//! let client = Client::new(ethereum::new_frontier(), &dir, service.io().channel()).unwrap();
//! EthSync::register(&mut service, client);
//! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, service.io().channel()).unwrap();
//! EthSync::register(&mut service, SyncConfig::default(), client);
//! }
//! ```
@@ -54,12 +54,15 @@ extern crate ethcore;
extern crate env_logger;
extern crate time;
extern crate rand;
#[macro_use]
extern crate heapsize;
use std::ops::*;
use std::sync::*;
use ethcore::client::Client;
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId};
use util::TimerToken;
use util::{U256, ONE_U256};
use chain::ChainSync;
use ethcore::service::SyncMessage;
use io::NetSyncIo;
@@ -71,6 +74,23 @@ mod range_collection;
#[cfg(test)]
mod tests;
/// Sync configuration
pub struct SyncConfig {
/// Max blocks to download ahead
pub max_download_ahead_blocks: usize,
/// Network ID
pub network_id: U256,
}
impl Default for SyncConfig {
fn default() -> SyncConfig {
SyncConfig {
max_download_ahead_blocks: 20000,
network_id: ONE_U256,
}
}
}
/// Ethereum network protocol handler
pub struct EthSync {
/// Shared blockchain client. TODO: this should evetually become an IPC endpoint
@@ -83,10 +103,10 @@ pub use self::chain::{SyncStatus, SyncState};
impl EthSync {
/// Creates and register protocol with the network service
pub fn register(service: &mut NetworkService<SyncMessage>, chain: Arc<Client>) -> Arc<EthSync> {
pub fn register(service: &mut NetworkService<SyncMessage>, config: SyncConfig, chain: Arc<Client>) -> Arc<EthSync> {
let sync = Arc::new(EthSync {
chain: chain,
sync: RwLock::new(ChainSync::new()),
sync: RwLock::new(ChainSync::new(config)),
});
service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler");
sync

View File

@@ -121,7 +121,7 @@ fn status_packet() {
}
#[test]
fn propagade_hashes() {
fn propagate_hashes() {
let mut net = TestNet::new(6);
net.peer_mut(1).chain.add_blocks(10, false);
net.sync();
@@ -147,7 +147,7 @@ fn propagade_hashes() {
}
#[test]
fn propagade_blocks() {
fn propagate_blocks() {
let mut net = TestNet::new(2);
net.peer_mut(1).chain.add_blocks(10, false);
net.sync();

View File

@@ -15,12 +15,12 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use util::*;
use ethcore::client::{BlockChainClient, BlockStatus, TreeRoute, BlockChainInfo, TransactionId, BlockId};
use ethcore::block_queue::BlockQueueInfo;
use ethcore::client::{BlockChainClient, BlockStatus, TreeRoute, BlockChainInfo, TransactionId, BlockId, BlockQueueInfo};
use ethcore::header::{Header as BlockHeader, BlockNumber};
use ethcore::error::*;
use io::SyncIo;
use chain::{ChainSync};
use chain::ChainSync;
use ::SyncConfig;
use ethcore::receipt::Receipt;
use ethcore::transaction::LocalizedTransaction;
use ethcore::filter::Filter;
@@ -121,7 +121,7 @@ impl BlockChainClient for TestBlockChainClient {
unimplemented!();
}
fn logs(&self, filter: Filter) -> Vec<LocalizedLogEntry> {
fn logs(&self, _filter: Filter) -> Vec<LocalizedLogEntry> {
unimplemented!();
}
@@ -255,6 +255,9 @@ impl BlockChainClient for TestBlockChainClient {
verified_queue_size: 0,
unverified_queue_size: 0,
verifying_queue_size: 0,
max_queue_size: 0,
max_mem_use: 0,
mem_used: 0,
}
}
@@ -344,7 +347,7 @@ impl TestNet {
for _ in 0..n {
net.peers.push(TestPeer {
chain: TestBlockChainClient::new(),
sync: ChainSync::new(),
sync: ChainSync::new(SyncConfig::default()),
queue: VecDeque::new(),
});
}