fix deadlocks
This commit is contained in:
parent
8970946b74
commit
a7505be627
@ -284,8 +284,8 @@ impl LightProtocol {
|
|||||||
let peer = try!(peers.get(peer_id).ok_or_else(|| Error::UnknownPeer));
|
let peer = try!(peers.get(peer_id).ok_or_else(|| Error::UnknownPeer));
|
||||||
let mut peer = peer.lock();
|
let mut peer = peer.lock();
|
||||||
|
|
||||||
match peer.remote_flow.as_mut() {
|
match peer.remote_flow {
|
||||||
Some(&mut (ref mut buf, ref flow)) => {
|
Some((ref mut buf, ref flow)) => {
|
||||||
flow.recharge(buf);
|
flow.recharge(buf);
|
||||||
let max = flow.compute_cost(request.kind(), request.amount());
|
let max = flow.compute_cost(request.kind(), request.amount());
|
||||||
try!(buf.deduct_cost(max));
|
try!(buf.deduct_cost(max));
|
||||||
@ -296,6 +296,8 @@ impl LightProtocol {
|
|||||||
let req_id = self.req_id.fetch_add(1, Ordering::SeqCst);
|
let req_id = self.req_id.fetch_add(1, Ordering::SeqCst);
|
||||||
let packet_data = encode_request(&request, req_id);
|
let packet_data = encode_request(&request, req_id);
|
||||||
|
|
||||||
|
trace!(target: "les", "Dispatching request {} to peer {}", req_id, peer_id);
|
||||||
|
|
||||||
let packet_id = match request.kind() {
|
let packet_id = match request.kind() {
|
||||||
request::Kind::Headers => packet::GET_BLOCK_HEADERS,
|
request::Kind::Headers => packet::GET_BLOCK_HEADERS,
|
||||||
request::Kind::Bodies => packet::GET_BLOCK_BODIES,
|
request::Kind::Bodies => packet::GET_BLOCK_BODIES,
|
||||||
|
@ -193,9 +193,11 @@ impl<L: LightChainClient> Handler for LightSync<L> {
|
|||||||
head_num: status.head_num,
|
head_num: status.head_num,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut best = self.best_seen.lock();
|
{
|
||||||
if best.as_ref().map_or(true, |b| status.head_td > b.1) {
|
let mut best = self.best_seen.lock();
|
||||||
*best = Some((status.head_hash, status.head_td));
|
if best.as_ref().map_or(true, |b| status.head_td > b.1) {
|
||||||
|
*best = Some((status.head_hash, status.head_td));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.peers.write().insert(ctx.peer(), Mutex::new(Peer::new(chain_info)));
|
self.peers.write().insert(ctx.peer(), Mutex::new(Peer::new(chain_info)));
|
||||||
@ -210,6 +212,8 @@ impl<L: LightChainClient> Handler for LightSync<L> {
|
|||||||
None => return,
|
None => return,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
trace!(target: "sync", "peer {} disconnecting", peer_id);
|
||||||
|
|
||||||
let new_best = {
|
let new_best = {
|
||||||
let mut best = self.best_seen.lock();
|
let mut best = self.best_seen.lock();
|
||||||
let peer_best = (peer.status.head_hash, peer.status.head_td);
|
let peer_best = (peer.status.head_hash, peer.status.head_td);
|
||||||
@ -249,7 +253,7 @@ impl<L: LightChainClient> Handler for LightSync<L> {
|
|||||||
fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) {
|
fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) {
|
||||||
let last_td = {
|
let last_td = {
|
||||||
let peers = self.peers.read();
|
let peers = self.peers.read();
|
||||||
match peers.get(&ctx.peer()){
|
match peers.get(&ctx.peer()) {
|
||||||
None => return,
|
None => return,
|
||||||
Some(peer) => {
|
Some(peer) => {
|
||||||
let mut peer = peer.lock();
|
let mut peer = peer.lock();
|
||||||
@ -273,9 +277,11 @@ impl<L: LightChainClient> Handler for LightSync<L> {
|
|||||||
ctx.disconnect_peer(ctx.peer());
|
ctx.disconnect_peer(ctx.peer());
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut best = self.best_seen.lock();
|
{
|
||||||
if best.as_ref().map_or(true, |b| announcement.head_td > b.1) {
|
let mut best = self.best_seen.lock();
|
||||||
*best = Some((announcement.head_hash, announcement.head_td));
|
if best.as_ref().map_or(true, |b| announcement.head_td > b.1) {
|
||||||
|
*best = Some((announcement.head_hash, announcement.head_td));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.maintain_sync(ctx.as_basic());
|
self.maintain_sync(ctx.as_basic());
|
||||||
@ -326,12 +332,16 @@ impl<L: LightChainClient> LightSync<L> {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
trace!(target: "sync", "Beginning search for common ancestor");
|
||||||
|
|
||||||
*state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number));
|
*state = SyncState::AncestorSearch(AncestorSearch::begin(chain_info.best_block_number));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn maintain_sync(&self, ctx: &BasicContext) {
|
fn maintain_sync(&self, ctx: &BasicContext) {
|
||||||
const DRAIN_AMOUNT: usize = 128;
|
const DRAIN_AMOUNT: usize = 128;
|
||||||
|
|
||||||
|
debug!(target: "sync", "Maintaining sync.");
|
||||||
|
|
||||||
let mut state = self.state.lock();
|
let mut state = self.state.lock();
|
||||||
|
|
||||||
// drain any pending blocks into the queue.
|
// drain any pending blocks into the queue.
|
||||||
|
Loading…
Reference in New Issue
Block a user