Restore GetNodeData (#469)

* Accept GetNodeData requests

* Implement blockchain client method for node data requests

* Reuse old database read methods for node data

* fmt

* Copy & paste old tests...

* ... and make them work

* fmt
Jochen Müller, 2021-07-05 17:06:35 +02:00 (committed by GitHub)
parent 43ee520904
commit 38e40f649c
12 changed files with 185 additions and 5 deletions
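
For context: GetNodeData (0x0d) asks a peer for the raw trie nodes behind a list of hashes, and NodeData (0x0e) carries those nodes back. Under eth/66 both packets additionally carry a request id, which is why has_request_id_in_eth_66 below now matches them. As a minimal sketch (not part of this commit; request_id and the hashes are hypothetical inputs), a request body can be built with the rlp crate like this:

use ethereum_types::H256;
use rlp::RlpStream;

// Sketch: an eth/66 GetNodeData body is [request-id, [hash_0, hash_1, ...]].
// A real peer would draw the hashes from its state-sync queue.
fn encode_get_node_data(request_id: u64, hashes: &[H256]) -> Vec<u8> {
    let mut s = RlpStream::new_list(2);
    s.append(&request_id);
    s.begin_list(hashes.len());
    for hash in hashes {
        s.append(hash);
    }
    s.out()
}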


@@ -25,12 +25,14 @@ use std::{
 use super::{
     error_key_already_exists, error_negatively_reference_hash, memory_db::*, LATEST_ERA_KEY,
 };
+use bytes::Bytes;
 use ethcore_db::{DBTransaction, DBValue, KeyValueDB};
 use ethereum_types::H256;
 use hash_db::HashDB;
 use keccak_hasher::KeccakHasher;
 use rlp::{decode, encode};
 use traits::JournalDB;
+use DB_PREFIX_LEN;
 
 /// Implementation of the `HashDB` trait for a disk-backed database with a memory overlay
 /// and latent-removal semantics.
@@ -214,12 +216,19 @@ impl JournalDB for ArchiveDB {
     fn consolidate(&mut self, with: MemoryDB<KeccakHasher, DBValue>) {
         self.overlay.consolidate(with);
     }
+
+    fn state(&self, id: &H256) -> Option<Bytes> {
+        self.backing
+            .get_by_prefix(self.column, &id[0..DB_PREFIX_LEN])
+            .map(|b| b.into_vec())
+    }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    use ethcore_db::InMemoryWithMetrics;
     use hash_db::HashDB;
     use keccak::keccak;
     use JournalDB;
@@ -497,4 +506,22 @@ mod tests {
         assert!(jdb.get(&key).is_none());
     }
+
+    #[test]
+    fn returns_state() {
+        let shared_db = Arc::new(InMemoryWithMetrics::create(0));
+        let key = {
+            let mut jdb = ArchiveDB::new(shared_db.clone(), None);
+            let key = jdb.insert(b"foo");
+            jdb.commit_batch(0, &keccak(b"0"), None).unwrap();
+            key
+        };
+
+        {
+            let jdb = ArchiveDB::new(shared_db, None);
+            let state = jdb.state(&key);
+            assert!(state.is_some());
+        }
+    }
 }


@@ -35,6 +35,7 @@ use parity_util_mem::MallocSizeOf;
 use parking_lot::RwLock;
 use rlp::{decode, encode};
 use util::{DatabaseKey, DatabaseValueRef, DatabaseValueView};
+use DB_PREFIX_LEN;
 
 #[derive(Debug, Clone, PartialEq, Eq, MallocSizeOf)]
 struct RefInfo {
@@ -608,6 +609,12 @@ impl JournalDB for EarlyMergeDB {
     fn consolidate(&mut self, with: MemoryDB<KeccakHasher, DBValue>) {
         self.overlay.consolidate(with);
     }
+
+    fn state(&self, id: &H256) -> Option<Bytes> {
+        self.backing
+            .get_by_prefix(self.column, &id[0..DB_PREFIX_LEN])
+            .map(|b| b.into_vec())
+    }
 }
 
 #[cfg(test)]


@@ -23,6 +23,7 @@ use std::{
 };
 
 use super::{error_negatively_reference_hash, JournalDB, DB_PREFIX_LEN, LATEST_ERA_KEY};
+use bytes::Bytes;
 use ethcore_db::{DBTransaction, DBValue, KeyValueDB};
 use ethereum_types::H256;
 use fastmap::H256FastMap;
@@ -509,6 +510,26 @@ impl JournalDB for OverlayRecentDB {
     fn consolidate(&mut self, with: MemoryDB<KeccakHasher, DBValue>) {
         self.transaction_overlay.consolidate(with);
     }
+
+    fn state(&self, key: &H256) -> Option<Bytes> {
+        let journal_overlay = self.journal_overlay.read();
+        let key = to_short_key(key);
+        journal_overlay
+            .backing_overlay
+            .get(&key)
+            .map(|v| v.into_vec())
+            .or_else(|| {
+                journal_overlay
+                    .pending_overlay
+                    .get(&key)
+                    .map(|d| d.clone().into_vec())
+            })
+            .or_else(|| {
+                self.backing
+                    .get_by_prefix(self.column, &key[0..DB_PREFIX_LEN])
+                    .map(|b| b.into_vec())
+            })
+    }
 }
 
 impl HashDB<KeccakHasher, DBValue> for OverlayRecentDB {


@@ -23,6 +23,7 @@ use std::{
 };
 
 use super::{traits::JournalDB, LATEST_ERA_KEY};
+use bytes::Bytes;
 use ethcore_db::{DBTransaction, DBValue, KeyValueDB};
 use ethereum_types::H256;
 use hash_db::HashDB;
@@ -32,6 +33,7 @@ use overlaydb::OverlayDB;
 use parity_util_mem::{allocators::new_malloc_size_ops, MallocSizeOf};
 use rlp::{decode, encode};
 use util::{DatabaseKey, DatabaseValueRef, DatabaseValueView};
+use DB_PREFIX_LEN;
 
 /// Implementation of the `HashDB` trait for a disk-backed database with a memory overlay
 /// and latent-removal semantics.
@@ -245,6 +247,12 @@ impl JournalDB for RefCountedDB {
             }
         }
     }
+
+    fn state(&self, id: &H256) -> Option<Bytes> {
+        self.backing
+            .get_by_prefix(self.column, &id[0..DB_PREFIX_LEN])
+            .map(|b| b.into_vec())
+    }
 }
 
 #[cfg(test)]


@@ -18,6 +18,7 @@
 use std::{io, sync::Arc};
 
+use bytes::Bytes;
 use ethcore_db::{DBTransaction, DBValue, KeyValueDB};
 use ethereum_types::H256;
 use hash_db::{AsHashDB, HashDB};
@@ -95,6 +96,9 @@ pub trait JournalDB: KeyedHashDB {
     /// Consolidate all the insertions and deletions in the given memory overlay.
     fn consolidate(&mut self, overlay: ::memory_db::MemoryDB<KeccakHasher, DBValue>);
 
+    /// State data query
+    fn state(&self, id: &H256) -> Option<Bytes>;
+
     /// Commit all changes in a single batch
     #[cfg(test)]
     fn commit_batch(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> io::Result<u32> {


@@ -2776,6 +2776,10 @@ impl BlockChainClient for Client {
     fn registrar_address(&self) -> Option<Address> {
         self.registrar_address.clone()
     }
+
+    fn state_data(&self, hash: &H256) -> Option<Bytes> {
+        self.state_db.read().journal_db().state(hash)
+    }
 }
 
 impl IoClient for Client {


@@ -1078,6 +1078,18 @@ impl BlockChainClient for TestBlockChainClient {
     fn registrar_address(&self) -> Option<Address> {
         None
     }
+
+    fn state_data(&self, hash: &H256) -> Option<Bytes> {
+        let begins_with_f =
+            H256::from_str("f000000000000000000000000000000000000000000000000000000000000000")
+                .unwrap();
+        if *hash > begins_with_f {
+            let mut rlp = RlpStream::new();
+            rlp.append(&hash.clone());
+            return Some(rlp.out());
+        }
+        None
+    }
 }
 
 impl IoClient for TestBlockChainClient {


@@ -333,6 +333,9 @@ pub trait BlockChainClient:
     /// Get all possible uncle hashes for a block.
     fn find_uncles(&self, hash: &H256) -> Option<Vec<H256>>;
 
+    /// Get latest state node
+    fn state_data(&self, hash: &H256) -> Option<Bytes>;
+
     /// Get block receipts data by block header hash.
     fn block_receipts(&self, hash: &H256) -> Option<BlockReceipts>;


@@ -591,3 +591,12 @@ fn import_export_binary() {
     assert!(client.block_header(BlockId::Number(17)).is_some());
     assert!(client.block_header(BlockId::Number(16)).is_some());
 }
+
+#[test]
+fn returns_state_root_basic() {
+    let client = generate_dummy_client(6);
+    let test_spec = Spec::new_test();
+    let genesis_header = test_spec.genesis_header();
+
+    assert!(client.state_data(genesis_header.state_root()).is_some());
+}


@@ -169,6 +169,7 @@ pub const PAR_PROTOCOL_VERSION_2: (u8, u8) = (2, 0x16);
 pub const MAX_BODIES_TO_SEND: usize = 256;
 pub const MAX_HEADERS_TO_SEND: usize = 512;
+pub const MAX_NODE_DATA_TO_SEND: usize = 1024;
 pub const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256;
 pub const MAX_TRANSACTIONS_TO_REQUEST: usize = 256;
 const MIN_PEERS_PROPAGATION: usize = 4;


@@ -40,6 +40,7 @@ use super::{
     ChainSync, PacketProcessError, RlpResponseResult, SyncHandler, MAX_BODIES_TO_SEND,
     MAX_HEADERS_TO_SEND, MAX_RECEIPTS_HEADERS_TO_SEND,
 };
+use chain::MAX_NODE_DATA_TO_SEND;
 use std::borrow::Borrow;
 
 /// The Chain Sync Supplier: answers requests from peers with available data
@@ -88,6 +89,15 @@ impl SyncSupplier {
                 |e| format!("Error sending block headers: {:?}", e),
             ),
 
+            GetNodeDataPacket => SyncSupplier::return_rlp(
+                io,
+                &rlp,
+                peer,
+                request_id,
+                SyncSupplier::return_node_data,
+                |e| format!("Error sending node data: {:?}", e),
+            ),
+
             GetReceiptsPacket => SyncSupplier::return_rlp(
                 io,
                 &rlp,
@@ -340,6 +350,32 @@ impl SyncSupplier {
         Ok(Some((BlockBodiesPacket, rlp)))
     }
 
+    fn return_node_data(io: &dyn SyncIo, rlp: &Rlp, peer_id: PeerId) -> RlpResponseResult {
+        let count = cmp::min(rlp.item_count().unwrap_or(0), MAX_NODE_DATA_TO_SEND);
+        if count == 0 {
+            debug!(target: "sync", "Empty GetNodeData request, ignoring.");
+            return Ok(None);
+        }
+
+        let mut data = Bytes::new();
+        let mut added = 0usize;
+        for i in 0..count {
+            if let Some(ref mut node_data) = io.chain().state_data(&rlp.val_at::<H256>(i)?) {
+                data.append(node_data);
+                added += 1;
+                if data.len() > PAYLOAD_SOFT_LIMIT {
+                    break;
+                }
+            }
+        }
+
+        let mut rlp = RlpStream::new_list(added);
+        rlp.append_raw(&data, added);
+        trace!(target: "sync", "{} -> GetNodeData: returned {} entries", peer_id, added);
+        Ok(Some((NodeDataPacket, rlp)))
+    }
+
     fn return_receipts(io: &dyn SyncIo, rlp: &Rlp, peer_id: PeerId) -> RlpResponseResult {
         let mut count = rlp.item_count().unwrap_or(0);
         trace!(target: "sync", "{} -> GetReceipts: {} entries", peer_id, count);
@@ -723,4 +759,51 @@ mod test {
         );
         assert_eq!(1, io.packets.len());
     }
+
+    #[test]
+    fn return_nodes() {
+        let mut client = TestBlockChainClient::new();
+        let queue = RwLock::new(VecDeque::new());
+        let sync = dummy_sync_with_peer(H256::zero(), &client);
+        let ss = TestSnapshotService::new();
+        let mut io = TestIo::new(&mut client, &ss, &queue, None);
+
+        let mut node_list = RlpStream::new_list(3);
+        node_list.append(
+            &H256::from_str("0000000000000000000000000000000000000000000000005555555555555555")
+                .unwrap(),
+        );
+        node_list.append(
+            &H256::from_str("ffffffffffffffffffffffffffffffffffffffffffffaaaaaaaaaaaaaaaaaaaa")
+                .unwrap(),
+        );
+        node_list.append(
+            &H256::from_str("aff0000000000000000000000000000000000000000000000000000000000000")
+                .unwrap(),
+        );
+
+        let node_request = node_list.out();
+        // it returns an rlp ONLY for hashes starting with "f"
+        let result = SyncSupplier::return_node_data(&io, &Rlp::new(&node_request.clone()), 0);
+        assert!(result.is_ok());
+        let rlp_result = result.unwrap();
+        assert!(rlp_result.is_some());
+
+        // the length of one rlp-encoded hash
+        let rlp = rlp_result.unwrap().1.out();
+        let rlp = Rlp::new(&rlp);
+        assert_eq!(Ok(1), rlp.item_count());
+
+        io.sender = Some(2usize);
+
+        SyncSupplier::dispatch_packet(
+            &RwLock::new(sync),
+            &mut io,
+            0usize,
+            GetNodeDataPacket.id(),
+            &node_request,
+        );
+        assert_eq!(1, io.packets.len());
+    }
 }
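
A note on the response shape built by return_node_data above: append_raw splices the `added` pre-encoded entries directly into a list, so the NodeData body is simply [value_0, value_1, ...]. A hedged sketch of the receiving side (not part of this commit; it assumes the eth/66 request id has already been stripped and that each entry is an RLP byte string holding one node's bytes):

use rlp::{DecoderError, Rlp};

// Sketch: decode a NodeData body [value_0, value_1, ...] into raw node blobs.
fn decode_node_data(body: &[u8]) -> Result<Vec<Vec<u8>>, DecoderError> {
    let rlp = Rlp::new(body);
    let mut nodes = Vec::with_capacity(rlp.item_count()?);
    for item in rlp.iter() {
        // Each item's payload is one trie node's raw bytes.
        nodes.push(item.data()?.to_vec());
    }
    Ok(nodes)
}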


@@ -48,8 +48,8 @@ pub enum SyncPacket {
     GetPooledTransactionsPacket = 0x09,
     PooledTransactionsPacket = 0x0a,
 
-    //GetNodeDataPacket = 0x0d,
-    //NodeDataPacket = 0x0e,
+    GetNodeDataPacket = 0x0d,
+    NodeDataPacket = 0x0e,
 
     GetReceiptsPacket = 0x0f,
     ReceiptsPacket = 0x10,
@@ -87,8 +87,8 @@ impl PacketInfo for SyncPacket {
             | NewPooledTransactionHashesPacket
            | GetPooledTransactionsPacket
            | PooledTransactionsPacket
-            //| GetNodeDataPacket
-            //| NodeDataPacket
+            | GetNodeDataPacket
+            | NodeDataPacket
            | GetReceiptsPacket
            | ReceiptsPacket => ETH_PROTOCOL,
@@ -105,7 +105,6 @@ impl PacketInfo for SyncPacket {
     }
 
     fn has_request_id_in_eth_66(&self) -> bool {
-        // Note: NodeDataPacket and GetNodeDataPacket also get a request id in eth-66.
         match self {
             GetBlockHeadersPacket
            | BlockHeadersPacket
@@ -113,6 +112,8 @@ impl PacketInfo for SyncPacket {
            | BlockBodiesPacket
            | GetPooledTransactionsPacket
            | PooledTransactionsPacket
+            | GetNodeDataPacket
+            | NodeDataPacket
            | GetReceiptsPacket
            | ReceiptsPacket => true,
             _ => false,