Limit download ahead

This commit is contained in:
arkpar 2016-02-24 21:23:58 +01:00
parent e519e162df
commit cb3608c6d3
6 changed files with 76 additions and 32 deletions

View File

@ -58,6 +58,8 @@ pub struct Spec {
/// Known nodes on the network in enode format. /// Known nodes on the network in enode format.
pub nodes: Vec<String>, pub nodes: Vec<String>,
/// Network ID
pub network_id: U256,
/// Parameters concerning operation of the specific engine we're using. /// Parameters concerning operation of the specific engine we're using.
/// Maps the parameter name to an RLP-encoded value. /// Maps the parameter name to an RLP-encoded value.
@ -120,6 +122,9 @@ impl Spec {
/// Get the known knodes of the network in enode format. /// Get the known knodes of the network in enode format.
pub fn nodes(&self) -> &Vec<String> { &self.nodes } pub fn nodes(&self) -> &Vec<String> { &self.nodes }
/// Get the configured Network ID.
pub fn network_id(&self) -> U256 { self.network_id }
/// Get the header of the genesis block. /// Get the header of the genesis block.
pub fn genesis_header(&self) -> Header { pub fn genesis_header(&self) -> Header {
Header { Header {
@ -250,6 +255,7 @@ impl FromJson for Spec {
engine_name: json["engineName"].as_string().unwrap().to_owned(), engine_name: json["engineName"].as_string().unwrap().to_owned(),
engine_params: json_to_rlp_map(&json["params"]), engine_params: json_to_rlp_map(&json["params"]),
nodes: nodes, nodes: nodes,
network_id: U256::from_str(&json["params"]["networkID"].as_string().unwrap()[2..]).unwrap(),
builtins: builtins, builtins: builtins,
parent_hash: H256::from_str(&genesis["parentHash"].as_string().unwrap()[2..]).unwrap(), parent_hash: H256::from_str(&genesis["parentHash"].as_string().unwrap()[2..]).unwrap(),
author: Address::from_str(&genesis["author"].as_string().unwrap()[2..]).unwrap(), author: Address::from_str(&genesis["author"].as_string().unwrap()[2..]).unwrap(),

View File

@ -48,7 +48,7 @@ use ethcore::client::*;
use ethcore::service::{ClientService, NetSyncMessage}; use ethcore::service::{ClientService, NetSyncMessage};
use ethcore::ethereum; use ethcore::ethereum;
use ethcore::blockchain::CacheSize; use ethcore::blockchain::CacheSize;
use ethsync::EthSync; use ethsync::{EthSync, SyncConfig};
use docopt::Docopt; use docopt::Docopt;
use daemonize::Daemonize; use daemonize::Daemonize;
@ -281,6 +281,8 @@ impl Configuration {
let spec = self.spec(); let spec = self.spec();
let net_settings = self.net_settings(&spec); let net_settings = self.net_settings(&spec);
let mut sync_config = SyncConfig::default();
sync_config.network_id = spec.network_id();
// Build client // Build client
let mut service = ClientService::start(spec, net_settings, &Path::new(&self.path())).unwrap(); let mut service = ClientService::start(spec, net_settings, &Path::new(&self.path())).unwrap();
@ -288,7 +290,7 @@ impl Configuration {
client.configure_cache(self.args.flag_cache_pref_size, self.args.flag_cache_max_size); client.configure_cache(self.args.flag_cache_pref_size, self.args.flag_cache_max_size);
// Sync // Sync
let sync = EthSync::register(service.network(), client); let sync = EthSync::register(service.network(), sync_config, client);
// Setup rpc // Setup rpc
if self.args.flag_jsonrpc { if self.args.flag_jsonrpc {

View File

@ -40,6 +40,7 @@ use ethcore::block::Block;
use io::SyncIo; use io::SyncIo;
use time; use time;
use std::option::Option; use std::option::Option;
use super::SyncConfig;
impl ToUsize for BlockNumber { impl ToUsize for BlockNumber {
fn to_usize(&self) -> usize { fn to_usize(&self) -> usize {
@ -80,9 +81,7 @@ const NODE_DATA_PACKET: u8 = 0x0e;
const GET_RECEIPTS_PACKET: u8 = 0x0f; const GET_RECEIPTS_PACKET: u8 = 0x0f;
const RECEIPTS_PACKET: u8 = 0x10; const RECEIPTS_PACKET: u8 = 0x10;
const NETWORK_ID: U256 = ONE_U256; //TODO: get this from parent const CONNECTION_TIMEOUT_SEC: f64 = 5f64;
const CONNECTION_TIMEOUT_SEC: f64 = 10f64;
struct Header { struct Header {
/// Header data /// Header data
@ -203,13 +202,17 @@ pub struct ChainSync {
have_common_block: bool, have_common_block: bool,
/// Last propagated block number /// Last propagated block number
last_send_block_number: BlockNumber, last_send_block_number: BlockNumber,
/// Max blocks to download ahead
max_download_ahead_blocks: usize,
/// Network ID
network_id: U256,
} }
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>; type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
impl ChainSync { impl ChainSync {
/// Create a new instance of syncing strategy. /// Create a new instance of syncing strategy.
pub fn new() -> ChainSync { pub fn new(config: SyncConfig) -> ChainSync {
ChainSync { ChainSync {
state: SyncState::NotSynced, state: SyncState::NotSynced,
starting_block: 0, starting_block: 0,
@ -226,6 +229,8 @@ impl ChainSync {
syncing_difficulty: U256::from(0u64), syncing_difficulty: U256::from(0u64),
have_common_block: false, have_common_block: false,
last_send_block_number: 0, last_send_block_number: 0,
max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id,
} }
} }
@ -275,7 +280,6 @@ impl ChainSync {
self.starting_block = 0; self.starting_block = 0;
self.highest_block = None; self.highest_block = None;
self.have_common_block = false; self.have_common_block = false;
io.chain().clear_queue();
self.starting_block = io.chain().chain_info().best_block_number; self.starting_block = io.chain().chain_info().best_block_number;
self.state = SyncState::NotSynced; self.state = SyncState::NotSynced;
} }
@ -307,7 +311,7 @@ impl ChainSync {
trace!(target: "sync", "Peer {} genesis hash not matched", peer_id); trace!(target: "sync", "Peer {} genesis hash not matched", peer_id);
return Ok(()); return Ok(());
} }
if peer.network_id != NETWORK_ID { if peer.network_id != self.network_id {
io.disable_peer(peer_id); io.disable_peer(peer_id);
trace!(target: "sync", "Peer {} network id not matched", peer_id); trace!(target: "sync", "Peer {} network id not matched", peer_id);
return Ok(()); return Ok(());
@ -436,7 +440,7 @@ impl ChainSync {
trace!(target: "sync", "Got body {}", n); trace!(target: "sync", "Got body {}", n);
} }
None => { None => {
debug!(target: "sync", "Ignored unknown block body"); trace!(target: "sync", "Ignored unknown/stale block body");
} }
} }
} }
@ -608,7 +612,7 @@ impl ChainSync {
self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false); self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false);
} }
else if self.state == SyncState::Blocks && io.chain().block_status(BlockId::Hash(peer_latest)) == BlockStatus::Unknown { else if self.state == SyncState::Blocks && io.chain().block_status(BlockId::Hash(peer_latest)) == BlockStatus::Unknown {
self.request_blocks(io, peer_id); self.request_blocks(io, peer_id, false);
} }
} }
@ -617,7 +621,7 @@ impl ChainSync {
} }
/// Find some headers or blocks to download for a peer. /// Find some headers or blocks to download for a peer.
fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId) { fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId, ignore_others: bool) {
self.clear_peer_download(peer_id); self.clear_peer_download(peer_id);
if io.chain().queue_info().is_full() { if io.chain().queue_info().is_full() {
@ -637,28 +641,34 @@ impl ChainSync {
let mut index: BlockNumber = 0; let mut index: BlockNumber = 0;
while index != items.len() as BlockNumber && needed_bodies.len() < MAX_BODIES_TO_REQUEST { while index != items.len() as BlockNumber && needed_bodies.len() < MAX_BODIES_TO_REQUEST {
let block = start + index; let block = start + index;
if !self.downloading_bodies.contains(&block) && !self.bodies.have_item(&block) { if ignore_others || (!self.downloading_bodies.contains(&block) && !self.bodies.have_item(&block)) {
needed_bodies.push(items[index as usize].hash.clone()); needed_bodies.push(items[index as usize].hash.clone());
needed_numbers.push(block); needed_numbers.push(block);
self.downloading_bodies.insert(block);
} }
index += 1; index += 1;
} }
} }
} }
if !needed_bodies.is_empty() { if !needed_bodies.is_empty() {
let (head, _) = self.headers.range_iter().next().unwrap();
if needed_numbers.first().unwrap() - head > self.max_download_ahead_blocks as BlockNumber {
trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading block bodies", peer_id, needed_numbers.first().unwrap(), head);
self.request_blocks(io, peer_id, true);
return;
}
self.downloading_bodies.extend(needed_numbers.iter());
replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_numbers); replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_numbers);
self.request_bodies(io, peer_id, needed_bodies); self.request_bodies(io, peer_id, needed_bodies);
} }
else { else {
// check if need to download headers // check if need to download headers
let mut start = 0usize; let mut start = 0;
if !self.have_common_block { if !self.have_common_block {
// download backwards until common block is found 1 header at a time // download backwards until common block is found 1 header at a time
let chain_info = io.chain().chain_info(); let chain_info = io.chain().chain_info();
start = chain_info.best_block_number as usize; start = chain_info.best_block_number;
if !self.headers.is_empty() { if !self.headers.is_empty() {
start = min(start, self.headers.range_iter().next().unwrap().0 as usize - 1); start = min(start, self.headers.range_iter().next().unwrap().0 - 1);
} }
if start == 0 { if start == 0 {
self.have_common_block = true; //reached genesis self.have_common_block = true; //reached genesis
@ -669,6 +679,7 @@ impl ChainSync {
if self.have_common_block { if self.have_common_block {
let mut headers: Vec<BlockNumber> = Vec::new(); let mut headers: Vec<BlockNumber> = Vec::new();
let mut prev = self.current_base_block() + 1; let mut prev = self.current_base_block() + 1;
let head = self.headers.range_iter().next().map(|(h, _)| h);
for (next, ref items) in self.headers.range_iter() { for (next, ref items) in self.headers.range_iter() {
if !headers.is_empty() { if !headers.is_empty() {
break; break;
@ -679,9 +690,8 @@ impl ChainSync {
} }
let mut block = prev; let mut block = prev;
while block < next && headers.len() < MAX_HEADERS_TO_REQUEST { while block < next && headers.len() < MAX_HEADERS_TO_REQUEST {
if !self.downloading_headers.contains(&(block as BlockNumber)) { if ignore_others || !self.downloading_headers.contains(&(block as BlockNumber)) {
headers.push(block as BlockNumber); headers.push(block as BlockNumber);
self.downloading_headers.insert(block as BlockNumber);
} }
block += 1; block += 1;
} }
@ -689,17 +699,23 @@ impl ChainSync {
} }
if !headers.is_empty() { if !headers.is_empty() {
start = headers[0] as usize; start = headers[0];
if head.is_some() && start > head.unwrap() && start - head.unwrap() > self.max_download_ahead_blocks as BlockNumber {
trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading headers", peer_id, start, head.unwrap());
self.request_blocks(io, peer_id, true);
return;
}
let count = headers.len(); let count = headers.len();
self.downloading_headers.extend(headers.iter());
replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, headers); replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, headers);
assert!(!self.headers.have_item(&(start as BlockNumber))); assert!(!self.headers.have_item(&start));
self.request_headers_by_number(io, peer_id, start as BlockNumber, count, 0, false); self.request_headers_by_number(io, peer_id, start, count, 0, false);
} }
} }
else { else {
// continue search for common block // continue search for common block
self.downloading_headers.insert(start as BlockNumber); self.downloading_headers.insert(start);
self.request_headers_by_number(io, peer_id, start as BlockNumber, 1, 0, false); self.request_headers_by_number(io, peer_id, start, 1, 0, false);
} }
} }
} }
@ -891,7 +907,7 @@ impl ChainSync {
let mut packet = RlpStream::new_list(5); let mut packet = RlpStream::new_list(5);
let chain = io.chain().chain_info(); let chain = io.chain().chain_info();
packet.append(&(PROTOCOL_VERSION as u32)); packet.append(&(PROTOCOL_VERSION as u32));
packet.append(&NETWORK_ID); //TODO: network id packet.append(&self.network_id);
packet.append(&chain.total_difficulty); packet.append(&chain.total_difficulty);
packet.append(&chain.best_block_hash); packet.append(&chain.best_block_hash);
packet.append(&chain.genesis_hash); packet.append(&chain.genesis_hash);
@ -1221,6 +1237,7 @@ impl ChainSync {
mod tests { mod tests {
use tests::helpers::*; use tests::helpers::*;
use super::*; use super::*;
use ::SyncConfig;
use util::*; use util::*;
use super::{PeerInfo, PeerAsking}; use super::{PeerInfo, PeerAsking};
use ethcore::header::*; use ethcore::header::*;
@ -1334,7 +1351,7 @@ mod tests {
} }
fn dummy_sync_with_peer(peer_latest_hash: H256) -> ChainSync { fn dummy_sync_with_peer(peer_latest_hash: H256) -> ChainSync {
let mut sync = ChainSync::new(); let mut sync = ChainSync::new(SyncConfig::default());
sync.peers.insert(0, sync.peers.insert(0,
PeerInfo { PeerInfo {
protocol_version: 0, protocol_version: 0,

View File

@ -35,14 +35,14 @@
//! use std::sync::Arc; //! use std::sync::Arc;
//! use util::network::{NetworkService, NetworkConfiguration}; //! use util::network::{NetworkService, NetworkConfiguration};
//! use ethcore::client::Client; //! use ethcore::client::Client;
//! use ethsync::EthSync; //! use ethsync::{EthSync, SyncConfig};
//! use ethcore::ethereum; //! use ethcore::ethereum;
//! //!
//! fn main() { //! fn main() {
//! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap(); //! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap();
//! let dir = env::temp_dir(); //! let dir = env::temp_dir();
//! let client = Client::new(ethereum::new_frontier(), &dir, service.io().channel()).unwrap(); //! let client = Client::new(ethereum::new_frontier(), &dir, service.io().channel()).unwrap();
//! EthSync::register(&mut service, client); //! EthSync::register(&mut service, SyncConfig::default(), client);
//! } //! }
//! ``` //! ```
@ -60,6 +60,7 @@ use std::sync::*;
use ethcore::client::Client; use ethcore::client::Client;
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId};
use util::TimerToken; use util::TimerToken;
use util::{U256, ONE_U256};
use chain::ChainSync; use chain::ChainSync;
use ethcore::service::SyncMessage; use ethcore::service::SyncMessage;
use io::NetSyncIo; use io::NetSyncIo;
@ -71,6 +72,23 @@ mod range_collection;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
/// Sync configuration
pub struct SyncConfig {
/// Max blocks to download ahead
pub max_download_ahead_blocks: usize,
/// Network ID
pub network_id: U256,
}
impl Default for SyncConfig {
fn default() -> SyncConfig {
SyncConfig {
max_download_ahead_blocks: 20000,
network_id: ONE_U256,
}
}
}
/// Ethereum network protocol handler /// Ethereum network protocol handler
pub struct EthSync { pub struct EthSync {
/// Shared blockchain client. TODO: this should evetually become an IPC endpoint /// Shared blockchain client. TODO: this should evetually become an IPC endpoint
@ -83,10 +101,10 @@ pub use self::chain::{SyncStatus, SyncState};
impl EthSync { impl EthSync {
/// Creates and register protocol with the network service /// Creates and register protocol with the network service
pub fn register(service: &mut NetworkService<SyncMessage>, chain: Arc<Client>) -> Arc<EthSync> { pub fn register(service: &mut NetworkService<SyncMessage>, config: SyncConfig, chain: Arc<Client>) -> Arc<EthSync> {
let sync = Arc::new(EthSync { let sync = Arc::new(EthSync {
chain: chain, chain: chain,
sync: RwLock::new(ChainSync::new()), sync: RwLock::new(ChainSync::new(config)),
}); });
service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler"); service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler");
sync sync

View File

@ -207,7 +207,7 @@ impl<K, V> RangeCollection<K, V> for Vec<(K, Vec<V>)> where K: Ord + PartialEq +
} }
#[test] #[test]
#[allow(cyclomatic_complexity)] #[cfg_attr(dev, allow(cyclomatic_complexity))]
fn test_range() { fn test_range() {
use std::cmp::{Ordering}; use std::cmp::{Ordering};

View File

@ -20,7 +20,8 @@ use ethcore::block_queue::BlockQueueInfo;
use ethcore::header::{Header as BlockHeader, BlockNumber}; use ethcore::header::{Header as BlockHeader, BlockNumber};
use ethcore::error::*; use ethcore::error::*;
use io::SyncIo; use io::SyncIo;
use chain::{ChainSync}; use chain::ChainSync;
use ::SyncConfig;
use ethcore::receipt::Receipt; use ethcore::receipt::Receipt;
use ethcore::transaction::LocalizedTransaction; use ethcore::transaction::LocalizedTransaction;
@ -330,7 +331,7 @@ impl TestNet {
for _ in 0..n { for _ in 0..n {
net.peers.push(TestPeer { net.peers.push(TestPeer {
chain: TestBlockChainClient::new(), chain: TestBlockChainClient::new(),
sync: ChainSync::new(), sync: ChainSync::new(SyncConfig::default()),
queue: VecDeque::new(), queue: VecDeque::new(),
}); });
} }