diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 6ad9965cd..b18b0cf61 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -132,37 +132,115 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { } } +/// Abstract messages between peers. +pub trait Message { + /// The intended recipient of this message. + fn recipient(&self) -> PeerId; +} + +/// Mock subprotocol packet pub struct TestPacket { pub data: Bytes, pub packet_id: PacketId, pub recipient: PeerId, } -pub struct TestPeer where C: FlushingBlockChainClient { +impl Message for TestPacket { + fn recipient(&self) -> PeerId { self.recipient } +} + +/// A peer which can be a member of the `TestNet`. +pub trait Peer { + type Message: Message; + + /// Called on connection to other indicated peer. + fn on_connect(&self, other: PeerId); + + /// Called on disconnect from other indicated peer. + fn on_disconnect(&self, other: PeerId); + + /// Receive a message from another peer. Return a set of peers to disconnect. + fn receive_message(&self, from: PeerId, msg: Self::Message) -> HashSet; + + /// Produce the next pending message to send to another peer. + fn pending_message(&self) -> Option; + + /// Whether this peer is done syncing (has no messages to send). + fn is_done(&self) -> bool; + + /// Execute a "sync step". This is called for each peer after it sends a packet. + fn sync_step(&self); + + /// Restart sync for a peer. + fn restart_sync(&self); +} + +pub struct EthPeer where C: FlushingBlockChainClient { pub chain: Arc, pub snapshot_service: Arc, pub sync: RwLock, pub queue: RwLock>, } -pub struct TestNet where C: FlushingBlockChainClient { - pub peers: Vec>>, +impl Peer for EthPeer { + type Message = TestPacket; + + fn on_connect(&self, other: PeerId) { + self.sync.write().update_targets(&*self.chain); + self.sync.write().on_peer_connected(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(other)), other); + } + + fn on_disconnect(&self, other: PeerId) { + let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(other)); + self.sync.write().on_peer_aborting(&mut io, other); + } + + fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet { + let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(from)); + ChainSync::dispatch_packet(&self.sync, &mut io, from, msg.packet_id, &msg.data); + self.chain.flush(); + io.to_disconnect.clone() + } + + fn pending_message(&self) -> Option { + self.chain.flush(); + self.queue.write().pop_front() + } + + fn is_done(&self) -> bool { + self.queue.read().is_empty() + } + + fn sync_step(&self) { + self.chain.flush(); + self.sync.write().maintain_peers(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None)); + self.sync.write().maintain_sync(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None)); + self.sync.write().propagate_new_transactions(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None)); + } + + fn restart_sync(&self) { + self.sync.write().restart(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None)); + } +} + +pub struct TestNet

