Merge pull request #303 from ethcore/ark
Check block parent on import; Peer timeouts
This commit is contained in:
commit
5155ff1fac
@ -9,6 +9,7 @@ use engine::Engine;
|
|||||||
use views::*;
|
use views::*;
|
||||||
use header::*;
|
use header::*;
|
||||||
use service::*;
|
use service::*;
|
||||||
|
use client::BlockStatus;
|
||||||
|
|
||||||
/// Block queue status
|
/// Block queue status
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -41,7 +42,7 @@ pub struct BlockQueue {
|
|||||||
deleting: Arc<AtomicBool>,
|
deleting: Arc<AtomicBool>,
|
||||||
ready_signal: Arc<QueueSignal>,
|
ready_signal: Arc<QueueSignal>,
|
||||||
empty: Arc<Condvar>,
|
empty: Arc<Condvar>,
|
||||||
processing: HashSet<H256>
|
processing: RwLock<HashSet<H256>>
|
||||||
}
|
}
|
||||||
|
|
||||||
struct UnVerifiedBlock {
|
struct UnVerifiedBlock {
|
||||||
@ -106,7 +107,7 @@ impl BlockQueue {
|
|||||||
verification: verification.clone(),
|
verification: verification.clone(),
|
||||||
verifiers: verifiers,
|
verifiers: verifiers,
|
||||||
deleting: deleting.clone(),
|
deleting: deleting.clone(),
|
||||||
processing: HashSet::new(),
|
processing: RwLock::new(HashSet::new()),
|
||||||
empty: empty.clone(),
|
empty: empty.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -196,11 +197,22 @@ impl BlockQueue {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check if the block is currently in the queue
|
||||||
|
pub fn block_status(&self, hash: &H256) -> BlockStatus {
|
||||||
|
if self.processing.read().unwrap().contains(&hash) {
|
||||||
|
return BlockStatus::Queued;
|
||||||
|
}
|
||||||
|
if self.verification.lock().unwrap().bad.contains(&hash) {
|
||||||
|
return BlockStatus::Bad;
|
||||||
|
}
|
||||||
|
BlockStatus::Unknown
|
||||||
|
}
|
||||||
|
|
||||||
/// Add a block to the queue.
|
/// Add a block to the queue.
|
||||||
pub fn import_block(&mut self, bytes: Bytes) -> ImportResult {
|
pub fn import_block(&mut self, bytes: Bytes) -> ImportResult {
|
||||||
let header = BlockView::new(&bytes).header();
|
let header = BlockView::new(&bytes).header();
|
||||||
let h = header.hash();
|
let h = header.hash();
|
||||||
if self.processing.contains(&h) {
|
if self.processing.read().unwrap().contains(&h) {
|
||||||
return Err(ImportError::AlreadyQueued);
|
return Err(ImportError::AlreadyQueued);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
@ -217,7 +229,7 @@ impl BlockQueue {
|
|||||||
|
|
||||||
match verify_block_basic(&header, &bytes, self.engine.deref().deref()) {
|
match verify_block_basic(&header, &bytes, self.engine.deref().deref()) {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
self.processing.insert(h.clone());
|
self.processing.write().unwrap().insert(h.clone());
|
||||||
self.verification.lock().unwrap().unverified.push_back(UnVerifiedBlock { header: header, bytes: bytes });
|
self.verification.lock().unwrap().unverified.push_back(UnVerifiedBlock { header: header, bytes: bytes });
|
||||||
self.more_to_verify.notify_all();
|
self.more_to_verify.notify_all();
|
||||||
Ok(h)
|
Ok(h)
|
||||||
@ -235,10 +247,12 @@ impl BlockQueue {
|
|||||||
let mut verification_lock = self.verification.lock().unwrap();
|
let mut verification_lock = self.verification.lock().unwrap();
|
||||||
let mut verification = verification_lock.deref_mut();
|
let mut verification = verification_lock.deref_mut();
|
||||||
verification.bad.insert(hash.clone());
|
verification.bad.insert(hash.clone());
|
||||||
|
self.processing.write().unwrap().remove(&hash);
|
||||||
let mut new_verified = VecDeque::new();
|
let mut new_verified = VecDeque::new();
|
||||||
for block in verification.verified.drain(..) {
|
for block in verification.verified.drain(..) {
|
||||||
if verification.bad.contains(&block.header.parent_hash) {
|
if verification.bad.contains(&block.header.parent_hash) {
|
||||||
verification.bad.insert(block.header.hash());
|
verification.bad.insert(block.header.hash());
|
||||||
|
self.processing.write().unwrap().remove(&block.header.hash());
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
new_verified.push_back(block);
|
new_verified.push_back(block);
|
||||||
@ -247,6 +261,15 @@ impl BlockQueue {
|
|||||||
verification.verified = new_verified;
|
verification.verified = new_verified;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Mark given block as processed
|
||||||
|
pub fn mark_as_good(&mut self, hashes: &[H256]) {
|
||||||
|
let mut processing = self.processing.write().unwrap();
|
||||||
|
for h in hashes {
|
||||||
|
processing.remove(&h);
|
||||||
|
}
|
||||||
|
//TODO: reward peers
|
||||||
|
}
|
||||||
|
|
||||||
/// Removes up to `max` verified blocks from the queue
|
/// Removes up to `max` verified blocks from the queue
|
||||||
pub fn drain(&mut self, max: usize) -> Vec<PreVerifiedBlock> {
|
pub fn drain(&mut self, max: usize) -> Vec<PreVerifiedBlock> {
|
||||||
let mut verification = self.verification.lock().unwrap();
|
let mut verification = self.verification.lock().unwrap();
|
||||||
@ -254,7 +277,6 @@ impl BlockQueue {
|
|||||||
let mut result = Vec::with_capacity(count);
|
let mut result = Vec::with_capacity(count);
|
||||||
for _ in 0..count {
|
for _ in 0..count {
|
||||||
let block = verification.verified.pop_front().unwrap();
|
let block = verification.verified.pop_front().unwrap();
|
||||||
self.processing.remove(&block.header.hash());
|
|
||||||
result.push(block);
|
result.push(block);
|
||||||
}
|
}
|
||||||
self.ready_signal.reset();
|
self.ready_signal.reset();
|
||||||
@ -294,6 +316,7 @@ mod tests {
|
|||||||
use block_queue::*;
|
use block_queue::*;
|
||||||
use tests::helpers::*;
|
use tests::helpers::*;
|
||||||
use error::*;
|
use error::*;
|
||||||
|
use views::*;
|
||||||
|
|
||||||
fn get_test_queue() -> BlockQueue {
|
fn get_test_queue() -> BlockQueue {
|
||||||
let spec = get_test_spec();
|
let spec = get_test_spec();
|
||||||
@ -339,11 +362,14 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn returns_ok_for_drained_duplicates() {
|
fn returns_ok_for_drained_duplicates() {
|
||||||
let mut queue = get_test_queue();
|
let mut queue = get_test_queue();
|
||||||
if let Err(e) = queue.import_block(get_good_dummy_block()) {
|
let block = get_good_dummy_block();
|
||||||
|
let hash = BlockView::new(&block).header().hash().clone();
|
||||||
|
if let Err(e) = queue.import_block(block) {
|
||||||
panic!("error importing block that is valid by definition({:?})", e);
|
panic!("error importing block that is valid by definition({:?})", e);
|
||||||
}
|
}
|
||||||
queue.flush();
|
queue.flush();
|
||||||
queue.drain(10);
|
queue.drain(10);
|
||||||
|
queue.mark_as_good(&[ hash ]);
|
||||||
|
|
||||||
if let Err(e) = queue.import_block(get_good_dummy_block()) {
|
if let Err(e) = queue.import_block(get_good_dummy_block()) {
|
||||||
panic!("error importing block that has already been drained ({:?})", e);
|
panic!("error importing block that has already been drained ({:?})", e);
|
||||||
|
@ -17,7 +17,7 @@ use verification::*;
|
|||||||
use block::*;
|
use block::*;
|
||||||
|
|
||||||
/// General block status
|
/// General block status
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Eq, PartialEq)]
|
||||||
pub enum BlockStatus {
|
pub enum BlockStatus {
|
||||||
/// Part of the blockchain.
|
/// Part of the blockchain.
|
||||||
InChain,
|
InChain,
|
||||||
@ -206,6 +206,7 @@ impl Client {
|
|||||||
let mut bad = HashSet::new();
|
let mut bad = HashSet::new();
|
||||||
let _import_lock = self.import_lock.lock();
|
let _import_lock = self.import_lock.lock();
|
||||||
let blocks = self.block_queue.write().unwrap().drain(128);
|
let blocks = self.block_queue.write().unwrap().drain(128);
|
||||||
|
let mut good_blocks = Vec::with_capacity(128);
|
||||||
for block in blocks {
|
for block in blocks {
|
||||||
if bad.contains(&block.header.parent_hash) {
|
if bad.contains(&block.header.parent_hash) {
|
||||||
self.block_queue.write().unwrap().mark_as_bad(&block.header.hash());
|
self.block_queue.write().unwrap().mark_as_bad(&block.header.hash());
|
||||||
@ -258,6 +259,8 @@ impl Client {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
good_blocks.push(header.hash().clone());
|
||||||
|
|
||||||
self.chain.write().unwrap().insert_block(&block.bytes); //TODO: err here?
|
self.chain.write().unwrap().insert_block(&block.bytes); //TODO: err here?
|
||||||
let ancient = if header.number() >= HISTORY { Some(header.number() - HISTORY) } else { None };
|
let ancient = if header.number() >= HISTORY { Some(header.number() - HISTORY) } else { None };
|
||||||
match result.drain().commit(header.number(), &header.hash(), ancient.map(|n|(n, self.chain.read().unwrap().block_hash(n).unwrap()))) {
|
match result.drain().commit(header.number(), &header.hash(), ancient.map(|n|(n, self.chain.read().unwrap().block_hash(n).unwrap()))) {
|
||||||
@ -271,6 +274,7 @@ impl Client {
|
|||||||
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
|
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
|
||||||
ret += 1;
|
ret += 1;
|
||||||
}
|
}
|
||||||
|
self.block_queue.write().unwrap().mark_as_good(&good_blocks);
|
||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -325,7 +329,11 @@ impl BlockChainClient for Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn block_status(&self, hash: &H256) -> BlockStatus {
|
fn block_status(&self, hash: &H256) -> BlockStatus {
|
||||||
if self.chain.read().unwrap().is_known(&hash) { BlockStatus::InChain } else { BlockStatus::Unknown }
|
if self.chain.read().unwrap().is_known(&hash) {
|
||||||
|
BlockStatus::InChain
|
||||||
|
} else {
|
||||||
|
self.block_queue.read().unwrap().block_status(hash)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn block_total_difficulty(&self, hash: &H256) -> Option<U256> {
|
fn block_total_difficulty(&self, hash: &H256) -> Option<U256> {
|
||||||
@ -372,6 +380,9 @@ impl BlockChainClient for Client {
|
|||||||
if self.chain.read().unwrap().is_known(&header.hash()) {
|
if self.chain.read().unwrap().is_known(&header.hash()) {
|
||||||
return Err(ImportError::AlreadyInChain);
|
return Err(ImportError::AlreadyInChain);
|
||||||
}
|
}
|
||||||
|
if self.block_status(&header.parent_hash) == BlockStatus::Unknown {
|
||||||
|
return Err(ImportError::UnknownParent);
|
||||||
|
}
|
||||||
self.block_queue.write().unwrap().import_block(bytes)
|
self.block_queue.write().unwrap().import_block(bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -130,14 +130,16 @@ pub enum BlockError {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
/// TODO [arkpar] Please document me
|
/// Import to the block queue result
|
||||||
pub enum ImportError {
|
pub enum ImportError {
|
||||||
/// TODO [arkpar] Please document me
|
/// Bad block detected
|
||||||
Bad(Option<Error>),
|
Bad(Option<Error>),
|
||||||
/// TODO [arkpar] Please document me
|
/// Already in the block chain
|
||||||
AlreadyInChain,
|
AlreadyInChain,
|
||||||
/// TODO [arkpar] Please document me
|
/// Already in the block queue
|
||||||
AlreadyQueued,
|
AlreadyQueued,
|
||||||
|
/// Unknown parent
|
||||||
|
UnknownParent,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<Error> for ImportError {
|
impl From<Error> for ImportError {
|
||||||
|
@ -13,4 +13,5 @@ ethcore = { path = ".." }
|
|||||||
clippy = "0.0.37"
|
clippy = "0.0.37"
|
||||||
log = "0.3"
|
log = "0.3"
|
||||||
env_logger = "0.3"
|
env_logger = "0.3"
|
||||||
|
time = "0.1.34"
|
||||||
|
|
||||||
|
@ -22,6 +22,7 @@ use range_collection::{RangeCollection, ToUsize, FromUsize};
|
|||||||
use ethcore::error::*;
|
use ethcore::error::*;
|
||||||
use ethcore::block::Block;
|
use ethcore::block::Block;
|
||||||
use io::SyncIo;
|
use io::SyncIo;
|
||||||
|
use time;
|
||||||
|
|
||||||
impl ToUsize for BlockNumber {
|
impl ToUsize for BlockNumber {
|
||||||
fn to_usize(&self) -> usize {
|
fn to_usize(&self) -> usize {
|
||||||
@ -61,6 +62,8 @@ const RECEIPTS_PACKET: u8 = 0x10;
|
|||||||
|
|
||||||
const NETWORK_ID: U256 = ONE_U256; //TODO: get this from parent
|
const NETWORK_ID: U256 = ONE_U256; //TODO: get this from parent
|
||||||
|
|
||||||
|
const CONNECTION_TIMEOUT_SEC: f64 = 30f64;
|
||||||
|
|
||||||
struct Header {
|
struct Header {
|
||||||
/// Header data
|
/// Header data
|
||||||
data: Bytes,
|
data: Bytes,
|
||||||
@ -138,6 +141,8 @@ struct PeerInfo {
|
|||||||
asking: PeerAsking,
|
asking: PeerAsking,
|
||||||
/// A set of block numbers being requested
|
/// A set of block numbers being requested
|
||||||
asking_blocks: Vec<BlockNumber>,
|
asking_blocks: Vec<BlockNumber>,
|
||||||
|
/// Request timestamp
|
||||||
|
ask_time: f64,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Blockchain sync handler.
|
/// Blockchain sync handler.
|
||||||
@ -250,6 +255,7 @@ impl ChainSync {
|
|||||||
genesis: try!(r.val_at(4)),
|
genesis: try!(r.val_at(4)),
|
||||||
asking: PeerAsking::Nothing,
|
asking: PeerAsking::Nothing,
|
||||||
asking_blocks: Vec::new(),
|
asking_blocks: Vec::new(),
|
||||||
|
ask_time: 0f64,
|
||||||
};
|
};
|
||||||
|
|
||||||
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis);
|
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis);
|
||||||
@ -408,6 +414,7 @@ impl ChainSync {
|
|||||||
trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h);
|
trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h);
|
||||||
let header_view = HeaderView::new(header_rlp.as_raw());
|
let header_view = HeaderView::new(header_rlp.as_raw());
|
||||||
// TODO: Decompose block and add to self.headers and self.bodies instead
|
// TODO: Decompose block and add to self.headers and self.bodies instead
|
||||||
|
let mut unknown = false;
|
||||||
if header_view.number() == From::from(self.last_imported_block + 1) {
|
if header_view.number() == From::from(self.last_imported_block + 1) {
|
||||||
match io.chain().import_block(block_rlp.as_raw().to_vec()) {
|
match io.chain().import_block(block_rlp.as_raw().to_vec()) {
|
||||||
Err(ImportError::AlreadyInChain) => {
|
Err(ImportError::AlreadyInChain) => {
|
||||||
@ -416,6 +423,10 @@ impl ChainSync {
|
|||||||
Err(ImportError::AlreadyQueued) => {
|
Err(ImportError::AlreadyQueued) => {
|
||||||
trace!(target: "sync", "New block already queued {:?}", h);
|
trace!(target: "sync", "New block already queued {:?}", h);
|
||||||
},
|
},
|
||||||
|
Err(ImportError::UnknownParent) => {
|
||||||
|
unknown = true;
|
||||||
|
trace!(target: "sync", "New block with unknown parent {:?}", h);
|
||||||
|
},
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
trace!(target: "sync", "New block queued {:?}", h);
|
trace!(target: "sync", "New block queued {:?}", h);
|
||||||
},
|
},
|
||||||
@ -426,6 +437,9 @@ impl ChainSync {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
unknown = true;
|
||||||
|
}
|
||||||
|
if unknown {
|
||||||
trace!(target: "sync", "New block unknown {:?}", h);
|
trace!(target: "sync", "New block unknown {:?}", h);
|
||||||
//TODO: handle too many unknown blocks
|
//TODO: handle too many unknown blocks
|
||||||
let difficulty: U256 = try!(r.val_at(1));
|
let difficulty: U256 = try!(r.val_at(1));
|
||||||
@ -795,6 +809,7 @@ impl ChainSync {
|
|||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
let mut peer = self.peers.get_mut(&peer_id).unwrap();
|
let mut peer = self.peers.get_mut(&peer_id).unwrap();
|
||||||
peer.asking = asking;
|
peer.asking = asking;
|
||||||
|
peer.ask_time = time::precise_time_s();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -969,6 +984,16 @@ impl ChainSync {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Handle peer timeouts
|
||||||
|
pub fn maintain_peers(&self, io: &mut SyncIo) {
|
||||||
|
let tick = time::precise_time_s();
|
||||||
|
for (peer_id, peer) in &self.peers {
|
||||||
|
if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC {
|
||||||
|
io.disconnect_peer(*peer_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Maintain other peers. Send out any new blocks and transactions
|
/// Maintain other peers. Send out any new blocks and transactions
|
||||||
pub fn _maintain_sync(&mut self, _io: &mut SyncIo) {
|
pub fn _maintain_sync(&mut self, _io: &mut SyncIo) {
|
||||||
}
|
}
|
||||||
|
@ -9,6 +9,8 @@ use ethcore::service::SyncMessage;
|
|||||||
pub trait SyncIo {
|
pub trait SyncIo {
|
||||||
/// Disable a peer
|
/// Disable a peer
|
||||||
fn disable_peer(&mut self, peer_id: PeerId);
|
fn disable_peer(&mut self, peer_id: PeerId);
|
||||||
|
/// Disconnect peer
|
||||||
|
fn disconnect_peer(&mut self, peer_id: PeerId);
|
||||||
/// Respond to current request with a packet. Can be called from an IO handler for incoming packet.
|
/// Respond to current request with a packet. Can be called from an IO handler for incoming packet.
|
||||||
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError>;
|
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError>;
|
||||||
/// Send a packet to a peer.
|
/// Send a packet to a peer.
|
||||||
@ -42,6 +44,10 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
|
|||||||
self.network.disable_peer(peer_id);
|
self.network.disable_peer(peer_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn disconnect_peer(&mut self, peer_id: PeerId) {
|
||||||
|
self.network.disconnect_peer(peer_id);
|
||||||
|
}
|
||||||
|
|
||||||
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError>{
|
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError>{
|
||||||
self.network.respond(packet_id, data)
|
self.network.respond(packet_id, data)
|
||||||
}
|
}
|
||||||
|
@ -33,11 +33,13 @@ extern crate log;
|
|||||||
extern crate ethcore_util as util;
|
extern crate ethcore_util as util;
|
||||||
extern crate ethcore;
|
extern crate ethcore;
|
||||||
extern crate env_logger;
|
extern crate env_logger;
|
||||||
|
extern crate time;
|
||||||
|
|
||||||
use std::ops::*;
|
use std::ops::*;
|
||||||
use std::sync::*;
|
use std::sync::*;
|
||||||
use ethcore::client::Client;
|
use ethcore::client::Client;
|
||||||
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId};
|
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId};
|
||||||
|
use util::io::TimerToken;
|
||||||
use chain::ChainSync;
|
use chain::ChainSync;
|
||||||
use ethcore::service::SyncMessage;
|
use ethcore::service::SyncMessage;
|
||||||
use io::NetSyncIo;
|
use io::NetSyncIo;
|
||||||
@ -87,7 +89,8 @@ impl EthSync {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl NetworkProtocolHandler<SyncMessage> for EthSync {
|
impl NetworkProtocolHandler<SyncMessage> for EthSync {
|
||||||
fn initialize(&self, _io: &NetworkContext<SyncMessage>) {
|
fn initialize(&self, io: &NetworkContext<SyncMessage>) {
|
||||||
|
io.register_timer(0, 1000).expect("Error registering sync timer");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
|
fn read(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
|
||||||
@ -101,6 +104,10 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
|
|||||||
fn disconnected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) {
|
fn disconnected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) {
|
||||||
self.sync.write().unwrap().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer);
|
self.sync.write().unwrap().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn timeout(&self, io: &NetworkContext<SyncMessage>, _timer: TimerToken) {
|
||||||
|
self.sync.write().unwrap().maintain_peers(&mut NetSyncIo::new(io, self.chain.deref()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -209,6 +209,9 @@ impl<'p> SyncIo for TestIo<'p> {
|
|||||||
fn disable_peer(&mut self, _peer_id: PeerId) {
|
fn disable_peer(&mut self, _peer_id: PeerId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn disconnect_peer(&mut self, _peer_id: PeerId) {
|
||||||
|
}
|
||||||
|
|
||||||
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
|
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
|
||||||
self.queue.push_back(TestPacket {
|
self.queue.push_back(TestPacket {
|
||||||
data: data,
|
data: data,
|
||||||
|
@ -207,6 +207,12 @@ pub struct EncryptedConnection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl EncryptedConnection {
|
impl EncryptedConnection {
|
||||||
|
|
||||||
|
/// Get socket token
|
||||||
|
pub fn token(&self) -> StreamToken {
|
||||||
|
self.connection.token
|
||||||
|
}
|
||||||
|
|
||||||
/// Create an encrypted connection out of the handshake. Consumes a handshake object.
|
/// Create an encrypted connection out of the handshake. Consumes a handshake object.
|
||||||
pub fn new(mut handshake: Handshake) -> Result<EncryptedConnection, UtilError> {
|
pub fn new(mut handshake: Handshake) -> Result<EncryptedConnection, UtilError> {
|
||||||
let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_public));
|
let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_public));
|
||||||
|
@ -5,17 +5,17 @@ use rlp::*;
|
|||||||
pub enum DisconnectReason
|
pub enum DisconnectReason
|
||||||
{
|
{
|
||||||
DisconnectRequested,
|
DisconnectRequested,
|
||||||
//TCPError,
|
_TCPError,
|
||||||
//BadProtocol,
|
_BadProtocol,
|
||||||
UselessPeer,
|
UselessPeer,
|
||||||
//TooManyPeers,
|
_TooManyPeers,
|
||||||
//DuplicatePeer,
|
_DuplicatePeer,
|
||||||
//IncompatibleProtocol,
|
_IncompatibleProtocol,
|
||||||
//NullIdentity,
|
_NullIdentity,
|
||||||
//ClientQuit,
|
_ClientQuit,
|
||||||
//UnexpectedIdentity,
|
_UnexpectedIdentity,
|
||||||
//LocalIdentity,
|
_LocalIdentity,
|
||||||
//PingTimeout,
|
PingTimeout,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
@ -19,6 +19,7 @@ use io::*;
|
|||||||
use network::NetworkProtocolHandler;
|
use network::NetworkProtocolHandler;
|
||||||
use network::node::*;
|
use network::node::*;
|
||||||
use network::stats::NetworkStats;
|
use network::stats::NetworkStats;
|
||||||
|
use network::error::DisconnectReason;
|
||||||
|
|
||||||
type Slab<T> = ::slab::Slab<T, usize>;
|
type Slab<T> = ::slab::Slab<T, usize>;
|
||||||
|
|
||||||
@ -108,6 +109,8 @@ pub enum NetworkIoMessage<Message> where Message: Send + Sync + Clone {
|
|||||||
/// Timer delay in milliseconds.
|
/// Timer delay in milliseconds.
|
||||||
delay: u64,
|
delay: u64,
|
||||||
},
|
},
|
||||||
|
/// Disconnect a peer
|
||||||
|
Disconnect(PeerId),
|
||||||
/// User message
|
/// User message
|
||||||
User(Message),
|
User(Message),
|
||||||
}
|
}
|
||||||
@ -181,8 +184,14 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected.
|
/// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected.
|
||||||
pub fn disable_peer(&self, _peer: PeerId) {
|
pub fn disable_peer(&self, peer: PeerId) {
|
||||||
//TODO: remove capability, disconnect if no capabilities left
|
//TODO: remove capability, disconnect if no capabilities left
|
||||||
|
self.disconnect_peer(peer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Disconnect peer. Reconnect can be attempted later.
|
||||||
|
pub fn disconnect_peer(&self, peer: PeerId) {
|
||||||
|
self.io.message(NetworkIoMessage::Disconnect(peer));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Register a new IO timer. 'IoHandler::timeout' will be called with the token.
|
/// Register a new IO timer. 'IoHandler::timeout' will be called with the token.
|
||||||
@ -332,6 +341,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn maintain_network(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
fn maintain_network(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
||||||
|
self.keep_alive(io);
|
||||||
self.connect_peers(io);
|
self.connect_peers(io);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -343,6 +353,21 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
self.connections.read().unwrap().iter().any(|e| match *e.lock().unwrap().deref() { ConnectionEntry::Handshake(ref h) => h.id.eq(&id), _ => false })
|
self.connections.read().unwrap().iter().any(|e| match *e.lock().unwrap().deref() { ConnectionEntry::Handshake(ref h) => h.id.eq(&id), _ => false })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn keep_alive(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
||||||
|
let mut to_kill = Vec::new();
|
||||||
|
for e in self.connections.write().unwrap().iter_mut() {
|
||||||
|
if let ConnectionEntry::Session(ref mut s) = *e.lock().unwrap().deref_mut() {
|
||||||
|
if !s.keep_alive() {
|
||||||
|
s.disconnect(DisconnectReason::PingTimeout);
|
||||||
|
to_kill.push(s.token());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for p in to_kill {
|
||||||
|
self.kill_connection(p, io);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn connect_peers(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
fn connect_peers(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
||||||
struct NodeInfo {
|
struct NodeInfo {
|
||||||
id: NodeId,
|
id: NodeId,
|
||||||
@ -684,6 +709,15 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
self.timers.write().unwrap().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token });
|
self.timers.write().unwrap().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token });
|
||||||
io.register_timer(handler_token, *delay).expect("Error registering timer");
|
io.register_timer(handler_token, *delay).expect("Error registering timer");
|
||||||
},
|
},
|
||||||
|
NetworkIoMessage::Disconnect(ref peer) => {
|
||||||
|
if let Some(connection) = self.connections.read().unwrap().get(*peer).cloned() {
|
||||||
|
match *connection.lock().unwrap().deref_mut() {
|
||||||
|
ConnectionEntry::Handshake(_) => {},
|
||||||
|
ConnectionEntry::Session(ref mut s) => { s.disconnect(DisconnectReason::DisconnectRequested); }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.kill_connection(*peer, io);
|
||||||
|
},
|
||||||
NetworkIoMessage::User(ref message) => {
|
NetworkIoMessage::User(ref message) => {
|
||||||
for (p, h) in self.handlers.read().unwrap().iter() {
|
for (p, h) in self.handlers.read().unwrap().iter() {
|
||||||
h.message(&NetworkContext::new(io, p, None, self.connections.clone()), &message);
|
h.message(&NetworkContext::new(io, p, None, self.connections.clone()), &message);
|
||||||
|
@ -21,7 +21,7 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
|
|||||||
let host = Arc::new(Host::new(config));
|
let host = Arc::new(Host::new(config));
|
||||||
let stats = host.stats().clone();
|
let stats = host.stats().clone();
|
||||||
let host_info = host.client_version();
|
let host_info = host.client_version();
|
||||||
info!("NetworkService::start(): id={:?}", host.client_id());
|
info!("Host ID={:?}", host.client_id());
|
||||||
try!(io_service.register_handler(host));
|
try!(io_service.register_handler(host));
|
||||||
Ok(NetworkService {
|
Ok(NetworkService {
|
||||||
io_service: io_service,
|
io_service: io_service,
|
||||||
|
@ -4,10 +4,14 @@ use rlp::*;
|
|||||||
use network::connection::{EncryptedConnection, Packet};
|
use network::connection::{EncryptedConnection, Packet};
|
||||||
use network::handshake::Handshake;
|
use network::handshake::Handshake;
|
||||||
use error::*;
|
use error::*;
|
||||||
use io::{IoContext};
|
use io::{IoContext, StreamToken};
|
||||||
use network::error::{NetworkError, DisconnectReason};
|
use network::error::{NetworkError, DisconnectReason};
|
||||||
use network::host::*;
|
use network::host::*;
|
||||||
use network::node::NodeId;
|
use network::node::NodeId;
|
||||||
|
use time;
|
||||||
|
|
||||||
|
const PING_TIMEOUT_SEC: u64 = 30;
|
||||||
|
const PING_INTERVAL_SEC: u64 = 30;
|
||||||
|
|
||||||
/// Peer session over encrypted connection.
|
/// Peer session over encrypted connection.
|
||||||
/// When created waits for Hello packet exchange and signals ready state.
|
/// When created waits for Hello packet exchange and signals ready state.
|
||||||
@ -19,6 +23,8 @@ pub struct Session {
|
|||||||
connection: EncryptedConnection,
|
connection: EncryptedConnection,
|
||||||
/// Session ready flag. Set after successfull Hello packet exchange
|
/// Session ready flag. Set after successfull Hello packet exchange
|
||||||
had_hello: bool,
|
had_hello: bool,
|
||||||
|
ping_time_ns: u64,
|
||||||
|
pong_time_ns: Option<u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Structure used to report various session events.
|
/// Structure used to report various session events.
|
||||||
@ -47,6 +53,8 @@ pub struct SessionInfo {
|
|||||||
pub protocol_version: u32,
|
pub protocol_version: u32,
|
||||||
/// Peer protocol capabilities
|
/// Peer protocol capabilities
|
||||||
capabilities: Vec<SessionCapabilityInfo>,
|
capabilities: Vec<SessionCapabilityInfo>,
|
||||||
|
/// Peer ping delay in milliseconds
|
||||||
|
pub ping_ms: Option<u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
@ -95,10 +103,13 @@ impl Session {
|
|||||||
client_version: String::new(),
|
client_version: String::new(),
|
||||||
protocol_version: 0,
|
protocol_version: 0,
|
||||||
capabilities: Vec::new(),
|
capabilities: Vec::new(),
|
||||||
|
ping_ms: None,
|
||||||
},
|
},
|
||||||
|
ping_time_ns: 0,
|
||||||
|
pong_time_ns: None,
|
||||||
};
|
};
|
||||||
try!(session.write_hello(host));
|
try!(session.write_hello(host));
|
||||||
try!(session.write_ping());
|
try!(session.send_ping());
|
||||||
Ok(session)
|
Ok(session)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -141,7 +152,7 @@ impl Session {
|
|||||||
while protocol != self.info.capabilities[i].protocol {
|
while protocol != self.info.capabilities[i].protocol {
|
||||||
i += 1;
|
i += 1;
|
||||||
if i == self.info.capabilities.len() {
|
if i == self.info.capabilities.len() {
|
||||||
debug!(target: "net", "Unkown protocol: {:?}", protocol);
|
debug!(target: "net", "Unknown protocol: {:?}", protocol);
|
||||||
return Ok(())
|
return Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -152,6 +163,26 @@ impl Session {
|
|||||||
self.connection.send_packet(&rlp.out())
|
self.connection.send_packet(&rlp.out())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Keep this session alive. Returns false if ping timeout happened
|
||||||
|
pub fn keep_alive(&mut self) -> bool {
|
||||||
|
let timed_out = if let Some(pong) = self.pong_time_ns {
|
||||||
|
pong - self.ping_time_ns > PING_TIMEOUT_SEC * 1000_000_000
|
||||||
|
} else {
|
||||||
|
time::precise_time_ns() - self.ping_time_ns > PING_TIMEOUT_SEC * 1000_000_000
|
||||||
|
};
|
||||||
|
|
||||||
|
if !timed_out && time::precise_time_ns() - self.ping_time_ns > PING_INTERVAL_SEC * 1000_000_000 {
|
||||||
|
if let Err(e) = self.send_ping() {
|
||||||
|
debug!("Error sending ping message: {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
!timed_out
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn token(&self) -> StreamToken {
|
||||||
|
self.connection.token()
|
||||||
|
}
|
||||||
|
|
||||||
fn read_packet(&mut self, packet: Packet, host: &HostInfo) -> Result<SessionData, UtilError> {
|
fn read_packet(&mut self, packet: Packet, host: &HostInfo) -> Result<SessionData, UtilError> {
|
||||||
if packet.data.len() < 2 {
|
if packet.data.len() < 2 {
|
||||||
return Err(From::from(NetworkError::BadProtocol));
|
return Err(From::from(NetworkError::BadProtocol));
|
||||||
@ -168,7 +199,12 @@ impl Session {
|
|||||||
},
|
},
|
||||||
PACKET_DISCONNECT => Err(From::from(NetworkError::Disconnect(DisconnectReason::DisconnectRequested))),
|
PACKET_DISCONNECT => Err(From::from(NetworkError::Disconnect(DisconnectReason::DisconnectRequested))),
|
||||||
PACKET_PING => {
|
PACKET_PING => {
|
||||||
try!(self.write_pong());
|
try!(self.send_pong());
|
||||||
|
Ok(SessionData::None)
|
||||||
|
},
|
||||||
|
PACKET_PONG => {
|
||||||
|
self.pong_time_ns = Some(time::precise_time_ns());
|
||||||
|
self.info.ping_ms = Some((self.pong_time_ns.unwrap() - self.ping_time_ns) / 1000_000);
|
||||||
Ok(SessionData::None)
|
Ok(SessionData::None)
|
||||||
},
|
},
|
||||||
PACKET_GET_PEERS => Ok(SessionData::None), //TODO;
|
PACKET_GET_PEERS => Ok(SessionData::None), //TODO;
|
||||||
@ -178,7 +214,7 @@ impl Session {
|
|||||||
while packet_id < self.info.capabilities[i].id_offset {
|
while packet_id < self.info.capabilities[i].id_offset {
|
||||||
i += 1;
|
i += 1;
|
||||||
if i == self.info.capabilities.len() {
|
if i == self.info.capabilities.len() {
|
||||||
debug!(target: "net", "Unkown packet: {:?}", packet_id);
|
debug!(target: "net", "Unknown packet: {:?}", packet_id);
|
||||||
return Ok(SessionData::None)
|
return Ok(SessionData::None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -189,7 +225,7 @@ impl Session {
|
|||||||
Ok(SessionData::Packet { data: packet.data, protocol: protocol, packet_id: pid } )
|
Ok(SessionData::Packet { data: packet.data, protocol: protocol, packet_id: pid } )
|
||||||
},
|
},
|
||||||
_ => {
|
_ => {
|
||||||
debug!(target: "net", "Unkown packet: {:?}", packet_id);
|
debug!(target: "net", "Unknown packet: {:?}", packet_id);
|
||||||
Ok(SessionData::None)
|
Ok(SessionData::None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -255,15 +291,20 @@ impl Session {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_ping(&mut self) -> Result<(), UtilError> {
|
/// Senf ping packet
|
||||||
self.send(try!(Session::prepare(PACKET_PING)))
|
pub fn send_ping(&mut self) -> Result<(), UtilError> {
|
||||||
|
try!(self.send(try!(Session::prepare(PACKET_PING))));
|
||||||
|
self.ping_time_ns = time::precise_time_ns();
|
||||||
|
self.pong_time_ns = None;
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_pong(&mut self) -> Result<(), UtilError> {
|
fn send_pong(&mut self) -> Result<(), UtilError> {
|
||||||
self.send(try!(Session::prepare(PACKET_PONG)))
|
self.send(try!(Session::prepare(PACKET_PONG)))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn disconnect(&mut self, reason: DisconnectReason) -> NetworkError {
|
/// Disconnect this session
|
||||||
|
pub fn disconnect(&mut self, reason: DisconnectReason) -> NetworkError {
|
||||||
let mut rlp = RlpStream::new();
|
let mut rlp = RlpStream::new();
|
||||||
rlp.append(&(PACKET_DISCONNECT as u32));
|
rlp.append(&(PACKET_DISCONNECT as u32));
|
||||||
rlp.begin_list(1);
|
rlp.begin_list(1);
|
||||||
|
Loading…
Reference in New Issue
Block a user