increase tick timer and limit peers to one req
This commit is contained in:
parent
5b0531e964
commit
ccdf5d5873
@ -62,6 +62,8 @@ pub enum Error {
|
|||||||
UnsupportedProtocolVersion(u8),
|
UnsupportedProtocolVersion(u8),
|
||||||
/// Bad protocol version.
|
/// Bad protocol version.
|
||||||
BadProtocolVersion,
|
BadProtocolVersion,
|
||||||
|
/// Peer is overburdened.
|
||||||
|
Overburdened,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Error {
|
impl Error {
|
||||||
@ -79,6 +81,7 @@ impl Error {
|
|||||||
Error::NotServer => Punishment::Disable,
|
Error::NotServer => Punishment::Disable,
|
||||||
Error::UnsupportedProtocolVersion(_) => Punishment::Disable,
|
Error::UnsupportedProtocolVersion(_) => Punishment::Disable,
|
||||||
Error::BadProtocolVersion => Punishment::Disable,
|
Error::BadProtocolVersion => Punishment::Disable,
|
||||||
|
Error::Overburdened => Punishment::None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -109,6 +112,7 @@ impl fmt::Display for Error {
|
|||||||
Error::NotServer => write!(f, "Peer not a server."),
|
Error::NotServer => write!(f, "Peer not a server."),
|
||||||
Error::UnsupportedProtocolVersion(pv) => write!(f, "Unsupported protocol version: {}", pv),
|
Error::UnsupportedProtocolVersion(pv) => write!(f, "Unsupported protocol version: {}", pv),
|
||||||
Error::BadProtocolVersion => write!(f, "Bad protocol version in handshake"),
|
Error::BadProtocolVersion => write!(f, "Bad protocol version in handshake"),
|
||||||
|
Error::Overburdened => write!(f, "Peer overburdened"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -55,6 +55,9 @@ pub use self::status::{Status, Capabilities, Announcement};
|
|||||||
const TIMEOUT: TimerToken = 0;
|
const TIMEOUT: TimerToken = 0;
|
||||||
const TIMEOUT_INTERVAL_MS: u64 = 1000;
|
const TIMEOUT_INTERVAL_MS: u64 = 1000;
|
||||||
|
|
||||||
|
const TICK_TIMEOUT: TimerToken = 1;
|
||||||
|
const TICK_TIMEOUT_INTERVAL_MS: u64 = 5000;
|
||||||
|
|
||||||
// minimum interval between updates.
|
// minimum interval between updates.
|
||||||
const UPDATE_INTERVAL_MS: i64 = 5000;
|
const UPDATE_INTERVAL_MS: i64 = 5000;
|
||||||
|
|
||||||
@ -132,8 +135,9 @@ struct Peer {
|
|||||||
status: Status,
|
status: Status,
|
||||||
capabilities: Capabilities,
|
capabilities: Capabilities,
|
||||||
remote_flow: Option<(Buffer, FlowParams)>,
|
remote_flow: Option<(Buffer, FlowParams)>,
|
||||||
sent_head: H256, // last head we've given them.
|
sent_head: H256, // last chain head we've given them.
|
||||||
last_update: SteadyTime,
|
last_update: SteadyTime,
|
||||||
|
idle: bool, // make into a current percentage of max buffer being requested?
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Peer {
|
impl Peer {
|
||||||
@ -263,10 +267,16 @@ impl LightProtocol {
|
|||||||
pub fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize {
|
pub fn max_requests(&self, peer: PeerId, kind: request::Kind) -> usize {
|
||||||
self.peers.read().get(&peer).and_then(|peer| {
|
self.peers.read().get(&peer).and_then(|peer| {
|
||||||
let mut peer = peer.lock();
|
let mut peer = peer.lock();
|
||||||
match peer.remote_flow.as_mut() {
|
let idle = peer.idle;
|
||||||
Some(&mut (ref mut buf, ref flow)) => {
|
match peer.remote_flow {
|
||||||
|
Some((ref mut buf, ref flow)) => {
|
||||||
flow.recharge(buf);
|
flow.recharge(buf);
|
||||||
Some(flow.max_amount(&*buf, kind))
|
|
||||||
|
if !idle {
|
||||||
|
Some(0)
|
||||||
|
} else {
|
||||||
|
Some(flow.max_amount(&*buf, kind))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
None => None,
|
None => None,
|
||||||
}
|
}
|
||||||
@ -284,6 +294,8 @@ impl LightProtocol {
|
|||||||
let peer = try!(peers.get(peer_id).ok_or_else(|| Error::UnknownPeer));
|
let peer = try!(peers.get(peer_id).ok_or_else(|| Error::UnknownPeer));
|
||||||
let mut peer = peer.lock();
|
let mut peer = peer.lock();
|
||||||
|
|
||||||
|
if !peer.idle { return Err(Error::Overburdened) }
|
||||||
|
|
||||||
match peer.remote_flow {
|
match peer.remote_flow {
|
||||||
Some((ref mut buf, ref flow)) => {
|
Some((ref mut buf, ref flow)) => {
|
||||||
flow.recharge(buf);
|
flow.recharge(buf);
|
||||||
@ -309,6 +321,7 @@ impl LightProtocol {
|
|||||||
|
|
||||||
io.send(*peer_id, packet_id, packet_data);
|
io.send(*peer_id, packet_id, packet_data);
|
||||||
|
|
||||||
|
peer.idle = false;
|
||||||
self.pending_requests.write().insert(req_id, Requested {
|
self.pending_requests.write().insert(req_id, Requested {
|
||||||
request: request,
|
request: request,
|
||||||
timestamp: SteadyTime::now(),
|
timestamp: SteadyTime::now(),
|
||||||
@ -412,6 +425,8 @@ impl LightProtocol {
|
|||||||
match peers.get(peer) {
|
match peers.get(peer) {
|
||||||
Some(peer_info) => {
|
Some(peer_info) => {
|
||||||
let mut peer_info = peer_info.lock();
|
let mut peer_info = peer_info.lock();
|
||||||
|
peer_info.idle = true;
|
||||||
|
|
||||||
match peer_info.remote_flow.as_mut() {
|
match peer_info.remote_flow.as_mut() {
|
||||||
Some(&mut (ref mut buf, ref mut flow)) => {
|
Some(&mut (ref mut buf, ref mut flow)) => {
|
||||||
let actual_buffer = ::std::cmp::min(cur_buffer, *flow.limit());
|
let actual_buffer = ::std::cmp::min(cur_buffer, *flow.limit());
|
||||||
@ -620,6 +635,7 @@ impl LightProtocol {
|
|||||||
remote_flow: remote_flow,
|
remote_flow: remote_flow,
|
||||||
sent_head: pending.sent_head,
|
sent_head: pending.sent_head,
|
||||||
last_update: pending.last_update,
|
last_update: pending.last_update,
|
||||||
|
idle: true,
|
||||||
}));
|
}));
|
||||||
|
|
||||||
for handler in &self.handlers {
|
for handler in &self.handlers {
|
||||||
@ -1124,7 +1140,10 @@ fn punish(peer: PeerId, io: &IoContext, e: Error) {
|
|||||||
|
|
||||||
impl NetworkProtocolHandler for LightProtocol {
|
impl NetworkProtocolHandler for LightProtocol {
|
||||||
fn initialize(&self, io: &NetworkContext) {
|
fn initialize(&self, io: &NetworkContext) {
|
||||||
io.register_timer(TIMEOUT, TIMEOUT_INTERVAL_MS).expect("Error registering sync timer.");
|
io.register_timer(TIMEOUT, TIMEOUT_INTERVAL_MS)
|
||||||
|
.expect("Error registering sync timer.");
|
||||||
|
io.register_timer(TICK_TIMEOUT, TICK_TIMEOUT_INTERVAL_MS)
|
||||||
|
.expect("Error registering sync timer.");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
|
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
|
||||||
@ -1141,10 +1160,8 @@ impl NetworkProtocolHandler for LightProtocol {
|
|||||||
|
|
||||||
fn timeout(&self, io: &NetworkContext, timer: TimerToken) {
|
fn timeout(&self, io: &NetworkContext, timer: TimerToken) {
|
||||||
match timer {
|
match timer {
|
||||||
TIMEOUT => {
|
TIMEOUT => self.timeout_check(io),
|
||||||
self.timeout_check(io);
|
TICK_TIMEOUT => self.tick_handlers(io),
|
||||||
self.tick_handlers(io);
|
|
||||||
},
|
|
||||||
_ => warn!(target: "les", "received timeout on unknown token {}", timer),
|
_ => warn!(target: "les", "received timeout on unknown token {}", timer),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -417,8 +417,6 @@ impl<L: LightChainClient> LightSync<L> {
|
|||||||
rng.shuffle(&mut peer_ids);
|
rng.shuffle(&mut peer_ids);
|
||||||
|
|
||||||
for peer in &peer_ids {
|
for peer in &peer_ids {
|
||||||
let peer_info = peers.get(peer).expect("key known to be present; qed");
|
|
||||||
let mut peer_info = peer_info.lock();
|
|
||||||
if ctx.max_requests(*peer, request::Kind::Headers) >= req.max {
|
if ctx.max_requests(*peer, request::Kind::Headers) >= req.max {
|
||||||
match ctx.request_from(*peer, request::Request::Headers(req.clone())) {
|
match ctx.request_from(*peer, request::Request::Headers(req.clone())) {
|
||||||
Ok(id) => {
|
Ok(id) => {
|
||||||
|
Loading…
Reference in New Issue
Block a user