{ + pub peers: Vec>, pub started: bool, pub disconnect_events: Vec<(PeerId, PeerId)>, //disconnected (initiated by, to) } -impl TestNet { - pub fn new(n: usize) -> TestNet { +impl TestNet> { + pub fn new(n: usize) -> Self { Self::new_with_config(n, SyncConfig::default()) } - pub fn new_with_fork(n: usize, fork: Option<(BlockNumber, H256)>) -> TestNet { + pub fn new_with_fork(n: usize, fork: Option<(BlockNumber, H256)>) -> Self { let mut config = SyncConfig::default(); config.fork_block = fork; Self::new_with_config(n, config) } - pub fn new_with_config(n: usize, config: SyncConfig) -> TestNet { + pub fn new_with_config(n: usize, config: SyncConfig) -> Self { let mut net = TestNet { peers: Vec::new(), started: false, @@ -172,7 +250,7 @@ impl TestNet { let chain = TestBlockChainClient::new(); let ss = Arc::new(TestSnapshotService::new()); let sync = ChainSync::new(config.clone(), &chain); - net.peers.push(Arc::new(TestPeer { + net.peers.push(Arc::new(EthPeer { sync: RwLock::new(sync), snapshot_service: ss, chain: Arc::new(chain), @@ -183,8 +261,8 @@ impl TestNet { } } -impl TestNet { - pub fn with_spec(n: usize, config: SyncConfig, spec_factory: F) -> GuardedTempResult> +impl TestNet> { + pub fn with_spec(n: usize, config: SyncConfig, spec_factory: F) -> GuardedTempResult where F: Fn() -> Spec { let mut net = TestNet { @@ -211,7 +289,7 @@ impl TestNet { let ss = Arc::new(TestSnapshotService::new()); let sync = ChainSync::new(config.clone(), &*client); - let peer = Arc::new(TestPeer { + let peer = Arc::new(EthPeer { sync: RwLock::new(sync), snapshot_service: ss, chain: client, @@ -220,22 +298,18 @@ impl TestNet { peer.chain.add_notify(peer.clone()); net.peers.push(peer); } - GuardedTempResult::> { + GuardedTempResult { _temp: dir, result: Some(net) } } } -impl TestNet where C: FlushingBlockChainClient { - pub fn peer(&self, i: usize) -> &TestPeer { +impl

TestNet

where P: Peer { + pub fn peer(&self, i: usize) -> &P { &self.peers[i] } - pub fn peer_mut(&mut self, i: usize) -> &mut TestPeer { - Arc::get_mut(&mut self.peers[i]).unwrap() - } - pub fn start(&mut self) { if self.started { return; @@ -243,9 +317,7 @@ impl TestNet where C: FlushingBlockChainClient { for peer in 0..self.peers.len() { for client in 0..self.peers.len() { if peer != client { - let p = &self.peers[peer]; - p.sync.write().update_targets(&*p.chain); - p.sync.write().on_peer_connected(&mut TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(client as PeerId)), client as PeerId); + self.peers[peer].on_connect(client as PeerId); } } } @@ -254,31 +326,22 @@ impl TestNet where C: FlushingBlockChainClient { pub fn sync_step(&mut self) { for peer in 0..self.peers.len() { - self.peers[peer].chain.flush(); - let packet = self.peers[peer].queue.write().pop_front(); + let packet = self.peers[peer].pending_message(); if let Some(packet) = packet { let disconnecting = { - let p = &self.peers[packet.recipient]; - trace!("--- {} -> {} ---", peer, packet.recipient); - let to_disconnect = { - let mut io = TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(peer as PeerId)); - ChainSync::dispatch_packet(&p.sync, &mut io, peer as PeerId, packet.packet_id, &packet.data); - p.chain.flush(); - io.to_disconnect.clone() - }; + let recipient = packet.recipient(); + trace!("--- {} -> {} ---", peer, recipient); + let to_disconnect = self.peers[recipient].receive_message(peer as PeerId, packet); for d in &to_disconnect { // notify this that disconnecting peers are disconnecting - let mut io = TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(*d)); - p.sync.write().on_peer_aborting(&mut io, *d); + self.peers[recipient].on_disconnect(*d as PeerId); self.disconnect_events.push((peer, *d)); } to_disconnect }; for d in &disconnecting { // notify other peers that this peer is disconnecting - let p = &self.peers[*d]; - let mut io = TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(peer as PeerId)); - p.sync.write().on_peer_aborting(&mut io, peer as PeerId); + self.peers[*d].on_disconnect(peer as PeerId); } } @@ -287,16 +350,11 @@ impl TestNet where C: FlushingBlockChainClient { } pub fn sync_step_peer(&mut self, peer_num: usize) { - let peer = self.peer(peer_num); - peer.chain.flush(); - peer.sync.write().maintain_peers(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None)); - peer.sync.write().maintain_sync(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None)); - peer.sync.write().propagate_new_transactions(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None)); + self.peers[peer_num].sync_step(); } pub fn restart_peer(&mut self, i: usize) { - let peer = self.peer(i); - peer.sync.write().restart(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None)); + self.peers[i].restart_sync(); } pub fn sync(&mut self) -> u32 { @@ -317,16 +375,25 @@ impl TestNet where C: FlushingBlockChainClient { } pub fn done(&self) -> bool { - self.peers.iter().all(|p| p.queue.read().is_empty()) + self.peers.iter().all(|p| p.is_done()) } +} +impl TestNet> { + // relies on Arc uniqueness, which is only true when we haven't registered a ChainNotify. + pub fn peer_mut(&mut self, i: usize) -> &mut EthPeer { + Arc::get_mut(&mut self.peers[i]).expect("Arc never exposed externally") + } +} + +impl TestNet> { pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) { - let peer = self.peer(peer_id); + let peer = &mut self.peers[peer_id]; peer.sync.write().chain_new_blocks(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None), &[], &[], &[], &[], &[], &[]); } } -impl ChainNotify for TestPeer { +impl ChainNotify for EthPeer { fn new_blocks(&self, imported: Vec, invalid: Vec,