Implement eth/64, remove eth/62 (#46)

Co-authored-by: Artem Vorotnikov <artem@vorotnikov.me>
This commit is contained in:
adria0.eth
2020-09-21 14:48:14 +02:00
committed by GitHub
parent b54ddd027d
commit 1c82a0733f
18 changed files with 2726 additions and 2202 deletions

View File

@@ -22,7 +22,7 @@ use network::{
NonReservedPeerMode, PeerId, ProtocolId,
};
use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, BTreeSet, HashMap},
io,
ops::RangeInclusive,
sync::{atomic, mpsc, Arc},
@@ -30,8 +30,9 @@ use std::{
};
use chain::{
ChainSyncApi, SyncState, SyncStatus as EthSyncStatus, ETH_PROTOCOL_VERSION_62,
ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2,
fork_filter::ForkFilterApi, ChainSyncApi, SyncState, SyncStatus as EthSyncStatus,
ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, PAR_PROTOCOL_VERSION_1,
PAR_PROTOCOL_VERSION_2,
};
use ethcore::{
client::{BlockChainClient, ChainMessageType, ChainNotify, NewBlocks},
@@ -215,6 +216,8 @@ pub struct Params {
pub config: SyncConfig,
/// Blockchain client.
pub chain: Arc<dyn BlockChainClient>,
/// Forks.
pub forks: BTreeSet<BlockNumber>,
/// Snapshot service.
pub snapshot_service: Arc<dyn SnapshotService>,
/// Network layer configuration.
@@ -240,7 +243,14 @@ impl EthSync {
connection_filter: Option<Arc<dyn ConnectionFilter>>,
) -> Result<Arc<EthSync>, Error> {
let (priority_tasks_tx, priority_tasks_rx) = mpsc::channel();
let sync = ChainSyncApi::new(params.config, &*params.chain, priority_tasks_rx);
let fork_filter = ForkFilterApi::new(&*params.chain, params.forks);
let sync = ChainSyncApi::new(
params.config,
&*params.chain,
fork_filter,
priority_tasks_rx,
);
let service = NetworkService::new(
params.network_config.clone().into_basic()?,
connection_filter,
@@ -559,7 +569,7 @@ impl ChainNotify for EthSync {
.register_protocol(
self.eth_handler.clone(),
self.subprotocol_name,
&[ETH_PROTOCOL_VERSION_62, ETH_PROTOCOL_VERSION_63],
&[ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64],
)
.unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e));
// register the warp sync subprotocol

View File

@@ -99,6 +99,12 @@ pub enum BlockDownloaderImportError {
Useless,
}
impl From<rlp04::DecoderError> for BlockDownloaderImportError {
fn from(_: rlp04::DecoderError) -> BlockDownloaderImportError {
BlockDownloaderImportError::Invalid
}
}
impl From<rlp::DecoderError> for BlockDownloaderImportError {
fn from(_: rlp::DecoderError) -> BlockDownloaderImportError {
BlockDownloaderImportError::Invalid

View File

@@ -0,0 +1,114 @@
//! This module contains a wrapper that connects this codebase with `ethereum-forkid` crate which provides `FORK_ID`
//! to support Ethereum network protocol, version 64 and above.
// Re-export ethereum-forkid crate contents here.
pub use ethereum_forkid::{BlockNumber, ForkId, RejectReason};
use ethcore::client::ChainInfo;
use ethereum_forkid::ForkFilter;
/// Wrapper around fork filter that provides integration with `ForkFilter`.
pub struct ForkFilterApi {
inner: ForkFilter,
}
impl ForkFilterApi {
/// Create `ForkFilterApi` from `ChainInfo` and an `Iterator` over the hard forks.
pub fn new<C: ?Sized + ChainInfo, I: IntoIterator<Item = BlockNumber>>(
client: &C,
forks: I,
) -> Self {
let chain_info = client.chain_info();
let genesis_hash = primitive_types07::H256::from_slice(&chain_info.genesis_hash.0);
Self {
inner: ForkFilter::new(chain_info.best_block_number, genesis_hash, forks),
}
}
#[cfg(test)]
/// Dummy version of ForkFilterApi with no forks.
pub fn new_dummy<C: ?Sized + ChainInfo>(client: &C) -> Self {
let chain_info = client.chain_info();
Self {
inner: ForkFilter::new(
chain_info.best_block_number,
primitive_types07::H256::from_slice(&chain_info.genesis_hash.0),
vec![],
),
}
}
fn update_head<C: ?Sized + ChainInfo>(&mut self, client: &C) {
self.inner.set_head(client.chain_info().best_block_number);
}
/// Wrapper for `ForkFilter::current`
pub fn current<C: ?Sized + ChainInfo>(&mut self, client: &C) -> ForkId {
self.update_head(client);
self.inner.current()
}
/// Wrapper for `ForkFilter::is_compatible`
pub fn is_compatible<C: ?Sized + ChainInfo>(
&mut self,
client: &C,
fork_id: ForkId,
) -> Result<(), RejectReason> {
self.update_head(client);
self.inner.is_compatible(fork_id)
}
}
#[cfg(test)]
mod tests {
use super::*;
use ethcore::{client::TestBlockChainClient, ethereum, spec::Spec};
fn test_spec<F: Fn() -> Spec>(spec_builder: F, forks: Vec<BlockNumber>) {
let spec = (spec_builder)();
let genesis_hash = spec.genesis_header().hash();
let spec_forks = spec.hard_forks.clone();
let client = TestBlockChainClient::new_with_spec(spec);
assert_eq!(
ForkFilterApi::new(&client, spec_forks).inner,
ForkFilter::new(
0,
primitive_types07::H256::from_slice(&genesis_hash.0),
forks
)
);
}
#[test]
fn ethereum_spec() {
test_spec(
|| ethereum::new_foundation(&String::new()),
vec![
1_150_000, 1_920_000, 2_463_000, 2_675_000, 4_370_000, 7_280_000, 9_069_000,
9_200_000,
],
)
}
#[test]
fn ropsten_spec() {
test_spec(
|| ethereum::new_ropsten(&String::new()),
vec![10, 1_700_000, 4_230_000, 4_939_394, 6_485_846, 7_117_117],
)
}
#[test]
fn rinkeby_spec() {
test_spec(
|| ethereum::new_rinkeby(&String::new()),
vec![1, 2, 3, 1_035_301, 3_660_663, 4_321_234, 5_435_345],
)
}
#[test]
fn goerli_spec() {
test_spec(|| ethereum::new_goerli(&String::new()), vec![1_561_651])
}
}

View File

@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
use api::PAR_PROTOCOL;
use api::{ETH_PROTOCOL, PAR_PROTOCOL};
use block_sync::{BlockDownloaderImportError as DownloaderImportError, DownloadAction};
use bytes::Bytes;
use enum_primitive::FromPrimitive;
@@ -42,7 +42,7 @@ use super::sync_packet::{
use super::{
BlockSet, ChainSync, ForkConfirmation, PacketProcessError, PeerAsking, PeerInfo, SyncRequester,
SyncState, ETH_PROTOCOL_VERSION_62, ETH_PROTOCOL_VERSION_63, MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES,
SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES,
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2,
};
@@ -659,16 +659,72 @@ impl SyncHandler {
peer_id: PeerId,
r: &Rlp,
) -> Result<(), DownloaderImportError> {
let mut r_iter = r.iter();
sync.handshaking_peers.remove(&peer_id);
let protocol_version: u8 = r.val_at(0)?;
let protocol_version: u8 = r_iter
.next()
.ok_or(rlp::DecoderError::RlpIsTooShort)?
.as_val()?;
let eth_protocol_version = io.protocol_version(&ETH_PROTOCOL, peer_id);
let warp_protocol_version = io.protocol_version(&PAR_PROTOCOL, peer_id);
let warp_protocol = warp_protocol_version != 0;
let network_id = r_iter
.next()
.ok_or(rlp::DecoderError::RlpIsTooShort)?
.as_val()?;
let difficulty = Some(
r_iter
.next()
.ok_or(rlp::DecoderError::RlpIsTooShort)?
.as_val()?,
);
let latest_hash = r_iter
.next()
.ok_or(rlp::DecoderError::RlpIsTooShort)?
.as_val()?;
let genesis = r_iter
.next()
.ok_or(rlp::DecoderError::RlpIsTooShort)?
.as_val()?;
let forkid_validation_error = {
if eth_protocol_version >= ETH_PROTOCOL_VERSION_64.0 {
let fork_id = rlp04::Rlp::new(r.as_raw()).val_at(5)?;
r_iter.next().ok_or(rlp::DecoderError::RlpIsTooShort)?;
sync.fork_filter
.is_compatible(io.chain(), fork_id)
.err()
.map(|e| (fork_id, e))
} else {
None
}
};
let snapshot_hash = if warp_protocol {
Some(
r_iter
.next()
.ok_or(rlp::DecoderError::RlpIsTooShort)?
.as_val()?,
)
} else {
None
};
let snapshot_number = if warp_protocol {
Some(
r_iter
.next()
.ok_or(rlp::DecoderError::RlpIsTooShort)?
.as_val()?,
)
} else {
None
};
let peer = PeerInfo {
protocol_version: protocol_version,
network_id: r.val_at(1)?,
difficulty: Some(r.val_at(2)?),
latest_hash: r.val_at(3)?,
genesis: r.val_at(4)?,
protocol_version,
network_id,
difficulty,
latest_hash,
genesis,
asking: PeerAsking::Nothing,
asking_blocks: Vec::new(),
asking_hash: None,
@@ -681,16 +737,8 @@ impl SyncHandler {
ForkConfirmation::Unconfirmed
},
asking_snapshot_data: None,
snapshot_hash: if warp_protocol {
Some(r.val_at(5)?)
} else {
None
},
snapshot_number: if warp_protocol {
Some(r.val_at(6)?)
} else {
None
},
snapshot_hash,
snapshot_number,
block_set: None,
client_version: ClientVersion::from(io.peer_version(peer_id)),
};
@@ -729,13 +777,18 @@ impl SyncHandler {
return Err(DownloaderImportError::Invalid);
}
if let Some((fork_id, reason)) = forkid_validation_error {
trace!(target: "sync", "Peer {} incompatible fork id (fork id: {:#x}/{}, error: {:?})", peer_id, fork_id.hash.0, fork_id.next, reason);
return Err(DownloaderImportError::Invalid);
}
if false
|| (warp_protocol
&& (peer.protocol_version < PAR_PROTOCOL_VERSION_1.0
|| peer.protocol_version > PAR_PROTOCOL_VERSION_2.0))
|| (!warp_protocol
&& (peer.protocol_version < ETH_PROTOCOL_VERSION_62.0
|| peer.protocol_version > ETH_PROTOCOL_VERSION_63.0))
&& (peer.protocol_version < ETH_PROTOCOL_VERSION_63.0
|| peer.protocol_version > ETH_PROTOCOL_VERSION_64.0))
{
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version);
return Err(DownloaderImportError::Invalid);

View File

@@ -16,7 +16,7 @@
//! `BlockChain` synchronization strategy.
//! Syncs to peers and keeps up to date.
//! This implementation uses ethereum protocol v63
//! This implementation uses ethereum protocol v63/v64
//!
//! Syncing strategy summary.
//! Split the chain into ranges of N blocks each. Download ranges sequentially. Split each range into subchains of M blocks. Download subchains in parallel.
@@ -87,14 +87,16 @@
//!
//! All other messages are ignored.
pub mod fork_filter;
mod handler;
mod propagator;
mod requester;
mod supplier;
pub mod sync_packet;
pub use self::fork_filter::ForkFilterApi;
use super::{SyncConfig, WarpSync};
use api::{EthProtocolInfo as PeerInfoDigest, PriorityTask, PAR_PROTOCOL};
use api::{EthProtocolInfo as PeerInfoDigest, PriorityTask, ETH_PROTOCOL, PAR_PROTOCOL};
use block_sync::{BlockDownloader, DownloadAction};
use bytes::Bytes;
use derive_more::Display;
@@ -151,10 +153,10 @@ impl From<DecoderError> for PacketProcessError {
}
}
/// 64 version of Ethereum protocol.
pub const ETH_PROTOCOL_VERSION_64: (u8, u8) = (64, 0x11);
/// 63 version of Ethereum protocol.
pub const ETH_PROTOCOL_VERSION_63: (u8, u8) = (63, 0x11);
/// 62 version of Ethereum protocol.
pub const ETH_PROTOCOL_VERSION_62: (u8, u8) = (62, 0x11);
/// 1 version of Parity protocol and the packet count.
pub const PAR_PROTOCOL_VERSION_1: (u8, u8) = (1, 0x15);
/// 2 version of Parity protocol (consensus messages added).
@@ -407,10 +409,11 @@ impl ChainSyncApi {
pub fn new(
config: SyncConfig,
chain: &dyn BlockChainClient,
fork_filter: ForkFilterApi,
priority_tasks: mpsc::Receiver<PriorityTask>,
) -> Self {
ChainSyncApi {
sync: RwLock::new(ChainSync::new(config, chain)),
sync: RwLock::new(ChainSync::new(config, chain, fork_filter)),
priority_tasks: Mutex::new(priority_tasks),
}
}
@@ -660,6 +663,8 @@ pub struct ChainSync {
network_id: u64,
/// Optional fork block to check
fork_block: Option<(BlockNumber, H256)>,
/// Fork filter
fork_filter: ForkFilterApi,
/// Snapshot downloader.
snapshot: Snapshot,
/// Connected peers pending Status message.
@@ -680,8 +685,11 @@ pub struct ChainSync {
}
impl ChainSync {
/// Create a new instance of syncing strategy.
pub fn new(config: SyncConfig, chain: &dyn BlockChainClient) -> Self {
pub fn new(
config: SyncConfig,
chain: &dyn BlockChainClient,
fork_filter: ForkFilterApi,
) -> Self {
let chain_info = chain.chain_info();
let best_block = chain.chain_info().best_block_number;
let state = Self::get_init_state(config.warp_sync, chain);
@@ -704,6 +712,7 @@ impl ChainSync {
last_sent_block_number: 0,
network_id: config.network_id,
fork_block: config.fork_block,
fork_filter,
download_old_blocks: config.download_old_blocks,
snapshot: Snapshot::new(),
sync_start_time: None,
@@ -725,7 +734,7 @@ impl ChainSync {
SyncStatus {
state: self.state.clone(),
protocol_version: ETH_PROTOCOL_VERSION_63.0,
protocol_version: ETH_PROTOCOL_VERSION_64.0,
network_id: self.network_id,
start_block_number: self.starting_block,
last_imported_block_number: Some(last_imported_number),
@@ -1255,30 +1264,35 @@ impl ChainSync {
/// Send Status message
fn send_status(&mut self, io: &mut dyn SyncIo, peer: PeerId) -> Result<(), network::Error> {
let eth_protocol_version = io.protocol_version(&ETH_PROTOCOL, peer);
let warp_protocol_version = io.protocol_version(&PAR_PROTOCOL, peer);
let warp_protocol = warp_protocol_version != 0;
let protocol = if warp_protocol {
warp_protocol_version
} else {
ETH_PROTOCOL_VERSION_63.0
eth_protocol_version
};
trace!(target: "sync", "Sending status to {}, protocol version {}", peer, protocol);
let mut packet = RlpStream::new();
let mut packet = rlp04::RlpStream::new();
packet.begin_unbounded_list();
let chain = io.chain().chain_info();
packet.append(&(protocol as u32));
packet.append(&self.network_id);
packet.append(&chain.total_difficulty);
packet.append(&chain.best_block_hash);
packet.append(&chain.genesis_hash);
packet.append(&primitive_types07::U256(chain.total_difficulty.0));
packet.append(&primitive_types07::H256(chain.best_block_hash.0));
packet.append(&primitive_types07::H256(chain.genesis_hash.0));
if eth_protocol_version >= ETH_PROTOCOL_VERSION_64.0 {
packet.append(&self.fork_filter.current(io.chain()));
}
if warp_protocol {
let manifest = io.snapshot_service().manifest();
let block_number = manifest.as_ref().map_or(0, |m| m.block_number);
let manifest_hash = manifest.map_or(H256::new(), |m| keccak(m.into_rlp()));
packet.append(&manifest_hash);
packet.append(&primitive_types07::H256(manifest_hash.0));
packet.append(&block_number);
}
packet.complete_unbounded_list();
packet.finalize_unbounded_list();
io.respond(StatusPacket.id(), packet.out())
}
@@ -1577,7 +1591,11 @@ pub mod tests {
peer_latest_hash: H256,
client: &dyn BlockChainClient,
) -> ChainSync {
let mut sync = ChainSync::new(SyncConfig::default(), client);
let mut sync = ChainSync::new(
SyncConfig::default(),
client,
ForkFilterApi::new_dummy(client),
);
insert_dummy_peer(&mut sync, 0, peer_latest_hash);
sync
}

View File

@@ -450,7 +450,11 @@ mod tests {
client.add_blocks(2, EachBlockWith::Uncle);
let queue = RwLock::new(VecDeque::new());
let block = client.block(BlockId::Latest).unwrap().into_inner();
let mut sync = ChainSync::new(SyncConfig::default(), &client);
let mut sync = ChainSync::new(
SyncConfig::default(),
&client,
ForkFilterApi::new_dummy(&client),
);
sync.peers.insert(
0,
PeerInfo {
@@ -540,7 +544,11 @@ mod tests {
client.add_blocks(100, EachBlockWith::Uncle);
client.insert_transaction_to_queue();
// Sync with no peers
let mut sync = ChainSync::new(SyncConfig::default(), &client);
let mut sync = ChainSync::new(
SyncConfig::default(),
&client,
ForkFilterApi::new_dummy(&client),
);
let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None);
@@ -617,7 +625,11 @@ mod tests {
let mut client = TestBlockChainClient::new();
client.insert_transaction_with_gas_price_to_queue(U256::zero());
let block_hash = client.block_hash_delta_minus(1);
let mut sync = ChainSync::new(SyncConfig::default(), &client);
let mut sync = ChainSync::new(
SyncConfig::default(),
&client,
ForkFilterApi::new_dummy(&client),
);
let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None);
@@ -655,7 +667,11 @@ mod tests {
let tx1_hash = client.insert_transaction_to_queue();
let tx2_hash = client.insert_transaction_with_gas_price_to_queue(U256::zero());
let block_hash = client.block_hash_delta_minus(1);
let mut sync = ChainSync::new(SyncConfig::default(), &client);
let mut sync = ChainSync::new(
SyncConfig::default(),
&client,
ForkFilterApi::new_dummy(&client),
);
let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None);

View File

@@ -27,6 +27,7 @@ extern crate ethcore;
extern crate ethcore_io as io;
extern crate ethcore_network as network;
extern crate ethcore_network_devp2p as devp2p;
extern crate ethereum_forkid;
extern crate ethereum_types;
extern crate ethkey;
extern crate ethstore;
@@ -34,8 +35,10 @@ extern crate fastmap;
extern crate keccak_hash as hash;
extern crate parity_bytes as bytes;
extern crate parking_lot;
extern crate primitive_types07;
extern crate rand;
extern crate rlp;
extern crate rlp04;
extern crate stats;
extern crate triehash_ethereum;

View File

@@ -18,7 +18,7 @@ use api::PAR_PROTOCOL;
use bytes::Bytes;
use chain::{
sync_packet::{PacketInfo, SyncPacket},
ChainSync, SyncSupplier, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_2,
ChainSync, ForkFilterApi, SyncSupplier, ETH_PROTOCOL_VERSION_64, PAR_PROTOCOL_VERSION_2,
};
use ethcore::{
client::{
@@ -30,6 +30,7 @@ use ethcore::{
spec::Spec,
test_helpers,
};
use ethereum_types::H256;
use io::{IoChannel, IoContext, IoHandler};
use network::{self, client_version::ClientVersion, PacketId, PeerId, ProtocolId, SessionInfo};
@@ -168,7 +169,7 @@ where
}
fn eth_protocol_version(&self, _peer: PeerId) -> u8 {
ETH_PROTOCOL_VERSION_63.0
ETH_PROTOCOL_VERSION_64.0
}
fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8 {
@@ -405,7 +406,7 @@ impl TestNet<EthPeer<TestBlockChainClient>> {
for _ in 0..n {
let chain = TestBlockChainClient::new();
let ss = Arc::new(TestSnapshotService::new());
let sync = ChainSync::new(config.clone(), &chain);
let sync = ChainSync::new(config.clone(), &chain, ForkFilterApi::new_dummy(&chain));
net.peers.push(Arc::new(EthPeer {
sync: RwLock::new(sync),
snapshot_service: ss,
@@ -454,7 +455,7 @@ impl TestNet<EthPeer<EthcoreClient>> {
.unwrap();
let ss = Arc::new(TestSnapshotService::new());
let sync = ChainSync::new(config, &*client);
let sync = ChainSync::new(config, &*client, ForkFilterApi::new_dummy(&*client));
let peer = Arc::new(EthPeer {
sync: RwLock::new(sync),
snapshot_service: ss,