exclusive access to each peer at a time

This commit is contained in:
Robert Habermeier 2016-11-18 19:26:05 +01:00
parent 4fd9670b33
commit 48df2e12fa
1 changed files with 38 additions and 21 deletions

View File

@ -99,7 +99,7 @@ struct PendingPeer {
// data about each peer.
struct Peer {
local_buffer: Mutex<Buffer>, // their buffer relative to us
local_buffer: Buffer, // their buffer relative to us
remote_buffer: Buffer, // our buffer relative to them
current_asking: HashSet<usize>, // pending request ids.
status: Status,
@ -112,21 +112,25 @@ impl Peer {
// check the maximum cost of a request, returning an error if there's
// not enough buffer left.
// returns the calculated maximum cost.
fn deduct_max(&self, flow_params: &FlowParams, kind: request::Kind, max: usize) -> Result<U256, Error> {
let mut local_buffer = self.local_buffer.lock();
flow_params.recharge(&mut local_buffer);
fn deduct_max(&mut self, flow_params: &FlowParams, kind: request::Kind, max: usize) -> Result<U256, Error> {
flow_params.recharge(&mut self.local_buffer);
let max_cost = flow_params.compute_cost(kind, max);
try!(local_buffer.deduct_cost(max_cost));
try!(self.local_buffer.deduct_cost(max_cost));
Ok(max_cost)
}
// refund buffer for a request. returns new buffer amount.
fn refund(&self, flow_params: &FlowParams, amount: U256) -> U256 {
let mut local_buffer = self.local_buffer.lock();
flow_params.refund(&mut local_buffer, amount);
fn refund(&mut self, flow_params: &FlowParams, amount: U256) -> U256 {
flow_params.refund(&mut self.local_buffer, amount);
local_buffer.current()
self.local_buffer.current()
}
// recharge remote buffer with remote flow params.
fn recharge_remote(&mut self) {
let flow = &mut self.remote_flow;
flow.recharge(&mut self.remote_buffer);
}
}
@ -171,7 +175,7 @@ pub struct LightProtocol {
genesis_hash: H256,
network_id: NetworkId,
pending_peers: RwLock<HashMap<PeerId, PendingPeer>>,
peers: RwLock<HashMap<PeerId, Peer>>,
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>,
pending_requests: RwLock<HashMap<usize, Requested>>,
capabilities: RwLock<Capabilities>,
flow_params: FlowParams, // assumed static and same for every peer.
@ -199,8 +203,9 @@ impl LightProtocol {
/// Check the maximum amount of requests of a specific type
/// which a peer would be able to serve.
pub fn max_requests(&self, peer: PeerId, kind: request::Kind) -> Option<usize> {
self.peers.write().get_mut(&peer).map(|peer| {
peer.remote_flow.recharge(&mut peer.remote_buffer);
self.peers.read().get(&peer).map(|peer| {
let mut peer = peer.lock();
peer.recharge_remote();
peer.remote_flow.max_amount(&peer.remote_buffer, kind)
})
}
@ -212,9 +217,11 @@ impl LightProtocol {
/// On success, returns a request id which can later be coordinated
/// with an event.
pub fn request_from(&self, io: &NetworkContext, peer_id: &PeerId, request: Request) -> Result<ReqId, Error> {
let mut peers = self.peers.write();
let peer = try!(peers.get_mut(peer_id).ok_or_else(|| Error::UnknownPeer));
peer.remote_flow.recharge(&mut peer.remote_buffer);
let peers = self.peers.read();
let peer = try!(peers.get(peer_id).ok_or_else(|| Error::UnknownPeer));
let mut peer = peer.lock();
peer.recharge_remote();
let max = peer.remote_flow.compute_cost(request.kind(), request.amount());
try!(peer.remote_buffer.deduct_cost(max));
@ -251,7 +258,8 @@ impl LightProtocol {
self.capabilities.write().update_from(&announcement);
// calculate reorg info and send packets
for (peer_id, peer_info) in self.peers.write().iter_mut() {
for (peer_id, peer_info) in self.peers.read().iter() {
let mut peer_info = peer_info.lock();
let reorg_depth = reorgs_map.entry(peer_info.sent_head)
.or_insert_with(|| {
match self.provider.reorg_depth(&announcement.head_hash, &peer_info.sent_head) {
@ -354,15 +362,15 @@ impl LightProtocol {
return Err(Error::WrongNetwork);
}
self.peers.write().insert(*peer, Peer {
local_buffer: Mutex::new(self.flow_params.create_buffer()),
self.peers.write().insert(*peer, Mutex::new(Peer {
local_buffer: self.flow_params.create_buffer(),
remote_buffer: flow_params.create_buffer(),
current_asking: HashSet::new(),
status: status.clone(),
capabilities: capabilities.clone(),
remote_flow: flow_params,
sent_head: pending.sent_head,
});
}));
for handler in &self.handlers {
handler.on_connect(*peer, &status, &capabilities)
@ -379,13 +387,15 @@ impl LightProtocol {
}
let announcement = try!(status::parse_announcement(data));
let mut peers = self.peers.write();
let peers = self.peers.read();
let peer_info = match peers.get_mut(peer) {
let peer_info = match peers.get(peer) {
Some(info) => info,
None => return Ok(()),
};
let mut peer_info = peer_info.lock();
// update status.
{
// TODO: punish peer if they've moved backwards.
@ -420,6 +430,8 @@ impl LightProtocol {
}
};
let mut peer = peer.lock();
let req_id: u64 = try!(data.val_at(0));
let block = {
@ -471,6 +483,7 @@ impl LightProtocol {
return Ok(())
}
};
let mut peer = peer.lock();
let req_id: u64 = try!(data.val_at(0));
@ -516,6 +529,7 @@ impl LightProtocol {
return Ok(())
}
};
let mut peer = peer.lock();
let req_id: u64 = try!(data.val_at(0));
@ -561,6 +575,7 @@ impl LightProtocol {
return Ok(())
}
};
let mut peer = peer.lock();
let req_id: u64 = try!(data.val_at(0));
@ -617,6 +632,7 @@ impl LightProtocol {
return Ok(())
}
};
let mut peer = peer.lock();
let req_id: u64 = try!(data.val_at(0));
@ -671,6 +687,7 @@ impl LightProtocol {
return Ok(())
}
};
let mut peer = peer.lock();
let req_id: u64 = try!(data.val_at(0));