More tweaks
This commit is contained in:
parent
0e905a06d9
commit
1e8bf8c89d
@ -115,7 +115,7 @@ fn can_collect_garbage() {
|
|||||||
fn can_handle_long_fork() {
|
fn can_handle_long_fork() {
|
||||||
let client_result = generate_dummy_client(1200);
|
let client_result = generate_dummy_client(1200);
|
||||||
let client = client_result.reference();
|
let client = client_result.reference();
|
||||||
for _ in 0..10 {
|
for _ in 0..20 {
|
||||||
client.import_verified_blocks(&IoChannel::disconnected());
|
client.import_verified_blocks(&IoChannel::disconnected());
|
||||||
}
|
}
|
||||||
assert_eq!(1200, client.chain_info().best_block_number);
|
assert_eq!(1200, client.chain_info().best_block_number);
|
||||||
@ -124,7 +124,7 @@ fn can_handle_long_fork() {
|
|||||||
push_blocks_to_client(client, 49, 1201, 800);
|
push_blocks_to_client(client, 49, 1201, 800);
|
||||||
push_blocks_to_client(client, 53, 1201, 600);
|
push_blocks_to_client(client, 53, 1201, 600);
|
||||||
|
|
||||||
for _ in 0..20 {
|
for _ in 0..40 {
|
||||||
client.import_verified_blocks(&IoChannel::disconnected());
|
client.import_verified_blocks(&IoChannel::disconnected());
|
||||||
}
|
}
|
||||||
assert_eq!(2000, client.chain_info().best_block_number);
|
assert_eq!(2000, client.chain_info().best_block_number);
|
||||||
|
@ -113,11 +113,12 @@ const MAX_HEADERS_TO_SEND: usize = 512;
|
|||||||
const MAX_NODE_DATA_TO_SEND: usize = 1024;
|
const MAX_NODE_DATA_TO_SEND: usize = 1024;
|
||||||
const MAX_RECEIPTS_TO_SEND: usize = 1024;
|
const MAX_RECEIPTS_TO_SEND: usize = 1024;
|
||||||
const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256;
|
const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256;
|
||||||
const MAX_HEADERS_TO_REQUEST: usize = 256;
|
const MAX_HEADERS_TO_REQUEST: usize = 128;
|
||||||
const MAX_BODIES_TO_REQUEST: usize = 64;
|
const MAX_BODIES_TO_REQUEST: usize = 64;
|
||||||
const MIN_PEERS_PROPAGATION: usize = 4;
|
const MIN_PEERS_PROPAGATION: usize = 4;
|
||||||
const MAX_PEERS_PROPAGATION: usize = 128;
|
const MAX_PEERS_PROPAGATION: usize = 128;
|
||||||
const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20;
|
const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20;
|
||||||
|
const SUBCHAIN_SIZE: usize = 64;
|
||||||
|
|
||||||
const STATUS_PACKET: u8 = 0x00;
|
const STATUS_PACKET: u8 = 0x00;
|
||||||
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
|
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
|
||||||
@ -639,7 +640,7 @@ impl ChainSync {
|
|||||||
self.sync_peer(io, p, false);
|
self.sync_peer(io, p, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !self.peers.values().any(|p| p.asking != PeerAsking::Nothing) {
|
if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing) {
|
||||||
self.complete_sync();
|
self.complete_sync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -665,7 +666,7 @@ impl ChainSync {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if self.state == SyncState::Waiting {
|
if self.state == SyncState::Waiting {
|
||||||
trace!(target: "sync", "Waiting for block queue");
|
trace!(target: "sync", "Waiting for the block queue");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
(peer.latest_hash.clone(), peer.difficulty.clone())
|
(peer.latest_hash.clone(), peer.difficulty.clone())
|
||||||
@ -689,7 +690,7 @@ impl ChainSync {
|
|||||||
// Request subchain headers
|
// Request subchain headers
|
||||||
trace!(target: "sync", "Starting sync with better chain");
|
trace!(target: "sync", "Starting sync with better chain");
|
||||||
let last = self.last_imported_hash.clone();
|
let last = self.last_imported_hash.clone();
|
||||||
self.request_headers_by_hash(io, peer_id, &last, 128, 255, false, PeerAsking::Heads);
|
self.request_headers_by_hash(io, peer_id, &last, SUBCHAIN_SIZE, MAX_HEADERS_TO_REQUEST - 1, false, PeerAsking::Heads);
|
||||||
},
|
},
|
||||||
SyncState::Blocks | SyncState::NewBlocks => {
|
SyncState::Blocks | SyncState::NewBlocks => {
|
||||||
if io.chain().block_status(BlockID::Hash(peer_latest)) == BlockStatus::Unknown {
|
if io.chain().block_status(BlockID::Hash(peer_latest)) == BlockStatus::Unknown {
|
||||||
@ -704,6 +705,8 @@ impl ChainSync {
|
|||||||
fn start_sync_round(&mut self, io: &mut SyncIo) {
|
fn start_sync_round(&mut self, io: &mut SyncIo) {
|
||||||
self.state = SyncState::ChainHead;
|
self.state = SyncState::ChainHead;
|
||||||
trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block);
|
trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block);
|
||||||
|
// Check if need to retract to find the common block. The problem is that the peers still return headers by hash even
|
||||||
|
// from the non-canonical part of the tree. So we also retract if nothing has been imported last round.
|
||||||
if self.imported_this_round.is_some() && self.imported_this_round.unwrap() == 0 && self.last_imported_block > 0 {
|
if self.imported_this_round.is_some() && self.imported_this_round.unwrap() == 0 && self.last_imported_block > 0 {
|
||||||
match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) {
|
match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) {
|
||||||
Some(h) => {
|
Some(h) => {
|
||||||
@ -781,9 +784,13 @@ impl ChainSync {
|
|||||||
|
|
||||||
match io.chain().import_block(block) {
|
match io.chain().import_block(block) {
|
||||||
Err(Error::Import(ImportError::AlreadyInChain)) => {
|
Err(Error::Import(ImportError::AlreadyInChain)) => {
|
||||||
|
self.last_imported_block = number;
|
||||||
|
self.last_imported_hash = h.clone();
|
||||||
trace!(target: "sync", "Block already in chain {:?}", h);
|
trace!(target: "sync", "Block already in chain {:?}", h);
|
||||||
},
|
},
|
||||||
Err(Error::Import(ImportError::AlreadyQueued)) => {
|
Err(Error::Import(ImportError::AlreadyQueued)) => {
|
||||||
|
self.last_imported_block = number;
|
||||||
|
self.last_imported_hash = h.clone();
|
||||||
trace!(target: "sync", "Block already queued {:?}", h);
|
trace!(target: "sync", "Block already queued {:?}", h);
|
||||||
},
|
},
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
@ -856,13 +863,10 @@ impl ChainSync {
|
|||||||
|
|
||||||
/// Generic request sender
|
/// Generic request sender
|
||||||
fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) {
|
fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) {
|
||||||
{
|
|
||||||
let peer = self.peers.get_mut(&peer_id).unwrap();
|
let peer = self.peers.get_mut(&peer_id).unwrap();
|
||||||
if peer.asking != PeerAsking::Nothing {
|
if peer.asking != PeerAsking::Nothing {
|
||||||
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
|
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
let mut peer = self.peers.get_mut(&peer_id).unwrap();
|
|
||||||
peer.asking = asking;
|
peer.asking = asking;
|
||||||
peer.ask_time = time::precise_time_s();
|
peer.ask_time = time::precise_time_s();
|
||||||
if let Err(e) = sync.send(peer_id, packet_id, packet) {
|
if let Err(e) = sync.send(peer_id, packet_id, packet) {
|
||||||
@ -1095,6 +1099,7 @@ impl ChainSync {
|
|||||||
let tick = time::precise_time_s();
|
let tick = time::precise_time_s();
|
||||||
for (peer_id, peer) in &self.peers {
|
for (peer_id, peer) in &self.peers {
|
||||||
if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC {
|
if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC {
|
||||||
|
trace!(target:"sync", "Timeouted {}", peer_id);
|
||||||
io.disconnect_peer(*peer_id);
|
io.disconnect_peer(*peer_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user