Merge branch 'master' into miner-no-default
Conflicts: sync/src/lib.rs
This commit is contained in:
@@ -10,7 +10,7 @@ authors = ["Ethcore <admin@ethcore.io"]
|
||||
[dependencies]
|
||||
ethcore-util = { path = "../util" }
|
||||
ethcore = { path = "../ethcore" }
|
||||
clippy = { version = "0.0.76", optional = true}
|
||||
clippy = { version = "0.0.77", optional = true}
|
||||
log = "0.3"
|
||||
env_logger = "0.3"
|
||||
time = "0.1.34"
|
||||
|
||||
@@ -95,12 +95,17 @@ impl BlockCollection {
|
||||
}
|
||||
|
||||
/// Insert a collection of block bodies for previously downloaded headers.
|
||||
pub fn insert_bodies(&mut self, bodies: Vec<Bytes>) {
|
||||
pub fn insert_bodies(&mut self, bodies: Vec<Bytes>) -> usize {
|
||||
let mut inserted = 0;
|
||||
for b in bodies.into_iter() {
|
||||
if let Err(e) = self.insert_body(b) {
|
||||
trace!(target: "sync", "Ignored invalid body: {:?}", e);
|
||||
}
|
||||
else {
|
||||
inserted += 1;
|
||||
}
|
||||
}
|
||||
inserted
|
||||
}
|
||||
|
||||
/// Returns a set of block hashes that require a body download. The returned set is marked as being downloaded.
|
||||
@@ -231,13 +236,19 @@ impl BlockCollection {
|
||||
Some(ref mut block) => {
|
||||
trace!(target: "sync", "Got body {}", h);
|
||||
block.body = Some(body.as_raw().to_vec());
|
||||
Ok(())
|
||||
},
|
||||
None => warn!("Got body with no header {}", h)
|
||||
None => {
|
||||
warn!("Got body with no header {}", h);
|
||||
Err(UtilError::Network(NetworkError::BadProtocol))
|
||||
}
|
||||
}
|
||||
}
|
||||
None => trace!(target: "sync", "Ignored unknown/stale block body")
|
||||
};
|
||||
Ok(())
|
||||
None => {
|
||||
trace!(target: "sync", "Ignored unknown/stale block body");
|
||||
Err(UtilError::Network(NetworkError::BadProtocol))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn insert_header(&mut self, header: Bytes) -> Result<H256, UtilError> {
|
||||
@@ -265,7 +276,7 @@ impl BlockCollection {
|
||||
if header_id.transactions_root == rlp::SHA3_NULL_RLP && header_id.uncles == rlp::SHA3_EMPTY_LIST_RLP {
|
||||
// empty body, just mark as downloaded
|
||||
let mut body_stream = RlpStream::new_list(2);
|
||||
body_stream.append_raw(&rlp::NULL_RLP, 1);
|
||||
body_stream.append_raw(&rlp::EMPTY_LIST_RLP, 1);
|
||||
body_stream.append_raw(&rlp::EMPTY_LIST_RLP, 1);
|
||||
block.body = Some(body_stream.out());
|
||||
}
|
||||
@@ -284,6 +295,10 @@ impl BlockCollection {
|
||||
let old_subchains: HashSet<_> = { self.heads.iter().cloned().collect() };
|
||||
for s in self.heads.drain(..) {
|
||||
let mut h = s.clone();
|
||||
if !self.blocks.contains_key(&h) {
|
||||
new_heads.push(h);
|
||||
continue;
|
||||
}
|
||||
loop {
|
||||
match self.parents.get(&h) {
|
||||
Some(next) => {
|
||||
@@ -383,7 +398,7 @@ mod test {
|
||||
assert_eq!(&bc.drain()[..], &blocks[6..16]);
|
||||
assert_eq!(hashes[15], bc.heads[0]);
|
||||
|
||||
bc.insert_headers(headers[16..].to_vec());
|
||||
bc.insert_headers(headers[15..].to_vec());
|
||||
bc.drain();
|
||||
assert!(bc.is_empty());
|
||||
}
|
||||
@@ -409,5 +424,24 @@ mod test {
|
||||
assert!(bc.head.is_some());
|
||||
assert_eq!(hashes[21], bc.heads[0]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn insert_headers_no_gap() {
|
||||
let mut bc = BlockCollection::new();
|
||||
assert!(is_empty(&bc));
|
||||
let client = TestBlockChainClient::new();
|
||||
let nblocks = 200;
|
||||
client.add_blocks(nblocks, EachBlockWith::Nothing);
|
||||
let blocks: Vec<_> = (0 .. nblocks).map(|i| (&client as &BlockChainClient).block(BlockID::Number(i as BlockNumber)).unwrap()).collect();
|
||||
let headers: Vec<_> = blocks.iter().map(|b| Rlp::new(b).at(0).as_raw().to_vec()).collect();
|
||||
let hashes: Vec<_> = headers.iter().map(|h| HeaderView::new(h).sha3()).collect();
|
||||
let heads: Vec<_> = hashes.iter().enumerate().filter_map(|(i, h)| if i % 20 == 0 { Some(h.clone()) } else { None }).collect();
|
||||
bc.reset_to(heads);
|
||||
|
||||
bc.insert_headers(headers[1..2].to_vec());
|
||||
assert!(bc.drain().is_empty());
|
||||
bc.insert_headers(headers[0..1].to_vec());
|
||||
assert_eq!(bc.drain().len(), 2);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -95,12 +95,12 @@ use ethcore::views::{HeaderView, BlockView};
|
||||
use ethcore::header::{BlockNumber, Header as BlockHeader};
|
||||
use ethcore::client::{BlockChainClient, BlockStatus, BlockID, BlockChainInfo};
|
||||
use ethcore::error::*;
|
||||
use ethcore::transaction::SignedTransaction;
|
||||
use ethcore::block::Block;
|
||||
use io::SyncIo;
|
||||
use time;
|
||||
use super::SyncConfig;
|
||||
use blocks::BlockCollection;
|
||||
use rand::{thread_rng, Rng};
|
||||
|
||||
known_heap_size!(0, PeerInfo);
|
||||
|
||||
@@ -309,7 +309,6 @@ impl ChainSync {
|
||||
}
|
||||
self.syncing_difficulty = From::from(0u64);
|
||||
self.state = SyncState::Idle;
|
||||
self.blocks.clear();
|
||||
self.active_peers = self.peers.keys().cloned().collect();
|
||||
}
|
||||
|
||||
@@ -361,12 +360,12 @@ impl ChainSync {
|
||||
|
||||
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
|
||||
if io.is_expired() {
|
||||
trace!("Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
|
||||
trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if self.peers.contains_key(&peer_id) {
|
||||
warn!("Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id));
|
||||
debug!(target: "sync", "Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id));
|
||||
return Ok(());
|
||||
}
|
||||
let chain_info = io.chain().chain_info();
|
||||
@@ -394,7 +393,7 @@ impl ChainSync {
|
||||
self.clear_peer_download(peer_id);
|
||||
let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash);
|
||||
let expected_asking = if self.state == SyncState::ChainHead { PeerAsking::Heads } else { PeerAsking::BlockHeaders };
|
||||
if !self.reset_peer_asking(peer_id, expected_asking) {
|
||||
if !self.reset_peer_asking(peer_id, expected_asking) || expected_hash.is_none() {
|
||||
trace!(target: "sync", "Ignored unexpected headers");
|
||||
self.continue_sync(io);
|
||||
return Ok(());
|
||||
@@ -517,7 +516,10 @@ impl ChainSync {
|
||||
for i in 0..item_count {
|
||||
bodies.push(try!(r.at(i)).as_raw().to_vec());
|
||||
}
|
||||
self.blocks.insert_bodies(bodies);
|
||||
if self.blocks.insert_bodies(bodies) != item_count {
|
||||
trace!(target: "sync", "Deactivating peer for giving invalid block bodies");
|
||||
self.deactivate_peer(io, peer_id);
|
||||
}
|
||||
self.collect_blocks(io);
|
||||
}
|
||||
self.continue_sync(io);
|
||||
@@ -531,10 +533,6 @@ impl ChainSync {
|
||||
let header_rlp = try!(block_rlp.at(0));
|
||||
let h = header_rlp.as_raw().sha3();
|
||||
trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h);
|
||||
if self.state != SyncState::Idle {
|
||||
trace!(target: "sync", "NewBlock ignored while seeking");
|
||||
return Ok(());
|
||||
}
|
||||
let header: BlockHeader = try!(header_rlp.as_val());
|
||||
let mut unknown = false;
|
||||
{
|
||||
@@ -542,46 +540,45 @@ impl ChainSync {
|
||||
peer.latest_hash = header.hash();
|
||||
peer.latest_number = Some(header.number());
|
||||
}
|
||||
if header.number <= self.last_imported_block + 1 {
|
||||
match io.chain().import_block(block_rlp.as_raw().to_vec()) {
|
||||
Err(Error::Import(ImportError::AlreadyInChain)) => {
|
||||
trace!(target: "sync", "New block already in chain {:?}", h);
|
||||
},
|
||||
Err(Error::Import(ImportError::AlreadyQueued)) => {
|
||||
trace!(target: "sync", "New block already queued {:?}", h);
|
||||
},
|
||||
Ok(_) => {
|
||||
if header.number == self.last_imported_block + 1 {
|
||||
self.last_imported_block = header.number;
|
||||
self.last_imported_hash = header.hash();
|
||||
}
|
||||
trace!(target: "sync", "New block queued {:?} ({})", h, header.number);
|
||||
},
|
||||
Err(Error::Block(BlockError::UnknownParent(p))) => {
|
||||
unknown = true;
|
||||
trace!(target: "sync", "New block with unknown parent ({:?}) {:?}", p, h);
|
||||
},
|
||||
Err(e) => {
|
||||
debug!(target: "sync", "Bad new block {:?} : {:?}", h, e);
|
||||
io.disable_peer(peer_id);
|
||||
}
|
||||
};
|
||||
}
|
||||
else {
|
||||
unknown = true;
|
||||
}
|
||||
if unknown {
|
||||
trace!(target: "sync", "New unknown block {:?}", h);
|
||||
//TODO: handle too many unknown blocks
|
||||
let difficulty: U256 = try!(r.val_at(1));
|
||||
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
|
||||
if peer.difficulty.map_or(true, |pd| difficulty > pd) {
|
||||
//self.state = SyncState::ChainHead;
|
||||
peer.difficulty = Some(difficulty);
|
||||
trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h);
|
||||
match io.chain().import_block(block_rlp.as_raw().to_vec()) {
|
||||
Err(Error::Import(ImportError::AlreadyInChain)) => {
|
||||
trace!(target: "sync", "New block already in chain {:?}", h);
|
||||
},
|
||||
Err(Error::Import(ImportError::AlreadyQueued)) => {
|
||||
trace!(target: "sync", "New block already queued {:?}", h);
|
||||
},
|
||||
Ok(_) => {
|
||||
if header.number == self.last_imported_block + 1 {
|
||||
self.last_imported_block = header.number;
|
||||
self.last_imported_hash = header.hash();
|
||||
}
|
||||
trace!(target: "sync", "New block queued {:?} ({})", h, header.number);
|
||||
},
|
||||
Err(Error::Block(BlockError::UnknownParent(p))) => {
|
||||
unknown = true;
|
||||
trace!(target: "sync", "New block with unknown parent ({:?}) {:?}", p, h);
|
||||
},
|
||||
Err(e) => {
|
||||
debug!(target: "sync", "Bad new block {:?} : {:?}", h, e);
|
||||
io.disable_peer(peer_id);
|
||||
}
|
||||
};
|
||||
if unknown {
|
||||
if self.state != SyncState::Idle {
|
||||
trace!(target: "sync", "NewBlock ignored while seeking");
|
||||
} else {
|
||||
trace!(target: "sync", "New unknown block {:?}", h);
|
||||
//TODO: handle too many unknown blocks
|
||||
let difficulty: U256 = try!(r.val_at(1));
|
||||
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
|
||||
if peer.difficulty.map_or(true, |pd| difficulty > pd) {
|
||||
//self.state = SyncState::ChainHead;
|
||||
peer.difficulty = Some(difficulty);
|
||||
trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h);
|
||||
}
|
||||
}
|
||||
self.sync_peer(io, peer_id, true);
|
||||
}
|
||||
self.sync_peer(io, peer_id, true);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -659,7 +656,7 @@ impl ChainSync {
|
||||
/// Resume downloading
|
||||
fn continue_sync(&mut self, io: &mut SyncIo) {
|
||||
let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty.unwrap_or_else(U256::zero))).collect();
|
||||
peers.sort_by(|&(_, d1), &(_, d2)| d1.cmp(&d2).reverse()); //TODO: sort by rating
|
||||
thread_rng().shuffle(&mut peers); //TODO: sort by rating
|
||||
trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len());
|
||||
for (p, _) in peers {
|
||||
if self.active_peers.contains(&p) {
|
||||
@@ -685,7 +682,11 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// Find something to do for a peer. Called for a new peer or when a peer is done with it's task.
|
||||
fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) {
|
||||
fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) {
|
||||
if !self.active_peers.contains(&peer_id) {
|
||||
trace!(target: "sync", "Skipping deactivated peer");
|
||||
return;
|
||||
}
|
||||
let (peer_latest, peer_difficulty) = {
|
||||
let peer = self.peers.get_mut(&peer_id).unwrap();
|
||||
if peer.asking != PeerAsking::Nothing {
|
||||
@@ -937,15 +938,15 @@ impl ChainSync {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let item_count = r.item_count();
|
||||
let mut item_count = r.item_count();
|
||||
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
|
||||
|
||||
item_count = min(item_count, MAX_TX_TO_IMPORT);
|
||||
let mut transactions = Vec::with_capacity(item_count);
|
||||
for i in 0 .. min(item_count, MAX_TX_TO_IMPORT) {
|
||||
let tx: SignedTransaction = try!(r.val_at(i));
|
||||
for i in 0 .. item_count {
|
||||
let tx = try!(r.at(i)).as_raw().to_vec();
|
||||
transactions.push(tx);
|
||||
}
|
||||
let _ = io.chain().import_transactions(transactions);
|
||||
io.chain().queue_transactions(transactions);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1094,7 +1095,7 @@ impl ChainSync {
|
||||
Ok(Some((RECEIPTS_PACKET, rlp_result)))
|
||||
}
|
||||
|
||||
fn return_rlp<FRlp, FError>(&self, io: &mut SyncIo, rlp: &UntrustedRlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
|
||||
fn return_rlp<FRlp, FError>(io: &mut SyncIo, rlp: &UntrustedRlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
|
||||
where FRlp : Fn(&SyncIo, &UntrustedRlp, PeerId) -> RlpResponseResult,
|
||||
FError : FnOnce(UtilError) -> String
|
||||
{
|
||||
@@ -1111,13 +1112,41 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// Dispatch incoming requests and responses
|
||||
pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
|
||||
pub fn dispatch_packet(sync: &RwLock<ChainSync>, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
|
||||
let rlp = UntrustedRlp::new(data);
|
||||
let result = match packet_id {
|
||||
GET_BLOCK_BODIES_PACKET => ChainSync::return_rlp(io, &rlp, peer,
|
||||
ChainSync::return_block_bodies,
|
||||
|e| format!("Error sending block bodies: {:?}", e)),
|
||||
|
||||
GET_BLOCK_HEADERS_PACKET => ChainSync::return_rlp(io, &rlp, peer,
|
||||
ChainSync::return_block_headers,
|
||||
|e| format!("Error sending block headers: {:?}", e)),
|
||||
|
||||
GET_RECEIPTS_PACKET => ChainSync::return_rlp(io, &rlp, peer,
|
||||
ChainSync::return_receipts,
|
||||
|e| format!("Error sending receipts: {:?}", e)),
|
||||
|
||||
GET_NODE_DATA_PACKET => ChainSync::return_rlp(io, &rlp, peer,
|
||||
ChainSync::return_node_data,
|
||||
|e| format!("Error sending nodes: {:?}", e)),
|
||||
|
||||
_ => {
|
||||
sync.write().unwrap().on_packet(io, peer, packet_id, data);
|
||||
Ok(())
|
||||
}
|
||||
};
|
||||
result.unwrap_or_else(|e| {
|
||||
debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e);
|
||||
})
|
||||
}
|
||||
|
||||
pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
|
||||
if packet_id != STATUS_PACKET && !self.peers.contains_key(&peer) {
|
||||
debug!(target:"sync", "Unexpected packet from unregistered peer: {}:{}", peer, io.peer_info(peer));
|
||||
return;
|
||||
}
|
||||
let rlp = UntrustedRlp::new(data);
|
||||
let result = match packet_id {
|
||||
STATUS_PACKET => self.on_peer_status(io, peer, &rlp),
|
||||
TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp),
|
||||
@@ -1125,23 +1154,6 @@ impl ChainSync {
|
||||
BLOCK_BODIES_PACKET => self.on_peer_block_bodies(io, peer, &rlp),
|
||||
NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp),
|
||||
NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp),
|
||||
|
||||
GET_BLOCK_BODIES_PACKET => self.return_rlp(io, &rlp, peer,
|
||||
ChainSync::return_block_bodies,
|
||||
|e| format!("Error sending block bodies: {:?}", e)),
|
||||
|
||||
GET_BLOCK_HEADERS_PACKET => self.return_rlp(io, &rlp, peer,
|
||||
ChainSync::return_block_headers,
|
||||
|e| format!("Error sending block headers: {:?}", e)),
|
||||
|
||||
GET_RECEIPTS_PACKET => self.return_rlp(io, &rlp, peer,
|
||||
ChainSync::return_receipts,
|
||||
|e| format!("Error sending receipts: {:?}", e)),
|
||||
|
||||
GET_NODE_DATA_PACKET => self.return_rlp(io, &rlp, peer,
|
||||
ChainSync::return_node_data,
|
||||
|e| format!("Error sending nodes: {:?}", e)),
|
||||
|
||||
_ => {
|
||||
debug!(target: "sync", "Unknown packet {}", packet_id);
|
||||
Ok(())
|
||||
@@ -1243,7 +1255,7 @@ impl ChainSync {
|
||||
/// propagates latest block to lagging peers
|
||||
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
|
||||
let lucky_peers = self.select_lagging_peers(chain_info, io);
|
||||
trace!("Sending NewBlocks to {:?}", lucky_peers);
|
||||
trace!(target: "sync", "Sending NewBlocks to {:?}", lucky_peers);
|
||||
let mut sent = 0;
|
||||
for (peer_id, _) in lucky_peers {
|
||||
let rlp = ChainSync::create_latest_block_rlp(io.chain());
|
||||
@@ -1258,7 +1270,7 @@ impl ChainSync {
|
||||
/// propagates new known hashes to all peers
|
||||
fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
|
||||
let lucky_peers = self.select_lagging_peers(chain_info, io);
|
||||
trace!("Sending NewHashes to {:?}", lucky_peers);
|
||||
trace!(target: "sync", "Sending NewHashes to {:?}", lucky_peers);
|
||||
let mut sent = 0;
|
||||
let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash();
|
||||
for (peer_id, peer_number) in lucky_peers {
|
||||
@@ -1421,7 +1433,7 @@ mod tests {
|
||||
fn return_receipts() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(H256::new(), &client);
|
||||
let sync = dummy_sync_with_peer(H256::new(), &client);
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
|
||||
let mut receipt_list = RlpStream::new_list(4);
|
||||
@@ -1442,7 +1454,7 @@ mod tests {
|
||||
assert_eq!(603, rlp_result.unwrap().1.out().len());
|
||||
|
||||
io.sender = Some(2usize);
|
||||
sync.on_packet(&mut io, 0usize, super::GET_RECEIPTS_PACKET, &receipts_request);
|
||||
ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, super::GET_RECEIPTS_PACKET, &receipts_request);
|
||||
assert_eq!(1, io.queue.len());
|
||||
}
|
||||
|
||||
@@ -1514,7 +1526,7 @@ mod tests {
|
||||
fn return_nodes() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(H256::new(), &client);
|
||||
let sync = dummy_sync_with_peer(H256::new(), &client);
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
|
||||
let mut node_list = RlpStream::new_list(3);
|
||||
@@ -1534,7 +1546,8 @@ mod tests {
|
||||
assert_eq!(34, rlp_result.unwrap().1.out().len());
|
||||
|
||||
io.sender = Some(2usize);
|
||||
sync.on_packet(&mut io, 0usize, super::GET_NODE_DATA_PACKET, &node_request);
|
||||
|
||||
ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, super::GET_NODE_DATA_PACKET, &node_request);
|
||||
assert_eq!(1, io.queue.len());
|
||||
}
|
||||
|
||||
|
||||
@@ -44,14 +44,14 @@
|
||||
//! let mut service = NetworkService::new(NetworkConfiguration::new()).unwrap();
|
||||
//! service.start().unwrap();
|
||||
//! let dir = env::temp_dir();
|
||||
//! let miner = Miner::new(false, ethereum::new_frontier(), None);
|
||||
//! let client = Client::new(
|
||||
//! ClientConfig::default(),
|
||||
//! ethereum::new_frontier(),
|
||||
//! ethereum::new_frontier(true),
|
||||
//! &dir,
|
||||
//! Arc::new(Miner::new(false, ethereum::new_frontier(), None)),
|
||||
//! Arc::new(miner),
|
||||
//! service.io().channel()
|
||||
//! ).unwrap();
|
||||
//! let miner = Miner::new(false, ethereum::new_frontier(), None);
|
||||
//! let sync = EthSync::new(SyncConfig::default(), client);
|
||||
//! EthSync::register(&mut service, sync);
|
||||
//! }
|
||||
@@ -175,7 +175,7 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
|
||||
}
|
||||
|
||||
fn read(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
|
||||
self.sync.write().unwrap().on_packet(&mut NetSyncIo::new(io, self.chain.deref()) , *peer, packet_id, data);
|
||||
ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, self.chain.deref()) , *peer, packet_id, data);
|
||||
}
|
||||
|
||||
fn connected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) {
|
||||
|
||||
@@ -47,7 +47,7 @@ fn status_after_sync() {
|
||||
net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
|
||||
net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
|
||||
net.sync();
|
||||
let status = net.peer(0).sync.status();
|
||||
let status = net.peer(0).sync.read().unwrap().status();
|
||||
assert_eq!(status.state, SyncState::Idle);
|
||||
}
|
||||
|
||||
@@ -107,14 +107,14 @@ fn restart() {
|
||||
assert!(net.peer(0).chain.chain_info().best_block_number > 100);
|
||||
net.restart_peer(0);
|
||||
|
||||
let status = net.peer(0).sync.status();
|
||||
let status = net.peer(0).sync.read().unwrap().status();
|
||||
assert_eq!(status.state, SyncState::ChainHead);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn status_empty() {
|
||||
let net = TestNet::new(2);
|
||||
assert_eq!(net.peer(0).sync.status().state, SyncState::Idle);
|
||||
assert_eq!(net.peer(0).sync.read().unwrap().status().state, SyncState::Idle);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -78,7 +78,7 @@ pub struct TestPacket {
|
||||
|
||||
pub struct TestPeer {
|
||||
pub chain: TestBlockChainClient,
|
||||
pub sync: ChainSync,
|
||||
pub sync: RwLock<ChainSync>,
|
||||
pub queue: VecDeque<TestPacket>,
|
||||
}
|
||||
|
||||
@@ -97,7 +97,7 @@ impl TestNet {
|
||||
let chain = TestBlockChainClient::new();
|
||||
let sync = ChainSync::new(SyncConfig::default(), &chain);
|
||||
net.peers.push(TestPeer {
|
||||
sync: sync,
|
||||
sync: RwLock::new(sync),
|
||||
chain: chain,
|
||||
queue: VecDeque::new(),
|
||||
});
|
||||
@@ -118,7 +118,7 @@ impl TestNet {
|
||||
for client in 0..self.peers.len() {
|
||||
if peer != client {
|
||||
let mut p = self.peers.get_mut(peer).unwrap();
|
||||
p.sync.on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId);
|
||||
p.sync.write().unwrap().on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -129,22 +129,22 @@ impl TestNet {
|
||||
if let Some(packet) = self.peers[peer].queue.pop_front() {
|
||||
let mut p = self.peers.get_mut(packet.recipient).unwrap();
|
||||
trace!("--- {} -> {} ---", peer, packet.recipient);
|
||||
p.sync.on_packet(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data);
|
||||
ChainSync::dispatch_packet(&p.sync, &mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data);
|
||||
trace!("----------------");
|
||||
}
|
||||
let mut p = self.peers.get_mut(peer).unwrap();
|
||||
p.sync.maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None));
|
||||
p.sync.write().unwrap().maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn sync_step_peer(&mut self, peer_num: usize) {
|
||||
let mut peer = self.peer_mut(peer_num);
|
||||
peer.sync.maintain_sync(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None));
|
||||
peer.sync.write().unwrap().maintain_sync(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None));
|
||||
}
|
||||
|
||||
pub fn restart_peer(&mut self, i: usize) {
|
||||
let peer = self.peer_mut(i);
|
||||
peer.sync.restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None));
|
||||
peer.sync.write().unwrap().restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None));
|
||||
}
|
||||
|
||||
pub fn sync(&mut self) -> u32 {
|
||||
@@ -173,6 +173,6 @@ impl TestNet {
|
||||
|
||||
pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) {
|
||||
let mut peer = self.peer_mut(peer_id);
|
||||
peer.sync.chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[]);
|
||||
peer.sync.write().unwrap().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[]);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user