store cumulative cost in pending request set.
This commit is contained in:
parent
c718b5618e
commit
e3d6525d83
@ -303,12 +303,9 @@ impl LightProtocol {
|
|||||||
match peer.remote_flow {
|
match peer.remote_flow {
|
||||||
None => Err(Error::NotServer),
|
None => Err(Error::NotServer),
|
||||||
Some((ref mut creds, ref params)) => {
|
Some((ref mut creds, ref params)) => {
|
||||||
// check that enough credits are available.
|
// compute and deduct cost.
|
||||||
let mut temp_creds: Credits = creds.clone();
|
let cost = params.compute_cost_multi(requests.requests());
|
||||||
for request in requests.requests() {
|
creds.deduct_cost(cost)?;
|
||||||
temp_creds.deduct_cost(params.compute_cost(request))?;
|
|
||||||
}
|
|
||||||
*creds = temp_creds;
|
|
||||||
|
|
||||||
let req_id = ReqId(self.req_id.fetch_add(1, Ordering::SeqCst));
|
let req_id = ReqId(self.req_id.fetch_add(1, Ordering::SeqCst));
|
||||||
io.send(*peer_id, packet::REQUEST, {
|
io.send(*peer_id, packet::REQUEST, {
|
||||||
@ -318,7 +315,7 @@ impl LightProtocol {
|
|||||||
});
|
});
|
||||||
|
|
||||||
// begin timeout.
|
// begin timeout.
|
||||||
peer.pending_requests.insert(req_id, requests, SteadyTime::now());
|
peer.pending_requests.insert(req_id, requests, cost, SteadyTime::now());
|
||||||
Ok(req_id)
|
Ok(req_id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -408,13 +405,18 @@ impl LightProtocol {
|
|||||||
Some(peer_info) => {
|
Some(peer_info) => {
|
||||||
let mut peer_info = peer_info.lock();
|
let mut peer_info = peer_info.lock();
|
||||||
let req_info = peer_info.pending_requests.remove(&req_id, SteadyTime::now());
|
let req_info = peer_info.pending_requests.remove(&req_id, SteadyTime::now());
|
||||||
|
let cumulative_cost = peer_info.pending_requests.cumulative_cost();
|
||||||
let flow_info = peer_info.remote_flow.as_mut();
|
let flow_info = peer_info.remote_flow.as_mut();
|
||||||
|
|
||||||
match (req_info, flow_info) {
|
match (req_info, flow_info) {
|
||||||
(Some(_), Some(flow_info)) => {
|
(Some(_), Some(flow_info)) => {
|
||||||
let &mut (ref mut c, ref mut flow) = flow_info;
|
let &mut (ref mut c, ref mut flow) = flow_info;
|
||||||
let actual_credits = ::std::cmp::min(cur_credits, *flow.limit());
|
|
||||||
c.update_to(actual_credits);
|
// only update if the cumulative cost of the request set is zero.
|
||||||
|
if cumulative_cost == 0.into() {
|
||||||
|
let actual_credits = ::std::cmp::min(cur_credits, *flow.limit());
|
||||||
|
c.update_to(actual_credits);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -520,6 +522,7 @@ impl LightProtocol {
|
|||||||
last_update: SteadyTime::now(),
|
last_update: SteadyTime::now(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
trace!(target: "pip", "Sending status to peer {}", peer);
|
||||||
io.send(*peer, packet::STATUS, status_packet);
|
io.send(*peer, packet::STATUS, status_packet);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,22 +27,29 @@ use std::iter::FromIterator;
|
|||||||
use request::Request;
|
use request::Request;
|
||||||
use request::Requests;
|
use request::Requests;
|
||||||
use net::{timeout, ReqId};
|
use net::{timeout, ReqId};
|
||||||
|
use util::U256;
|
||||||
|
|
||||||
use time::{Duration, SteadyTime};
|
use time::{Duration, SteadyTime};
|
||||||
|
|
||||||
|
// Request set entry: requests + cost.
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct Entry(Requests, U256);
|
||||||
|
|
||||||
/// Request set.
|
/// Request set.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct RequestSet {
|
pub struct RequestSet {
|
||||||
counter: u64,
|
counter: u64,
|
||||||
|
cumulative_cost: U256,
|
||||||
base: Option<SteadyTime>,
|
base: Option<SteadyTime>,
|
||||||
ids: HashMap<ReqId, u64>,
|
ids: HashMap<ReqId, u64>,
|
||||||
reqs: BTreeMap<u64, Requests>,
|
reqs: BTreeMap<u64, Entry>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for RequestSet {
|
impl Default for RequestSet {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
RequestSet {
|
RequestSet {
|
||||||
counter: 0,
|
counter: 0,
|
||||||
|
cumulative_cost: 0.into(),
|
||||||
base: None,
|
base: None,
|
||||||
ids: HashMap::new(),
|
ids: HashMap::new(),
|
||||||
reqs: BTreeMap::new(),
|
reqs: BTreeMap::new(),
|
||||||
@ -52,10 +59,12 @@ impl Default for RequestSet {
|
|||||||
|
|
||||||
impl RequestSet {
|
impl RequestSet {
|
||||||
/// Push requests onto the stack.
|
/// Push requests onto the stack.
|
||||||
pub fn insert(&mut self, req_id: ReqId, req: Requests, now: SteadyTime) {
|
pub fn insert(&mut self, req_id: ReqId, req: Requests, cost: U256, now: SteadyTime) {
|
||||||
let counter = self.counter;
|
let counter = self.counter;
|
||||||
|
self.cumulative_cost = self.cumulative_cost + cost;
|
||||||
|
|
||||||
self.ids.insert(req_id, counter);
|
self.ids.insert(req_id, counter);
|
||||||
self.reqs.insert(counter, req);
|
self.reqs.insert(counter, Entry(req, cost));
|
||||||
|
|
||||||
if self.reqs.keys().next().map_or(true, |x| *x == counter) {
|
if self.reqs.keys().next().map_or(true, |x| *x == counter) {
|
||||||
self.base = Some(now);
|
self.base = Some(now);
|
||||||
@ -71,7 +80,7 @@ impl RequestSet {
|
|||||||
None => return None,
|
None => return None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let req = self.reqs.remove(&id).expect("entry in `ids` implies entry in `reqs`; qed");
|
let Entry(req, cost) = self.reqs.remove(&id).expect("entry in `ids` implies entry in `reqs`; qed");
|
||||||
|
|
||||||
match self.reqs.keys().next() {
|
match self.reqs.keys().next() {
|
||||||
Some(k) if *k > id => self.base = Some(now),
|
Some(k) if *k > id => self.base = Some(now),
|
||||||
@ -79,6 +88,7 @@ impl RequestSet {
|
|||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.cumulative_cost = self.cumulative_cost - cost;
|
||||||
Some(req)
|
Some(req)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -93,7 +103,7 @@ impl RequestSet {
|
|||||||
let first_req = self.reqs.values().next()
|
let first_req = self.reqs.values().next()
|
||||||
.expect("base existing implies `reqs` non-empty; qed");
|
.expect("base existing implies `reqs` non-empty; qed");
|
||||||
|
|
||||||
base + compute_timeout(&first_req) <= now
|
base + compute_timeout(&first_req.0) <= now
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Collect all pending request ids.
|
/// Collect all pending request ids.
|
||||||
@ -108,6 +118,9 @@ impl RequestSet {
|
|||||||
|
|
||||||
/// Whether the set is empty.
|
/// Whether the set is empty.
|
||||||
pub fn is_empty(&self) -> bool { self.len() == 0 }
|
pub fn is_empty(&self) -> bool { self.len() == 0 }
|
||||||
|
|
||||||
|
/// The cumulative cost of all requests in the set.
|
||||||
|
pub fn cumulative_cost(&self) -> U256 { self.cumulative_cost }
|
||||||
}
|
}
|
||||||
|
|
||||||
// helper to calculate timeout for a specific set of requests.
|
// helper to calculate timeout for a specific set of requests.
|
||||||
@ -141,8 +154,8 @@ mod tests {
|
|||||||
|
|
||||||
let the_req = RequestBuilder::default().build();
|
let the_req = RequestBuilder::default().build();
|
||||||
let req_time = compute_timeout(&the_req);
|
let req_time = compute_timeout(&the_req);
|
||||||
req_set.insert(ReqId(0), the_req.clone(), test_begin);
|
req_set.insert(ReqId(0), the_req.clone(), 0.into(), test_begin);
|
||||||
req_set.insert(ReqId(1), the_req, test_begin + Duration::seconds(1));
|
req_set.insert(ReqId(1), the_req, 0.into(), test_begin + Duration::seconds(1));
|
||||||
|
|
||||||
assert_eq!(req_set.base, Some(test_begin));
|
assert_eq!(req_set.base, Some(test_begin));
|
||||||
|
|
||||||
@ -153,4 +166,22 @@ mod tests {
|
|||||||
assert!(!req_set.check_timeout(test_end));
|
assert!(!req_set.check_timeout(test_end));
|
||||||
assert!(req_set.check_timeout(test_end + Duration::seconds(1)));
|
assert!(req_set.check_timeout(test_end + Duration::seconds(1)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cumulative_cost() {
|
||||||
|
let the_req = RequestBuilder::default().build();
|
||||||
|
let test_begin = SteadyTime::now();
|
||||||
|
let test_end = test_begin + Duration::seconds(1);
|
||||||
|
let mut req_set = RequestSet::default();
|
||||||
|
|
||||||
|
for i in 0..5 {
|
||||||
|
req_set.insert(ReqId(i), the_req.clone(), 1.into(), test_begin);
|
||||||
|
assert_eq!(req_set.cumulative_cost, (i + 1).into());
|
||||||
|
}
|
||||||
|
|
||||||
|
for i in (0..5).rev() {
|
||||||
|
assert!(req_set.remove(&ReqId(i), test_end).is_some());
|
||||||
|
assert_eq!(req_set.cumulative_cost, i.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -600,8 +600,8 @@ fn id_guard() {
|
|||||||
|
|
||||||
let mut pending_requests = RequestSet::default();
|
let mut pending_requests = RequestSet::default();
|
||||||
|
|
||||||
pending_requests.insert(req_id_1, req.clone(), ::time::SteadyTime::now());
|
pending_requests.insert(req_id_1, req.clone(), 0.into(), ::time::SteadyTime::now());
|
||||||
pending_requests.insert(req_id_2, req, ::time::SteadyTime::now());
|
pending_requests.insert(req_id_2, req, 1.into(), ::time::SteadyTime::now());
|
||||||
|
|
||||||
proto.peers.write().insert(peer_id, ::util::Mutex::new(Peer {
|
proto.peers.write().insert(peer_id, ::util::Mutex::new(Peer {
|
||||||
local_credits: flow_params.create_credits(),
|
local_credits: flow_params.create_credits(),
|
||||||
|
@ -418,8 +418,10 @@ impl<L: AsLightClient> LightSync<L> {
|
|||||||
let best_td = chain_info.pending_total_difficulty;
|
let best_td = chain_info.pending_total_difficulty;
|
||||||
let sync_target = match *self.best_seen.lock() {
|
let sync_target = match *self.best_seen.lock() {
|
||||||
Some(ref target) if target.head_td > best_td => (target.head_num, target.head_hash),
|
Some(ref target) if target.head_td > best_td => (target.head_num, target.head_hash),
|
||||||
_ => {
|
ref other => {
|
||||||
trace!(target: "sync", "No target to sync to.");
|
let network_score = other.as_ref().map(|target| target.head_td);
|
||||||
|
trace!(target: "sync", "No target to sync to. Network score: {:?}, Local score: {:?}",
|
||||||
|
network_score, best_td);
|
||||||
*state = SyncState::Idle;
|
*state = SyncState::Idle;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user