make test network generic over peer type

This commit is contained in:
Robert Habermeier 2016-12-27 11:23:57 +01:00
parent 4e51f93176
commit 84219a0094

View File

@ -132,37 +132,115 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
} }
} }
/// Abstract messages between peers.
pub trait Message {
/// The intended recipient of this message.
fn recipient(&self) -> PeerId;
}
/// Mock subprotocol packet
pub struct TestPacket { pub struct TestPacket {
pub data: Bytes, pub data: Bytes,
pub packet_id: PacketId, pub packet_id: PacketId,
pub recipient: PeerId, pub recipient: PeerId,
} }
pub struct TestPeer<C> where C: FlushingBlockChainClient { impl Message for TestPacket {
fn recipient(&self) -> PeerId { self.recipient }
}
/// A peer which can be a member of the `TestNet`.
pub trait Peer {
type Message: Message;
/// Called on connection to other indicated peer.
fn on_connect(&self, other: PeerId);
/// Called on disconnect from other indicated peer.
fn on_disconnect(&self, other: PeerId);
/// Receive a message from another peer. Return a set of peers to disconnect.
fn receive_message(&self, from: PeerId, msg: Self::Message) -> HashSet<PeerId>;
/// Produce the next pending message to send to another peer.
fn pending_message(&self) -> Option<Self::Message>;
/// Whether this peer is done syncing (has no messages to send).
fn is_done(&self) -> bool;
/// Execute a "sync step". This is called for each peer after it sends a packet.
fn sync_step(&self);
/// Restart sync for a peer.
fn restart_sync(&self);
}
pub struct EthPeer<C> where C: FlushingBlockChainClient {
pub chain: Arc<C>, pub chain: Arc<C>,
pub snapshot_service: Arc<TestSnapshotService>, pub snapshot_service: Arc<TestSnapshotService>,
pub sync: RwLock<ChainSync>, pub sync: RwLock<ChainSync>,
pub queue: RwLock<VecDeque<TestPacket>>, pub queue: RwLock<VecDeque<TestPacket>>,
} }
pub struct TestNet<C> where C: FlushingBlockChainClient { impl<C: FlushingBlockChainClient> Peer for EthPeer<C> {
pub peers: Vec<Arc<TestPeer<C>>>, type Message = TestPacket;
fn on_connect(&self, other: PeerId) {
self.sync.write().update_targets(&*self.chain);
self.sync.write().on_peer_connected(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(other)), other);
}
fn on_disconnect(&self, other: PeerId) {
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(other));
self.sync.write().on_peer_aborting(&mut io, other);
}
fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet<PeerId> {
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(from));
ChainSync::dispatch_packet(&self.sync, &mut io, from, msg.packet_id, &msg.data);
self.chain.flush();
io.to_disconnect.clone()
}
fn pending_message(&self) -> Option<TestPacket> {
self.chain.flush();
self.queue.write().pop_front()
}
fn is_done(&self) -> bool {
self.queue.read().is_empty()
}
fn sync_step(&self) {
self.chain.flush();
self.sync.write().maintain_peers(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None));
self.sync.write().maintain_sync(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None));
self.sync.write().propagate_new_transactions(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None));
}
fn restart_sync(&self) {
self.sync.write().restart(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None));
}
}
pub struct TestNet<P> {
pub peers: Vec<Arc<P>>,
pub started: bool, pub started: bool,
pub disconnect_events: Vec<(PeerId, PeerId)>, //disconnected (initiated by, to) pub disconnect_events: Vec<(PeerId, PeerId)>, //disconnected (initiated by, to)
} }
impl TestNet<TestBlockChainClient> { impl TestNet<EthPeer<TestBlockChainClient>> {
pub fn new(n: usize) -> TestNet<TestBlockChainClient> { pub fn new(n: usize) -> Self {
Self::new_with_config(n, SyncConfig::default()) Self::new_with_config(n, SyncConfig::default())
} }
pub fn new_with_fork(n: usize, fork: Option<(BlockNumber, H256)>) -> TestNet<TestBlockChainClient> { pub fn new_with_fork(n: usize, fork: Option<(BlockNumber, H256)>) -> Self {
let mut config = SyncConfig::default(); let mut config = SyncConfig::default();
config.fork_block = fork; config.fork_block = fork;
Self::new_with_config(n, config) Self::new_with_config(n, config)
} }
pub fn new_with_config(n: usize, config: SyncConfig) -> TestNet<TestBlockChainClient> { pub fn new_with_config(n: usize, config: SyncConfig) -> Self {
let mut net = TestNet { let mut net = TestNet {
peers: Vec::new(), peers: Vec::new(),
started: false, started: false,
@ -172,7 +250,7 @@ impl TestNet<TestBlockChainClient> {
let chain = TestBlockChainClient::new(); let chain = TestBlockChainClient::new();
let ss = Arc::new(TestSnapshotService::new()); let ss = Arc::new(TestSnapshotService::new());
let sync = ChainSync::new(config.clone(), &chain); let sync = ChainSync::new(config.clone(), &chain);
net.peers.push(Arc::new(TestPeer { net.peers.push(Arc::new(EthPeer {
sync: RwLock::new(sync), sync: RwLock::new(sync),
snapshot_service: ss, snapshot_service: ss,
chain: Arc::new(chain), chain: Arc::new(chain),
@ -183,8 +261,8 @@ impl TestNet<TestBlockChainClient> {
} }
} }
impl TestNet<EthcoreClient> { impl TestNet<EthPeer<EthcoreClient>> {
pub fn with_spec<F>(n: usize, config: SyncConfig, spec_factory: F) -> GuardedTempResult<TestNet<EthcoreClient>> pub fn with_spec<F>(n: usize, config: SyncConfig, spec_factory: F) -> GuardedTempResult<Self>
where F: Fn() -> Spec where F: Fn() -> Spec
{ {
let mut net = TestNet { let mut net = TestNet {
@ -211,7 +289,7 @@ impl TestNet<EthcoreClient> {
let ss = Arc::new(TestSnapshotService::new()); let ss = Arc::new(TestSnapshotService::new());
let sync = ChainSync::new(config.clone(), &*client); let sync = ChainSync::new(config.clone(), &*client);
let peer = Arc::new(TestPeer { let peer = Arc::new(EthPeer {
sync: RwLock::new(sync), sync: RwLock::new(sync),
snapshot_service: ss, snapshot_service: ss,
chain: client, chain: client,
@ -220,22 +298,18 @@ impl TestNet<EthcoreClient> {
peer.chain.add_notify(peer.clone()); peer.chain.add_notify(peer.clone());
net.peers.push(peer); net.peers.push(peer);
} }
GuardedTempResult::<TestNet<EthcoreClient>> { GuardedTempResult {
_temp: dir, _temp: dir,
result: Some(net) result: Some(net)
} }
} }
} }
impl<C> TestNet<C> where C: FlushingBlockChainClient { impl<P> TestNet<P> where P: Peer {
pub fn peer(&self, i: usize) -> &TestPeer<C> { pub fn peer(&self, i: usize) -> &P {
&self.peers[i] &self.peers[i]
} }
pub fn peer_mut(&mut self, i: usize) -> &mut TestPeer<C> {
Arc::get_mut(&mut self.peers[i]).unwrap()
}
pub fn start(&mut self) { pub fn start(&mut self) {
if self.started { if self.started {
return; return;
@ -243,9 +317,7 @@ impl<C> TestNet<C> where C: FlushingBlockChainClient {
for peer in 0..self.peers.len() { for peer in 0..self.peers.len() {
for client in 0..self.peers.len() { for client in 0..self.peers.len() {
if peer != client { if peer != client {
let p = &self.peers[peer]; self.peers[peer].on_connect(client as PeerId);
p.sync.write().update_targets(&*p.chain);
p.sync.write().on_peer_connected(&mut TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(client as PeerId)), client as PeerId);
} }
} }
} }
@ -254,31 +326,22 @@ impl<C> TestNet<C> where C: FlushingBlockChainClient {
pub fn sync_step(&mut self) { pub fn sync_step(&mut self) {
for peer in 0..self.peers.len() { for peer in 0..self.peers.len() {
self.peers[peer].chain.flush(); let packet = self.peers[peer].pending_message();
let packet = self.peers[peer].queue.write().pop_front();
if let Some(packet) = packet { if let Some(packet) = packet {
let disconnecting = { let disconnecting = {
let p = &self.peers[packet.recipient]; let recipient = packet.recipient();
trace!("--- {} -> {} ---", peer, packet.recipient); trace!("--- {} -> {} ---", peer, recipient);
let to_disconnect = { let to_disconnect = self.peers[recipient].receive_message(peer as PeerId, packet);
let mut io = TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(peer as PeerId));
ChainSync::dispatch_packet(&p.sync, &mut io, peer as PeerId, packet.packet_id, &packet.data);
p.chain.flush();
io.to_disconnect.clone()
};
for d in &to_disconnect { for d in &to_disconnect {
// notify this that disconnecting peers are disconnecting // notify this that disconnecting peers are disconnecting
let mut io = TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(*d)); self.peers[recipient].on_disconnect(*d as PeerId);
p.sync.write().on_peer_aborting(&mut io, *d);
self.disconnect_events.push((peer, *d)); self.disconnect_events.push((peer, *d));
} }
to_disconnect to_disconnect
}; };
for d in &disconnecting { for d in &disconnecting {
// notify other peers that this peer is disconnecting // notify other peers that this peer is disconnecting
let p = &self.peers[*d]; self.peers[*d].on_disconnect(peer as PeerId);
let mut io = TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(peer as PeerId));
p.sync.write().on_peer_aborting(&mut io, peer as PeerId);
} }
} }
@ -287,16 +350,11 @@ impl<C> TestNet<C> where C: FlushingBlockChainClient {
} }
pub fn sync_step_peer(&mut self, peer_num: usize) { pub fn sync_step_peer(&mut self, peer_num: usize) {
let peer = self.peer(peer_num); self.peers[peer_num].sync_step();
peer.chain.flush();
peer.sync.write().maintain_peers(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None));
peer.sync.write().maintain_sync(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None));
peer.sync.write().propagate_new_transactions(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None));
} }
pub fn restart_peer(&mut self, i: usize) { pub fn restart_peer(&mut self, i: usize) {
let peer = self.peer(i); self.peers[i].restart_sync();
peer.sync.write().restart(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None));
} }
pub fn sync(&mut self) -> u32 { pub fn sync(&mut self) -> u32 {
@ -317,16 +375,25 @@ impl<C> TestNet<C> where C: FlushingBlockChainClient {
} }
pub fn done(&self) -> bool { pub fn done(&self) -> bool {
self.peers.iter().all(|p| p.queue.read().is_empty()) self.peers.iter().all(|p| p.is_done())
} }
}
impl TestNet<EthPeer<TestBlockChainClient>> {
// relies on Arc uniqueness, which is only true when we haven't registered a ChainNotify.
pub fn peer_mut(&mut self, i: usize) -> &mut EthPeer<TestBlockChainClient> {
Arc::get_mut(&mut self.peers[i]).expect("Arc never exposed externally")
}
}
impl<C: FlushingBlockChainClient> TestNet<EthPeer<C>> {
pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) { pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) {
let peer = self.peer(peer_id); let peer = &mut self.peers[peer_id];
peer.sync.write().chain_new_blocks(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None), &[], &[], &[], &[], &[], &[]); peer.sync.write().chain_new_blocks(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None), &[], &[], &[], &[], &[], &[]);
} }
} }
impl ChainNotify for TestPeer<EthcoreClient> { impl ChainNotify for EthPeer<EthcoreClient> {
fn new_blocks(&self, fn new_blocks(&self,
imported: Vec<H256>, imported: Vec<H256>,
invalid: Vec<H256>, invalid: Vec<H256>,