openethereum/sync/src/tests/helpers.rs

359 lines
9.9 KiB
Rust
Raw Normal View History

2016-12-11 19:30:54 +01:00
// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
2016-02-05 13:40:41 +01:00
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use util::*;
use network::*;
use tests::snapshot::*;
2016-12-06 19:23:15 +01:00
use ethcore::client::{TestBlockChainClient, BlockChainClient, Client as EthcoreClient, ClientConfig, ChainNotify};
2016-07-27 21:38:22 +02:00
use ethcore::header::BlockNumber;
use ethcore::snapshot::SnapshotService;
2016-12-06 19:23:15 +01:00
use ethcore::spec::Spec;
use ethcore::miner::Miner;
use ethcore::db::NUM_COLUMNS;
use sync_io::SyncIo;
2016-12-06 19:23:15 +01:00
use io::IoChannel;
use api::WARP_SYNC_PROTOCOL_ID;
use chain::ChainSync;
2016-02-24 21:23:58 +01:00
use ::SyncConfig;
2016-12-06 19:23:15 +01:00
use devtools::{self, GuardedTempResult};
2015-12-26 15:47:07 +01:00
2016-12-06 19:23:15 +01:00
pub trait FlushingBlockChainClient: BlockChainClient {
fn flush(&self) {}
}
impl FlushingBlockChainClient for EthcoreClient {
fn flush(&self) {
self.flush_queue();
}
}
impl FlushingBlockChainClient for TestBlockChainClient {}
pub struct TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
pub chain: &'p C,
pub snapshot_service: &'p TestSnapshotService,
2016-12-11 12:32:01 +01:00
pub queue: &'p RwLock<VecDeque<TestPacket>>,
2016-02-04 19:30:31 +01:00
pub sender: Option<PeerId>,
pub to_disconnect: HashSet<PeerId>,
2016-12-11 12:32:01 +01:00
pub packets: Vec<TestPacket>,
overlay: RwLock<HashMap<BlockNumber, Bytes>>,
2015-12-26 15:47:07 +01:00
}
2016-12-06 19:23:15 +01:00
impl<'p, C> TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
2016-12-11 12:32:01 +01:00
pub fn new(chain: &'p C, ss: &'p TestSnapshotService, queue: &'p RwLock<VecDeque<TestPacket>>, sender: Option<PeerId>) -> TestIo<'p, C> {
2015-12-26 15:47:07 +01:00
TestIo {
chain: chain,
snapshot_service: ss,
2015-12-26 15:47:07 +01:00
queue: queue,
sender: sender,
to_disconnect: HashSet::new(),
overlay: RwLock::new(HashMap::new()),
2016-12-11 12:32:01 +01:00
packets: Vec::new(),
2015-12-26 15:47:07 +01:00
}
}
}
2016-12-11 12:32:01 +01:00
impl<'p, C> Drop for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
fn drop(&mut self) {
self.queue.write().extend(self.packets.drain(..));
}
}
2016-12-06 19:23:15 +01:00
impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
fn disable_peer(&mut self, peer_id: PeerId) {
self.disconnect_peer(peer_id);
2015-12-26 15:47:07 +01:00
}
fn disconnect_peer(&mut self, peer_id: PeerId) {
self.to_disconnect.insert(peer_id);
2016-02-02 14:54:46 +01:00
}
2016-06-17 18:26:54 +02:00
fn is_expired(&self) -> bool {
false
}
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> {
2016-12-11 12:32:01 +01:00
self.packets.push(TestPacket {
2015-12-26 15:47:07 +01:00
data: data,
packet_id: packet_id,
recipient: self.sender.unwrap()
});
Ok(())
}
fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> {
2016-12-11 12:32:01 +01:00
self.packets.push(TestPacket {
2015-12-26 15:47:07 +01:00
data: data,
packet_id: packet_id,
recipient: peer_id,
});
Ok(())
}
fn send_protocol(&mut self, _protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> {
self.send(peer_id, packet_id, data)
}
2016-05-31 22:24:32 +02:00
fn chain(&self) -> &BlockChainClient {
2016-12-11 12:32:01 +01:00
&*self.chain
2015-12-26 15:47:07 +01:00
}
fn snapshot_service(&self) -> &SnapshotService {
self.snapshot_service
}
fn peer_session_info(&self, _peer_id: PeerId) -> Option<SessionInfo> {
None
}
fn eth_protocol_version(&self, _peer: PeerId) -> u8 {
63
}
fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8 {
2016-12-06 19:23:15 +01:00
if protocol == &WARP_SYNC_PROTOCOL_ID { 2 } else { self.eth_protocol_version(peer_id) }
}
fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> {
&self.overlay
}
2015-12-26 15:47:07 +01:00
}
2016-02-04 19:30:31 +01:00
pub struct TestPacket {
pub data: Bytes,
pub packet_id: PacketId,
pub recipient: PeerId,
2015-12-26 15:47:07 +01:00
}
2016-12-06 19:23:15 +01:00
pub struct TestPeer<C> where C: FlushingBlockChainClient {
2016-12-11 12:32:01 +01:00
pub chain: Arc<C>,
pub snapshot_service: Arc<TestSnapshotService>,
2016-06-20 17:28:48 +02:00
pub sync: RwLock<ChainSync>,
2016-12-06 19:23:15 +01:00
pub queue: RwLock<VecDeque<TestPacket>>,
2015-12-26 15:47:07 +01:00
}
2016-12-06 19:23:15 +01:00
pub struct TestNet<C> where C: FlushingBlockChainClient {
pub peers: Vec<Arc<TestPeer<C>>>,
2016-02-04 19:30:31 +01:00
pub started: bool,
2016-11-28 16:30:36 +01:00
pub disconnect_events: Vec<(PeerId, PeerId)>, //disconnected (initiated by, to)
2015-12-26 15:47:07 +01:00
}
2016-12-06 19:23:15 +01:00
impl TestNet<TestBlockChainClient> {
pub fn new(n: usize) -> TestNet<TestBlockChainClient> {
Self::new_with_config(n, SyncConfig::default())
2016-07-27 21:38:22 +02:00
}
2016-12-06 19:23:15 +01:00
pub fn new_with_fork(n: usize, fork: Option<(BlockNumber, H256)>) -> TestNet<TestBlockChainClient> {
let mut config = SyncConfig::default();
config.fork_block = fork;
Self::new_with_config(n, config)
}
2016-12-06 19:23:15 +01:00
pub fn new_with_config(n: usize, config: SyncConfig) -> TestNet<TestBlockChainClient> {
2015-12-26 15:47:07 +01:00
let mut net = TestNet {
peers: Vec::new(),
2016-02-03 21:42:30 +01:00
started: false,
2016-11-28 16:30:36 +01:00
disconnect_events: Vec::new(),
2015-12-26 15:47:07 +01:00
};
for _ in 0..n {
2016-05-16 14:41:41 +02:00
let chain = TestBlockChainClient::new();
let ss = Arc::new(TestSnapshotService::new());
let sync = ChainSync::new(config.clone(), &chain);
2016-12-06 19:23:15 +01:00
net.peers.push(Arc::new(TestPeer {
2016-06-20 17:28:48 +02:00
sync: RwLock::new(sync),
snapshot_service: ss,
2016-12-11 12:32:01 +01:00
chain: Arc::new(chain),
2016-12-06 19:23:15 +01:00
queue: RwLock::new(VecDeque::new()),
}));
2015-12-26 15:47:07 +01:00
}
net
}
2016-12-06 19:23:15 +01:00
}
impl TestNet<EthcoreClient> {
2016-12-11 12:32:01 +01:00
pub fn with_spec<F>(n: usize, config: SyncConfig, spec_factory: F) -> GuardedTempResult<TestNet<EthcoreClient>>
2016-12-06 19:23:15 +01:00
where F: Fn() -> Spec
{
let mut net = TestNet {
peers: Vec::new(),
started: false,
disconnect_events: Vec::new(),
};
let dir = devtools::RandomTempPath::new();
for _ in 0..n {
let mut client_dir = dir.as_path().clone();
client_dir.push(devtools::random_filename());
let db_config = DatabaseConfig::with_columns(NUM_COLUMNS);
let spec = spec_factory();
2016-12-11 12:32:01 +01:00
let client = EthcoreClient::new(
2016-12-06 19:23:15 +01:00
ClientConfig::default(),
&spec,
client_dir.as_path(),
Arc::new(Miner::with_spec(&spec)),
IoChannel::disconnected(),
&db_config
2016-12-11 12:32:01 +01:00
).unwrap();
2016-12-06 19:23:15 +01:00
let ss = Arc::new(TestSnapshotService::new());
2016-12-11 12:32:01 +01:00
let sync = ChainSync::new(config.clone(), &*client);
2016-12-06 19:23:15 +01:00
let peer = Arc::new(TestPeer {
sync: RwLock::new(sync),
snapshot_service: ss,
chain: client,
queue: RwLock::new(VecDeque::new()),
});
peer.chain.add_notify(peer.clone());
net.peers.push(peer);
}
GuardedTempResult::<TestNet<EthcoreClient>> {
_temp: dir,
result: Some(net)
}
}
}
2015-12-26 15:47:07 +01:00
2016-12-06 19:23:15 +01:00
impl<C> TestNet<C> where C: FlushingBlockChainClient {
pub fn peer(&self, i: usize) -> &TestPeer<C> {
2016-11-28 13:20:49 +01:00
&self.peers[i]
2015-12-27 00:48:03 +01:00
}
2016-12-06 19:23:15 +01:00
pub fn peer_mut(&mut self, i: usize) -> &mut TestPeer<C> {
Arc::get_mut(&mut self.peers[i]).unwrap()
2015-12-26 15:47:07 +01:00
}
pub fn start(&mut self) {
2016-12-13 13:23:10 +01:00
if self.started {
return;
}
2015-12-26 15:47:07 +01:00
for peer in 0..self.peers.len() {
for client in 0..self.peers.len() {
if peer != client {
2016-12-06 19:23:15 +01:00
let p = &self.peers[peer];
2016-12-11 12:32:01 +01:00
p.sync.write().update_targets(&*p.chain);
p.sync.write().on_peer_connected(&mut TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(client as PeerId)), client as PeerId);
2015-12-26 15:47:07 +01:00
}
}
}
2016-12-13 13:23:10 +01:00
self.started = true;
2015-12-26 15:47:07 +01:00
}
pub fn sync_step(&mut self) {
for peer in 0..self.peers.len() {
2016-12-12 19:44:24 +01:00
self.peers[peer].chain.flush();
2016-12-06 19:23:15 +01:00
let packet = self.peers[peer].queue.write().pop_front();
if let Some(packet) = packet {
let disconnecting = {
2016-12-06 19:23:15 +01:00
let p = &self.peers[packet.recipient];
trace!("--- {} -> {} ---", peer, packet.recipient);
let to_disconnect = {
2016-12-11 12:32:01 +01:00
let mut io = TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(peer as PeerId));
ChainSync::dispatch_packet(&p.sync, &mut io, peer as PeerId, packet.packet_id, &packet.data);
2016-12-12 19:44:24 +01:00
p.chain.flush();
2016-12-11 12:32:01 +01:00
io.to_disconnect.clone()
};
for d in &to_disconnect {
// notify this that disconnecting peers are disconnecting
2016-12-11 12:32:01 +01:00
let mut io = TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(*d));
p.sync.write().on_peer_aborting(&mut io, *d);
2016-11-28 16:30:36 +01:00
self.disconnect_events.push((peer, *d));
}
to_disconnect
};
for d in &disconnecting {
// notify other peers that this peer is disconnecting
2016-12-06 19:23:15 +01:00
let p = &self.peers[*d];
2016-12-11 12:32:01 +01:00
let mut io = TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(peer as PeerId));
p.sync.write().on_peer_aborting(&mut io, peer as PeerId);
}
2015-12-26 15:47:07 +01:00
}
self.sync_step_peer(peer);
2015-12-26 15:47:07 +01:00
}
}
2016-02-05 18:34:08 +01:00
pub fn sync_step_peer(&mut self, peer_num: usize) {
2016-12-06 19:23:15 +01:00
let peer = self.peer(peer_num);
peer.chain.flush();
2016-12-11 12:32:01 +01:00
peer.sync.write().maintain_peers(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None));
peer.sync.write().maintain_sync(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None));
peer.sync.write().propagate_new_transactions(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None));
2016-02-05 18:34:08 +01:00
}
2016-02-03 21:42:30 +01:00
pub fn restart_peer(&mut self, i: usize) {
2016-12-06 19:23:15 +01:00
let peer = self.peer(i);
2016-12-11 12:32:01 +01:00
peer.sync.write().restart(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None));
2016-02-03 21:42:30 +01:00
}
pub fn sync(&mut self) -> u32 {
2015-12-26 15:47:07 +01:00
self.start();
2016-02-03 21:42:30 +01:00
let mut total_steps = 0;
2015-12-26 15:47:07 +01:00
while !self.done() {
2016-02-03 21:42:30 +01:00
self.sync_step();
2016-05-17 10:32:05 +02:00
total_steps += 1;
2016-02-03 21:42:30 +01:00
}
total_steps
}
pub fn sync_steps(&mut self, count: usize) {
2016-12-13 13:23:10 +01:00
self.start();
2016-02-03 21:42:30 +01:00
for _ in 0..count {
self.sync_step();
2015-12-26 15:47:07 +01:00
}
}
pub fn done(&self) -> bool {
2016-12-06 19:23:15 +01:00
self.peers.iter().all(|p| p.queue.read().is_empty())
2015-12-26 15:47:07 +01:00
}
2016-02-07 01:00:43 +01:00
pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) {
2016-12-06 19:23:15 +01:00
let peer = self.peer(peer_id);
2016-12-11 12:32:01 +01:00
peer.sync.write().chain_new_blocks(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None), &[], &[], &[], &[], &[], &[]);
2016-02-07 01:00:43 +01:00
}
2015-12-26 15:47:07 +01:00
}
2016-12-06 19:23:15 +01:00
impl ChainNotify for TestPeer<EthcoreClient> {
fn new_blocks(&self,
imported: Vec<H256>,
invalid: Vec<H256>,
enacted: Vec<H256>,
retracted: Vec<H256>,
sealed: Vec<H256>,
proposed: Vec<Bytes>,
_duration: u64)
{
2016-12-11 12:32:01 +01:00
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None);
2016-12-06 19:23:15 +01:00
self.sync.write().chain_new_blocks(
&mut io,
&imported,
&invalid,
&enacted,
&retracted,
&sealed,
&proposed);
}
fn start(&self) {}
fn stop(&self) {}
fn broadcast(&self, message: Vec<u8>) {
2016-12-11 12:32:01 +01:00
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None);
2016-12-06 19:23:15 +01:00
self.sync.write().propagate_consensus_packet(&mut io, message.clone());
}
}