From 48df2e12facf6265d44fb1828e2ee03abf2a0af4 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 18 Nov 2016 19:26:05 +0100 Subject: [PATCH] exclusive access to each peer at a time --- ethcore/src/light/net/mod.rs | 59 +++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 21 deletions(-) diff --git a/ethcore/src/light/net/mod.rs b/ethcore/src/light/net/mod.rs index a2202d460..4b92e262e 100644 --- a/ethcore/src/light/net/mod.rs +++ b/ethcore/src/light/net/mod.rs @@ -99,7 +99,7 @@ struct PendingPeer { // data about each peer. struct Peer { - local_buffer: Mutex, // their buffer relative to us + local_buffer: Buffer, // their buffer relative to us remote_buffer: Buffer, // our buffer relative to them current_asking: HashSet, // pending request ids. status: Status, @@ -112,21 +112,25 @@ impl Peer { // check the maximum cost of a request, returning an error if there's // not enough buffer left. // returns the calculated maximum cost. - fn deduct_max(&self, flow_params: &FlowParams, kind: request::Kind, max: usize) -> Result { - let mut local_buffer = self.local_buffer.lock(); - flow_params.recharge(&mut local_buffer); + fn deduct_max(&mut self, flow_params: &FlowParams, kind: request::Kind, max: usize) -> Result { + flow_params.recharge(&mut self.local_buffer); let max_cost = flow_params.compute_cost(kind, max); - try!(local_buffer.deduct_cost(max_cost)); + try!(self.local_buffer.deduct_cost(max_cost)); Ok(max_cost) } // refund buffer for a request. returns new buffer amount. - fn refund(&self, flow_params: &FlowParams, amount: U256) -> U256 { - let mut local_buffer = self.local_buffer.lock(); - flow_params.refund(&mut local_buffer, amount); + fn refund(&mut self, flow_params: &FlowParams, amount: U256) -> U256 { + flow_params.refund(&mut self.local_buffer, amount); - local_buffer.current() + self.local_buffer.current() + } + + // recharge remote buffer with remote flow params. + fn recharge_remote(&mut self) { + let flow = &mut self.remote_flow; + flow.recharge(&mut self.remote_buffer); } } @@ -171,7 +175,7 @@ pub struct LightProtocol { genesis_hash: H256, network_id: NetworkId, pending_peers: RwLock>, - peers: RwLock>, + peers: RwLock>>, pending_requests: RwLock>, capabilities: RwLock, flow_params: FlowParams, // assumed static and same for every peer. @@ -199,8 +203,9 @@ impl LightProtocol { /// Check the maximum amount of requests of a specific type /// which a peer would be able to serve. pub fn max_requests(&self, peer: PeerId, kind: request::Kind) -> Option { - self.peers.write().get_mut(&peer).map(|peer| { - peer.remote_flow.recharge(&mut peer.remote_buffer); + self.peers.read().get(&peer).map(|peer| { + let mut peer = peer.lock(); + peer.recharge_remote(); peer.remote_flow.max_amount(&peer.remote_buffer, kind) }) } @@ -212,9 +217,11 @@ impl LightProtocol { /// On success, returns a request id which can later be coordinated /// with an event. pub fn request_from(&self, io: &NetworkContext, peer_id: &PeerId, request: Request) -> Result { - let mut peers = self.peers.write(); - let peer = try!(peers.get_mut(peer_id).ok_or_else(|| Error::UnknownPeer)); - peer.remote_flow.recharge(&mut peer.remote_buffer); + let peers = self.peers.read(); + let peer = try!(peers.get(peer_id).ok_or_else(|| Error::UnknownPeer)); + let mut peer = peer.lock(); + + peer.recharge_remote(); let max = peer.remote_flow.compute_cost(request.kind(), request.amount()); try!(peer.remote_buffer.deduct_cost(max)); @@ -251,7 +258,8 @@ impl LightProtocol { self.capabilities.write().update_from(&announcement); // calculate reorg info and send packets - for (peer_id, peer_info) in self.peers.write().iter_mut() { + for (peer_id, peer_info) in self.peers.read().iter() { + let mut peer_info = peer_info.lock(); let reorg_depth = reorgs_map.entry(peer_info.sent_head) .or_insert_with(|| { match self.provider.reorg_depth(&announcement.head_hash, &peer_info.sent_head) { @@ -354,15 +362,15 @@ impl LightProtocol { return Err(Error::WrongNetwork); } - self.peers.write().insert(*peer, Peer { - local_buffer: Mutex::new(self.flow_params.create_buffer()), + self.peers.write().insert(*peer, Mutex::new(Peer { + local_buffer: self.flow_params.create_buffer(), remote_buffer: flow_params.create_buffer(), current_asking: HashSet::new(), status: status.clone(), capabilities: capabilities.clone(), remote_flow: flow_params, sent_head: pending.sent_head, - }); + })); for handler in &self.handlers { handler.on_connect(*peer, &status, &capabilities) @@ -379,13 +387,15 @@ impl LightProtocol { } let announcement = try!(status::parse_announcement(data)); - let mut peers = self.peers.write(); + let peers = self.peers.read(); - let peer_info = match peers.get_mut(peer) { + let peer_info = match peers.get(peer) { Some(info) => info, None => return Ok(()), }; + let mut peer_info = peer_info.lock(); + // update status. { // TODO: punish peer if they've moved backwards. @@ -420,6 +430,8 @@ impl LightProtocol { } }; + let mut peer = peer.lock(); + let req_id: u64 = try!(data.val_at(0)); let block = { @@ -471,6 +483,7 @@ impl LightProtocol { return Ok(()) } }; + let mut peer = peer.lock(); let req_id: u64 = try!(data.val_at(0)); @@ -516,6 +529,7 @@ impl LightProtocol { return Ok(()) } }; + let mut peer = peer.lock(); let req_id: u64 = try!(data.val_at(0)); @@ -561,6 +575,7 @@ impl LightProtocol { return Ok(()) } }; + let mut peer = peer.lock(); let req_id: u64 = try!(data.val_at(0)); @@ -617,6 +632,7 @@ impl LightProtocol { return Ok(()) } }; + let mut peer = peer.lock(); let req_id: u64 = try!(data.val_at(0)); @@ -671,6 +687,7 @@ impl LightProtocol { return Ok(()) } }; + let mut peer = peer.lock(); let req_id: u64 = try!(data.val_at(0));