From 38e40f649cd630851fd21ee4f1bb47524d04b96d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jochen=20M=C3=BCller?= Date: Mon, 5 Jul 2021 17:06:35 +0200 Subject: [PATCH] Restore GetNodeData (#469) * Accept GetNodeData requests * Implement blockchain client method for node data requests * Reuse old database read methods for node data * fmt * Copy & paste old tests... * ... and make them work * fmt --- crates/db/journaldb/src/archivedb.rs | 27 +++++++ crates/db/journaldb/src/earlymergedb.rs | 7 ++ crates/db/journaldb/src/overlayrecentdb.rs | 21 +++++ crates/db/journaldb/src/refcounteddb.rs | 8 ++ crates/db/journaldb/src/traits.rs | 4 + crates/ethcore/src/client/client.rs | 4 + crates/ethcore/src/client/test_client.rs | 12 +++ crates/ethcore/src/client/traits.rs | 3 + crates/ethcore/src/tests/client.rs | 9 +++ crates/ethcore/sync/src/chain/mod.rs | 1 + crates/ethcore/sync/src/chain/supplier.rs | 83 ++++++++++++++++++++ crates/ethcore/sync/src/chain/sync_packet.rs | 11 +-- 12 files changed, 185 insertions(+), 5 deletions(-) diff --git a/crates/db/journaldb/src/archivedb.rs b/crates/db/journaldb/src/archivedb.rs index 5d4ddfe44..65fbf2593 100644 --- a/crates/db/journaldb/src/archivedb.rs +++ b/crates/db/journaldb/src/archivedb.rs @@ -25,12 +25,14 @@ use std::{ use super::{ error_key_already_exists, error_negatively_reference_hash, memory_db::*, LATEST_ERA_KEY, }; +use bytes::Bytes; use ethcore_db::{DBTransaction, DBValue, KeyValueDB}; use ethereum_types::H256; use hash_db::HashDB; use keccak_hasher::KeccakHasher; use rlp::{decode, encode}; use traits::JournalDB; +use DB_PREFIX_LEN; /// Implementation of the `HashDB` trait for a disk-backed database with a memory overlay /// and latent-removal semantics. 
@@ -214,12 +216,19 @@ impl JournalDB for ArchiveDB { fn consolidate(&mut self, with: MemoryDB) { self.overlay.consolidate(with); } + + fn state(&self, id: &H256) -> Option { + self.backing + .get_by_prefix(self.column, &id[0..DB_PREFIX_LEN]) + .map(|b| b.into_vec()) + } } #[cfg(test)] mod tests { use super::*; + use ethcore_db::InMemoryWithMetrics; use hash_db::HashDB; use keccak::keccak; use JournalDB; @@ -497,4 +506,22 @@ mod tests { assert!(jdb.get(&key).is_none()); } + + #[test] + fn returns_state() { + let shared_db = Arc::new(InMemoryWithMetrics::create(0)); + + let key = { + let mut jdb = ArchiveDB::new(shared_db.clone(), None); + let key = jdb.insert(b"foo"); + jdb.commit_batch(0, &keccak(b"0"), None).unwrap(); + key + }; + + { + let jdb = ArchiveDB::new(shared_db, None); + let state = jdb.state(&key); + assert!(state.is_some()); + } + } } diff --git a/crates/db/journaldb/src/earlymergedb.rs b/crates/db/journaldb/src/earlymergedb.rs index 4455ff43e..80b6e07a4 100644 --- a/crates/db/journaldb/src/earlymergedb.rs +++ b/crates/db/journaldb/src/earlymergedb.rs @@ -35,6 +35,7 @@ use parity_util_mem::MallocSizeOf; use parking_lot::RwLock; use rlp::{decode, encode}; use util::{DatabaseKey, DatabaseValueRef, DatabaseValueView}; +use DB_PREFIX_LEN; #[derive(Debug, Clone, PartialEq, Eq, MallocSizeOf)] struct RefInfo { @@ -608,6 +609,12 @@ impl JournalDB for EarlyMergeDB { fn consolidate(&mut self, with: MemoryDB) { self.overlay.consolidate(with); } + + fn state(&self, id: &H256) -> Option { + self.backing + .get_by_prefix(self.column, &id[0..DB_PREFIX_LEN]) + .map(|b| b.into_vec()) + } } #[cfg(test)] diff --git a/crates/db/journaldb/src/overlayrecentdb.rs b/crates/db/journaldb/src/overlayrecentdb.rs index cd7eb171a..60c88957c 100644 --- a/crates/db/journaldb/src/overlayrecentdb.rs +++ b/crates/db/journaldb/src/overlayrecentdb.rs @@ -23,6 +23,7 @@ use std::{ }; use super::{error_negatively_reference_hash, JournalDB, DB_PREFIX_LEN, LATEST_ERA_KEY}; +use bytes::Bytes; 
use ethcore_db::{DBTransaction, DBValue, KeyValueDB}; use ethereum_types::H256; use fastmap::H256FastMap; @@ -509,6 +510,26 @@ impl JournalDB for OverlayRecentDB { fn consolidate(&mut self, with: MemoryDB) { self.transaction_overlay.consolidate(with); } + + fn state(&self, key: &H256) -> Option { + let journal_overlay = self.journal_overlay.read(); + let key = to_short_key(key); + journal_overlay + .backing_overlay + .get(&key) + .map(|v| v.into_vec()) + .or_else(|| { + journal_overlay + .pending_overlay + .get(&key) + .map(|d| d.clone().into_vec()) + }) + .or_else(|| { + self.backing + .get_by_prefix(self.column, &key[0..DB_PREFIX_LEN]) + .map(|b| b.into_vec()) + }) + } } impl HashDB for OverlayRecentDB { diff --git a/crates/db/journaldb/src/refcounteddb.rs b/crates/db/journaldb/src/refcounteddb.rs index 15b353c6e..3ba75e717 100644 --- a/crates/db/journaldb/src/refcounteddb.rs +++ b/crates/db/journaldb/src/refcounteddb.rs @@ -23,6 +23,7 @@ use std::{ }; use super::{traits::JournalDB, LATEST_ERA_KEY}; +use bytes::Bytes; use ethcore_db::{DBTransaction, DBValue, KeyValueDB}; use ethereum_types::H256; use hash_db::HashDB; @@ -32,6 +33,7 @@ use overlaydb::OverlayDB; use parity_util_mem::{allocators::new_malloc_size_ops, MallocSizeOf}; use rlp::{decode, encode}; use util::{DatabaseKey, DatabaseValueRef, DatabaseValueView}; +use DB_PREFIX_LEN; /// Implementation of the `HashDB` trait for a disk-backed database with a memory overlay /// and latent-removal semantics. 
@@ -245,6 +247,12 @@ impl JournalDB for RefCountedDB { } } } + + fn state(&self, id: &H256) -> Option { + self.backing + .get_by_prefix(self.column, &id[0..DB_PREFIX_LEN]) + .map(|b| b.into_vec()) + } } #[cfg(test)] diff --git a/crates/db/journaldb/src/traits.rs b/crates/db/journaldb/src/traits.rs index ecd2d556e..7e8edd815 100644 --- a/crates/db/journaldb/src/traits.rs +++ b/crates/db/journaldb/src/traits.rs @@ -18,6 +18,7 @@ use std::{io, sync::Arc}; +use bytes::Bytes; use ethcore_db::{DBTransaction, DBValue, KeyValueDB}; use ethereum_types::H256; use hash_db::{AsHashDB, HashDB}; @@ -95,6 +96,9 @@ pub trait JournalDB: KeyedHashDB { /// Consolidate all the insertions and deletions in the given memory overlay. fn consolidate(&mut self, overlay: ::memory_db::MemoryDB); + /// State data query + fn state(&self, id: &H256) -> Option; + /// Commit all changes in a single batch #[cfg(test)] fn commit_batch(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> io::Result { diff --git a/crates/ethcore/src/client/client.rs b/crates/ethcore/src/client/client.rs index b1210a27f..7a64fc96f 100644 --- a/crates/ethcore/src/client/client.rs +++ b/crates/ethcore/src/client/client.rs @@ -2776,6 +2776,10 @@ impl BlockChainClient for Client { fn registrar_address(&self) -> Option
{ self.registrar_address.clone() } + + fn state_data(&self, hash: &H256) -> Option { + self.state_db.read().journal_db().state(hash) + } } impl IoClient for Client { diff --git a/crates/ethcore/src/client/test_client.rs b/crates/ethcore/src/client/test_client.rs index 6924e1ca7..fdf7bd427 100644 --- a/crates/ethcore/src/client/test_client.rs +++ b/crates/ethcore/src/client/test_client.rs @@ -1078,6 +1078,18 @@ impl BlockChainClient for TestBlockChainClient { fn registrar_address(&self) -> Option
{ None } + + fn state_data(&self, hash: &H256) -> Option { + let begins_with_f = + H256::from_str("f000000000000000000000000000000000000000000000000000000000000000") + .unwrap(); + if *hash > begins_with_f { + let mut rlp = RlpStream::new(); + rlp.append(&hash.clone()); + return Some(rlp.out()); + } + None + } } impl IoClient for TestBlockChainClient { diff --git a/crates/ethcore/src/client/traits.rs b/crates/ethcore/src/client/traits.rs index b9e7573a1..6d778c0ae 100644 --- a/crates/ethcore/src/client/traits.rs +++ b/crates/ethcore/src/client/traits.rs @@ -333,6 +333,9 @@ pub trait BlockChainClient: /// Get all possible uncle hashes for a block. fn find_uncles(&self, hash: &H256) -> Option>; + /// Get latest state node + fn state_data(&self, hash: &H256) -> Option; + /// Get block receipts data by block header hash. fn block_receipts(&self, hash: &H256) -> Option; diff --git a/crates/ethcore/src/tests/client.rs b/crates/ethcore/src/tests/client.rs index 777cec7ae..ac9554235 100644 --- a/crates/ethcore/src/tests/client.rs +++ b/crates/ethcore/src/tests/client.rs @@ -591,3 +591,12 @@ fn import_export_binary() { assert!(client.block_header(BlockId::Number(17)).is_some()); assert!(client.block_header(BlockId::Number(16)).is_some()); } + +#[test] +fn returns_state_root_basic() { + let client = generate_dummy_client(6); + let test_spec = Spec::new_test(); + let genesis_header = test_spec.genesis_header(); + + assert!(client.state_data(genesis_header.state_root()).is_some()); +} diff --git a/crates/ethcore/sync/src/chain/mod.rs b/crates/ethcore/sync/src/chain/mod.rs index 8e0cecb7a..e7ac93324 100644 --- a/crates/ethcore/sync/src/chain/mod.rs +++ b/crates/ethcore/sync/src/chain/mod.rs @@ -169,6 +169,7 @@ pub const PAR_PROTOCOL_VERSION_2: (u8, u8) = (2, 0x16); pub const MAX_BODIES_TO_SEND: usize = 256; pub const MAX_HEADERS_TO_SEND: usize = 512; +pub const MAX_NODE_DATA_TO_SEND: usize = 1024; pub const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256; pub const 
MAX_TRANSACTIONS_TO_REQUEST: usize = 256; const MIN_PEERS_PROPAGATION: usize = 4; diff --git a/crates/ethcore/sync/src/chain/supplier.rs b/crates/ethcore/sync/src/chain/supplier.rs index 5203b8e68..6349a351b 100644 --- a/crates/ethcore/sync/src/chain/supplier.rs +++ b/crates/ethcore/sync/src/chain/supplier.rs @@ -40,6 +40,7 @@ use super::{ ChainSync, PacketProcessError, RlpResponseResult, SyncHandler, MAX_BODIES_TO_SEND, MAX_HEADERS_TO_SEND, MAX_RECEIPTS_HEADERS_TO_SEND, }; +use chain::MAX_NODE_DATA_TO_SEND; use std::borrow::Borrow; /// The Chain Sync Supplier: answers requests from peers with available data @@ -88,6 +89,15 @@ impl SyncSupplier { |e| format!("Error sending block headers: {:?}", e), ), + GetNodeDataPacket => SyncSupplier::return_rlp( + io, + &rlp, + peer, + request_id, + SyncSupplier::return_node_data, + |e| format!("Error sending node data: {:?}", e), + ), + GetReceiptsPacket => SyncSupplier::return_rlp( io, &rlp, @@ -340,6 +350,32 @@ impl SyncSupplier { Ok(Some((BlockBodiesPacket, rlp))) } + fn return_node_data(io: &dyn SyncIo, rlp: &Rlp, peer_id: PeerId) -> RlpResponseResult { + let count = cmp::min(rlp.item_count().unwrap_or(0), MAX_NODE_DATA_TO_SEND); + if count == 0 { + debug!(target: "sync", "Empty GetNodeData request, ignoring."); + return Ok(None); + } + + let mut data = Bytes::new(); + + let mut added = 0usize; + for i in 0..count { + if let Some(ref mut node_data) = io.chain().state_data(&rlp.val_at::(i)?) 
{ + data.append(node_data); + added += 1; + if data.len() > PAYLOAD_SOFT_LIMIT { + break; + } + } + } + + let mut rlp = RlpStream::new_list(added); + rlp.append_raw(&data, added); + trace!(target: "sync", "{} -> GetNodeData: returned {} entries", peer_id, added); + Ok(Some((NodeDataPacket, rlp))) + } + fn return_receipts(io: &dyn SyncIo, rlp: &Rlp, peer_id: PeerId) -> RlpResponseResult { let mut count = rlp.item_count().unwrap_or(0); trace!(target: "sync", "{} -> GetReceipts: {} entries", peer_id, count); @@ -723,4 +759,51 @@ mod test { ); assert_eq!(1, io.packets.len()); } + + #[test] + fn return_nodes() { + let mut client = TestBlockChainClient::new(); + let queue = RwLock::new(VecDeque::new()); + let sync = dummy_sync_with_peer(H256::zero(), &client); + let ss = TestSnapshotService::new(); + let mut io = TestIo::new(&mut client, &ss, &queue, None); + + let mut node_list = RlpStream::new_list(3); + node_list.append( + &H256::from_str("0000000000000000000000000000000000000000000000005555555555555555") + .unwrap(), + ); + node_list.append( + &H256::from_str("ffffffffffffffffffffffffffffffffffffffffffffaaaaaaaaaaaaaaaaaaaa") + .unwrap(), + ); + node_list.append( + &H256::from_str("aff0000000000000000000000000000000000000000000000000000000000000") + .unwrap(), + ); + + let node_request = node_list.out(); + // it returns rlp ONLY for hashes starting with "f" + let result = SyncSupplier::return_node_data(&io, &Rlp::new(&node_request.clone()), 0); + + assert!(result.is_ok()); + let rlp_result = result.unwrap(); + assert!(rlp_result.is_some()); + + // the length of one rlp-encoded hash + let rlp = rlp_result.unwrap().1.out(); + let rlp = Rlp::new(&rlp); + assert_eq!(Ok(1), rlp.item_count()); + + io.sender = Some(2usize); + + SyncSupplier::dispatch_packet( + &RwLock::new(sync), + &mut io, + 0usize, + GetNodeDataPacket.id(), + &node_request, + ); + assert_eq!(1, io.packets.len()); + } } diff --git a/crates/ethcore/sync/src/chain/sync_packet.rs 
b/crates/ethcore/sync/src/chain/sync_packet.rs index e25dd7b50..f8964040d 100644 --- a/crates/ethcore/sync/src/chain/sync_packet.rs +++ b/crates/ethcore/sync/src/chain/sync_packet.rs @@ -48,8 +48,8 @@ pub enum SyncPacket { GetPooledTransactionsPacket = 0x09, PooledTransactionsPacket = 0x0a, - //GetNodeDataPacket = 0x0d, - //NodeDataPacket = 0x0e, + GetNodeDataPacket = 0x0d, + NodeDataPacket = 0x0e, GetReceiptsPacket = 0x0f, ReceiptsPacket = 0x10, @@ -87,8 +87,8 @@ impl PacketInfo for SyncPacket { | NewPooledTransactionHashesPacket | GetPooledTransactionsPacket | PooledTransactionsPacket - //| GetNodeDataPacket - //| NodeDataPacket + | GetNodeDataPacket + | NodeDataPacket | GetReceiptsPacket | ReceiptsPacket => ETH_PROTOCOL, @@ -105,7 +105,6 @@ impl PacketInfo for SyncPacket { } fn has_request_id_in_eth_66(&self) -> bool { - // Note: NodeDataPacket and GetNodeDataPacket also get a request id in eth-66. match self { GetBlockHeadersPacket | BlockHeadersPacket @@ -113,6 +112,8 @@ impl PacketInfo for SyncPacket { | BlockBodiesPacket | GetPooledTransactionsPacket | PooledTransactionsPacket + | GetNodeDataPacket + | NodeDataPacket | GetReceiptsPacket | ReceiptsPacket => true, _ => false,