dispatch batched requests
This commit is contained in:
parent
3eea77709b
commit
574cfae470
@ -43,6 +43,8 @@ use request::{self as basic_request, Request as NetworkRequest, Response as Netw
|
|||||||
|
|
||||||
pub mod request;
|
pub mod request;
|
||||||
|
|
||||||
|
pub use self::request::{CheckedRequest ,Request, Response};
|
||||||
|
|
||||||
// relevant peer info.
|
// relevant peer info.
|
||||||
struct Peer {
|
struct Peer {
|
||||||
status: Status,
|
status: Status,
|
||||||
@ -50,23 +52,13 @@ struct Peer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Peer {
|
impl Peer {
|
||||||
// Whether a given peer can handle a specific request.
|
// whether this peer can fulfill the
|
||||||
fn can_handle(&self, pending: &Pending) -> bool {
|
fn can_fulfill(&self, c: &Capabilities) -> bool {
|
||||||
match *pending {
|
let caps = &self.capabilities;
|
||||||
Pending::HeaderProof(ref req, _) =>
|
|
||||||
self.capabilities.serve_headers && self.status.head_num > req.num(),
|
caps.serve_headers == c.serve_headers &&
|
||||||
Pending::HeaderByHash(_, _) => self.capabilities.serve_headers,
|
caps.serve_chain_since >= c.serve_chain_since &&
|
||||||
Pending::Block(ref req, _) =>
|
caps.serve_state_since >= c.serve_chain_since
|
||||||
self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x <= req.header.number()),
|
|
||||||
Pending::BlockReceipts(ref req, _) =>
|
|
||||||
self.capabilities.serve_chain_since.as_ref().map_or(false, |x| *x <= req.0.number()),
|
|
||||||
Pending::Account(ref req, _) =>
|
|
||||||
self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x <= req.header.number()),
|
|
||||||
Pending::Code(ref req, _) =>
|
|
||||||
self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x <= req.block_id.1),
|
|
||||||
Pending::TxProof(ref req, _) =>
|
|
||||||
self.capabilities.serve_state_since.as_ref().map_or(false, |x| *x <= req.header.number()),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -78,262 +70,256 @@ enum ChtProofSender {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Attempted request info and sender to put received value.
|
// Attempted request info and sender to put received value.
|
||||||
enum Pending {
|
struct Pending {
|
||||||
HeaderProof(request::HeaderProof, ChtProofSender),
|
requests: basic_request::Requests<CheckedRequest>,
|
||||||
HeaderByHash(request::HeaderByHash, Sender<encoded::Header>),
|
net_requests: basic_request::Requests<NetworkRequest>,
|
||||||
Block(request::Body, Sender<encoded::Block>),
|
required_capabilities: Capabilities,
|
||||||
BlockReceipts(request::BlockReceipts, Sender<Vec<Receipt>>),
|
responses: Vec<Response>,
|
||||||
Account(request::Account, Sender<BasicAccount>),
|
sender: oneshot::Sender<Vec<Response>>,
|
||||||
Code(request::Code, Sender<Bytes>),
|
|
||||||
TxProof(request::TransactionProof, Sender<Result<Executed, ExecutionError>>),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Pending {
|
// helper to guess capabilities required for a given batch of network requests.
|
||||||
// Create a network request.
|
fn guess_capabilities(requests: &[CheckedRequest]) -> Capabilities {
|
||||||
fn make_request(&self) -> NetworkRequest {
|
let mut caps = Capabilities {
|
||||||
match *self {
|
serve_headers: false,
|
||||||
Pending::HeaderByHash(ref req, _) => NetworkRequest::Headers(basic_request::IncompleteHeadersRequest {
|
serve_chain_since: None,
|
||||||
start: basic_request::HashOrNumber::Hash(req.0).into(),
|
serve_state_since: None,
|
||||||
skip: 0,
|
tx_relay: false,
|
||||||
max: 1,
|
};
|
||||||
reverse: false,
|
|
||||||
}),
|
let update_since = |current: &mut Option<u64>, new|
|
||||||
Pending::HeaderProof(ref req, _) => NetworkRequest::HeaderProof(basic_request::IncompleteHeaderProofRequest {
|
*current = match *current {
|
||||||
num: req.num().into(),
|
Some(x) => Some(::std::cmp::min(x, new)),
|
||||||
}),
|
None => Some(new),
|
||||||
Pending::Block(ref req, _) => NetworkRequest::Body(basic_request::IncompleteBodyRequest {
|
};
|
||||||
hash: req.hash.into(),
|
|
||||||
}),
|
for request in requests {
|
||||||
Pending::BlockReceipts(ref req, _) => NetworkRequest::Receipts(basic_request::IncompleteReceiptsRequest {
|
match *request {
|
||||||
hash: req.0.hash().into(),
|
// TODO: might be worth returning a required block number for this also.
|
||||||
}),
|
CheckedRequest::HeaderProof(_, _) =>
|
||||||
Pending::Account(ref req, _) => NetworkRequest::Account(basic_request::IncompleteAccountRequest {
|
caps.serve_headers = true,
|
||||||
block_hash: req.header.hash().into(),
|
CheckedRequest::HeaderByHash(_, _) =>
|
||||||
address_hash: ::util::Hashable::sha3(&req.address).into(),
|
caps.serve_headers = true,
|
||||||
}),
|
CheckedRequest::Body(ref req, _) =>
|
||||||
Pending::Code(ref req, _) => NetworkRequest::Code(basic_request::IncompleteCodeRequest {
|
update_since(&mut caps.serve_chain_since, req.header.number()),
|
||||||
block_hash: req.block_id.0.into(),
|
CheckedRequest::Receipts(ref req, _) =>
|
||||||
code_hash: req.code_hash.into(),
|
update_since(&mut caps.serve_chain_since, req.0.number()),
|
||||||
}),
|
CheckedRequest::Account(ref req, _) =>
|
||||||
Pending::TxProof(ref req, _) => NetworkRequest::Execution(basic_request::IncompleteExecutionRequest {
|
update_since(&mut caps.serve_state_since, req.header.number()),
|
||||||
block_hash: req.header.hash().into(),
|
CheckedRequest::Code(ref req, _) =>
|
||||||
from: req.tx.sender(),
|
update_since(&mut caps.serve_state_since, req.block_id.1),
|
||||||
gas: req.tx.gas,
|
CheckedRequest::Execution(ref req, _) =>
|
||||||
gas_price: req.tx.gas_price,
|
update_since(&mut caps.serve_state_since, req.header.number()),
|
||||||
action: req.tx.action.clone(),
|
|
||||||
value: req.tx.value,
|
|
||||||
data: req.tx.data.clone(),
|
|
||||||
}),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
caps
|
||||||
}
|
}
|
||||||
|
|
||||||
/// On demand request service. See module docs for more details.
|
/// On demand request service. See module docs for more details.
|
||||||
/// Accumulates info about all peers' capabilities and dispatches
|
/// Accumulates info about all peers' capabilities and dispatches
|
||||||
/// requests to them accordingly.
|
/// requests to them accordingly.
|
||||||
|
// lock in declaration order.
|
||||||
pub struct OnDemand {
|
pub struct OnDemand {
|
||||||
|
pending: RwLock<Vec<Pending>>,
|
||||||
peers: RwLock<HashMap<PeerId, Peer>>,
|
peers: RwLock<HashMap<PeerId, Peer>>,
|
||||||
pending_requests: RwLock<HashMap<ReqId, Pending>>,
|
in_transit: RwLock<HashMap<ReqId, Pending>>,
|
||||||
cache: Arc<Mutex<Cache>>,
|
cache: Arc<Mutex<Cache>>,
|
||||||
orphaned_requests: RwLock<Vec<Pending>>,
|
|
||||||
start_nonce: U256,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const RECEIVER_IN_SCOPE: &'static str = "Receiver is still in scope, so it's not dropped; qed";
|
const RECEIVER_IN_SCOPE: &'static str = "Receiver is still in scope, so it's not dropped; qed";
|
||||||
|
|
||||||
impl OnDemand {
|
impl OnDemand {
|
||||||
/// Create a new `OnDemand` service with the given cache.
|
/// Create a new `OnDemand` service with the given cache.
|
||||||
pub fn new(cache: Arc<Mutex<Cache>>, account_start_nonce: U256) -> Self {
|
pub fn new(cache: Arc<Mutex<Cache>>) -> Self {
|
||||||
OnDemand {
|
OnDemand {
|
||||||
|
pending: RwLock::new(Vec::new()),
|
||||||
peers: RwLock::new(HashMap::new()),
|
peers: RwLock::new(HashMap::new()),
|
||||||
pending_requests: RwLock::new(HashMap::new()),
|
in_transit: RwLock::new(HashMap::new()),
|
||||||
cache: cache,
|
cache: cache,
|
||||||
orphaned_requests: RwLock::new(Vec::new()),
|
|
||||||
start_nonce: account_start_nonce,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Request a header's hash by block number and CHT root hash.
|
// /// Request a header's hash by block number and CHT root hash.
|
||||||
/// Returns the hash.
|
// /// Returns the hash.
|
||||||
pub fn hash_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<H256> {
|
// pub fn hash_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<H256> {
|
||||||
let (sender, receiver) = oneshot::channel();
|
// let (sender, receiver) = oneshot::channel();
|
||||||
let cached = {
|
// let cached = {
|
||||||
let mut cache = self.cache.lock();
|
// let mut cache = self.cache.lock();
|
||||||
cache.block_hash(&req.num())
|
// cache.block_hash(&req.num())
|
||||||
};
|
// };
|
||||||
|
|
||||||
match cached {
|
// match cached {
|
||||||
Some(hash) => sender.send(hash).expect(RECEIVER_IN_SCOPE),
|
// Some(hash) => sender.send(hash).expect(RECEIVER_IN_SCOPE),
|
||||||
None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Hash(sender))),
|
// None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Hash(sender))),
|
||||||
}
|
// }
|
||||||
receiver
|
// receiver
|
||||||
}
|
// }
|
||||||
|
|
||||||
/// Request a canonical block's chain score.
|
// /// Request a canonical block's chain score.
|
||||||
/// Returns the chain score.
|
// /// Returns the chain score.
|
||||||
pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<U256> {
|
// pub fn chain_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<U256> {
|
||||||
let (sender, receiver) = oneshot::channel();
|
// let (sender, receiver) = oneshot::channel();
|
||||||
let cached = {
|
// let cached = {
|
||||||
let mut cache = self.cache.lock();
|
// let mut cache = self.cache.lock();
|
||||||
cache.block_hash(&req.num()).and_then(|hash| cache.chain_score(&hash))
|
// cache.block_hash(&req.num()).and_then(|hash| cache.chain_score(&hash))
|
||||||
};
|
// };
|
||||||
|
|
||||||
match cached {
|
// match cached {
|
||||||
Some(score) => sender.send(score).expect(RECEIVER_IN_SCOPE),
|
// Some(score) => sender.send(score).expect(RECEIVER_IN_SCOPE),
|
||||||
None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::ChainScore(sender))),
|
// None => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::ChainScore(sender))),
|
||||||
}
|
// }
|
||||||
|
|
||||||
receiver
|
// receiver
|
||||||
}
|
// }
|
||||||
|
|
||||||
/// Request a canonical block's hash and chain score by number.
|
// /// Request a canonical block's hash and chain score by number.
|
||||||
/// Returns the hash and chain score.
|
// /// Returns the hash and chain score.
|
||||||
pub fn hash_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<(H256, U256)> {
|
// pub fn hash_and_score_by_number(&self, ctx: &BasicContext, req: request::HeaderProof) -> Receiver<(H256, U256)> {
|
||||||
let (sender, receiver) = oneshot::channel();
|
// let (sender, receiver) = oneshot::channel();
|
||||||
let cached = {
|
// let cached = {
|
||||||
let mut cache = self.cache.lock();
|
// let mut cache = self.cache.lock();
|
||||||
let hash = cache.block_hash(&req.num());
|
// let hash = cache.block_hash(&req.num());
|
||||||
(
|
// (
|
||||||
hash.clone(),
|
// hash.clone(),
|
||||||
hash.and_then(|hash| cache.chain_score(&hash)),
|
// hash.and_then(|hash| cache.chain_score(&hash)),
|
||||||
)
|
// )
|
||||||
};
|
// };
|
||||||
|
|
||||||
match cached {
|
// match cached {
|
||||||
(Some(hash), Some(score)) => sender.send((hash, score)).expect(RECEIVER_IN_SCOPE),
|
// (Some(hash), Some(score)) => sender.send((hash, score)).expect(RECEIVER_IN_SCOPE),
|
||||||
_ => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Both(sender))),
|
// _ => self.dispatch(ctx, Pending::HeaderProof(req, ChtProofSender::Both(sender))),
|
||||||
}
|
// }
|
||||||
|
|
||||||
receiver
|
// receiver
|
||||||
}
|
// }
|
||||||
|
|
||||||
/// Request a header by hash. This is less accurate than by-number because we don't know
|
// /// Request a header by hash. This is less accurate than by-number because we don't know
|
||||||
/// where in the chain this header lies, and therefore can't find a peer who is supposed to have
|
// /// where in the chain this header lies, and therefore can't find a peer who is supposed to have
|
||||||
/// it as easily.
|
// /// it as easily.
|
||||||
pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Receiver<encoded::Header> {
|
// pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Receiver<encoded::Header> {
|
||||||
let (sender, receiver) = oneshot::channel();
|
// let (sender, receiver) = oneshot::channel();
|
||||||
match { self.cache.lock().block_header(&req.0) } {
|
// match { self.cache.lock().block_header(&req.0) } {
|
||||||
Some(hdr) => sender.send(hdr).expect(RECEIVER_IN_SCOPE),
|
// Some(hdr) => sender.send(hdr).expect(RECEIVER_IN_SCOPE),
|
||||||
None => self.dispatch(ctx, Pending::HeaderByHash(req, sender)),
|
// None => self.dispatch(ctx, Pending::HeaderByHash(req, sender)),
|
||||||
}
|
// }
|
||||||
receiver
|
// receiver
|
||||||
}
|
// }
|
||||||
|
|
||||||
/// Request a block, given its header. Block bodies are requestable by hash only,
|
// /// Request a block, given its header. Block bodies are requestable by hash only,
|
||||||
/// and the header is required anyway to verify and complete the block body
|
// /// and the header is required anyway to verify and complete the block body
|
||||||
/// -- this just doesn't obscure the network query.
|
// /// -- this just doesn't obscure the network query.
|
||||||
pub fn block(&self, ctx: &BasicContext, req: request::Body) -> Receiver<encoded::Block> {
|
// pub fn block(&self, ctx: &BasicContext, req: request::Body) -> Receiver<encoded::Block> {
|
||||||
|
// let (sender, receiver) = oneshot::channel();
|
||||||
|
|
||||||
|
// // fast path for empty body.
|
||||||
|
// if req.header.transactions_root() == SHA3_NULL_RLP && req.header.uncles_hash() == SHA3_EMPTY_LIST_RLP {
|
||||||
|
// let mut stream = RlpStream::new_list(3);
|
||||||
|
// stream.append_raw(&req.header.into_inner(), 1);
|
||||||
|
// stream.begin_list(0);
|
||||||
|
// stream.begin_list(0);
|
||||||
|
|
||||||
|
// sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE);
|
||||||
|
// } else {
|
||||||
|
// match { self.cache.lock().block_body(&req.hash) } {
|
||||||
|
// Some(body) => {
|
||||||
|
// let mut stream = RlpStream::new_list(3);
|
||||||
|
// let body = body.rlp();
|
||||||
|
// stream.append_raw(&req.header.into_inner(), 1);
|
||||||
|
// stream.append_raw(&body.at(0).as_raw(), 1);
|
||||||
|
// stream.append_raw(&body.at(1).as_raw(), 1);
|
||||||
|
|
||||||
|
// sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE);
|
||||||
|
// }
|
||||||
|
// None => self.dispatch(ctx, Pending::Block(req, sender)),
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// receiver
|
||||||
|
// }
|
||||||
|
|
||||||
|
// /// Request the receipts for a block. The header serves two purposes:
|
||||||
|
// /// provide the block hash to fetch receipts for, and for verification of the receipts root.
|
||||||
|
// pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Receiver<Vec<Receipt>> {
|
||||||
|
// let (sender, receiver) = oneshot::channel();
|
||||||
|
|
||||||
|
// // fast path for empty receipts.
|
||||||
|
// if req.0.receipts_root() == SHA3_NULL_RLP {
|
||||||
|
// sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE);
|
||||||
|
// } else {
|
||||||
|
// match { self.cache.lock().block_receipts(&req.0.hash()) } {
|
||||||
|
// Some(receipts) => sender.send(receipts).expect(RECEIVER_IN_SCOPE),
|
||||||
|
// None => self.dispatch(ctx, Pending::BlockReceipts(req, sender)),
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// receiver
|
||||||
|
// }
|
||||||
|
|
||||||
|
// /// Request an account by address and block header -- which gives a hash to query and a state root
|
||||||
|
// /// to verify against.
|
||||||
|
// pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Receiver<BasicAccount> {
|
||||||
|
// let (sender, receiver) = oneshot::channel();
|
||||||
|
// self.dispatch(ctx, Pending::Account(req, sender));
|
||||||
|
// receiver
|
||||||
|
// }
|
||||||
|
|
||||||
|
// /// Request code by address, known code hash, and block header.
|
||||||
|
// pub fn code(&self, ctx: &BasicContext, req: request::Code) -> Receiver<Bytes> {
|
||||||
|
// let (sender, receiver) = oneshot::channel();
|
||||||
|
|
||||||
|
// // fast path for no code.
|
||||||
|
// if req.code_hash == SHA3_EMPTY {
|
||||||
|
// sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE)
|
||||||
|
// } else {
|
||||||
|
// self.dispatch(ctx, Pending::Code(req, sender));
|
||||||
|
// }
|
||||||
|
|
||||||
|
// receiver
|
||||||
|
// }
|
||||||
|
|
||||||
|
// /// Request proof-of-execution for a transaction.
|
||||||
|
// pub fn transaction_proof(&self, ctx: &BasicContext, req: request::TransactionProof) -> Receiver<Result<Executed, ExecutionError>> {
|
||||||
|
// let (sender, receiver) = oneshot::channel();
|
||||||
|
|
||||||
|
// self.dispatch(ctx, Pending::TxProof(req, sender));
|
||||||
|
|
||||||
|
// receiver
|
||||||
|
// }
|
||||||
|
|
||||||
|
/// Submit a batch of requests.
|
||||||
|
///
|
||||||
|
/// Fails if back-references are not coherent.
|
||||||
|
/// The returned vector of responses will match the requests exactly.
|
||||||
|
pub fn make_requests(&self, ctx: &BasicContext, requests: Vec<Request>)
|
||||||
|
-> Result<Receiver<Vec<Response>>, basic_request::NoSuchOutput>
|
||||||
|
{
|
||||||
let (sender, receiver) = oneshot::channel();
|
let (sender, receiver) = oneshot::channel();
|
||||||
|
|
||||||
// fast path for empty body.
|
|
||||||
if req.header.transactions_root() == SHA3_NULL_RLP && req.header.uncles_hash() == SHA3_EMPTY_LIST_RLP {
|
|
||||||
let mut stream = RlpStream::new_list(3);
|
|
||||||
stream.append_raw(&req.header.into_inner(), 1);
|
|
||||||
stream.begin_list(0);
|
|
||||||
stream.begin_list(0);
|
|
||||||
|
|
||||||
sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE);
|
|
||||||
} else {
|
|
||||||
match { self.cache.lock().block_body(&req.hash) } {
|
|
||||||
Some(body) => {
|
|
||||||
let mut stream = RlpStream::new_list(3);
|
|
||||||
let body = body.rlp();
|
|
||||||
stream.append_raw(&req.header.into_inner(), 1);
|
|
||||||
stream.append_raw(&body.at(0).as_raw(), 1);
|
|
||||||
stream.append_raw(&body.at(1).as_raw(), 1);
|
|
||||||
|
|
||||||
sender.send(encoded::Block::new(stream.out())).expect(RECEIVER_IN_SCOPE);
|
|
||||||
}
|
|
||||||
None => self.dispatch(ctx, Pending::Block(req, sender)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
receiver
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Request the receipts for a block. The header serves two purposes:
|
|
||||||
/// provide the block hash to fetch receipts for, and for verification of the receipts root.
|
|
||||||
pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Receiver<Vec<Receipt>> {
|
|
||||||
let (sender, receiver) = oneshot::channel();
|
|
||||||
|
|
||||||
// fast path for empty receipts.
|
|
||||||
if req.0.receipts_root() == SHA3_NULL_RLP {
|
|
||||||
sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE);
|
|
||||||
} else {
|
|
||||||
match { self.cache.lock().block_receipts(&req.0.hash()) } {
|
|
||||||
Some(receipts) => sender.send(receipts).expect(RECEIVER_IN_SCOPE),
|
|
||||||
None => self.dispatch(ctx, Pending::BlockReceipts(req, sender)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
receiver
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Request an account by address and block header -- which gives a hash to query and a state root
|
|
||||||
/// to verify against.
|
|
||||||
pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Receiver<BasicAccount> {
|
|
||||||
let (sender, receiver) = oneshot::channel();
|
|
||||||
self.dispatch(ctx, Pending::Account(req, sender));
|
|
||||||
receiver
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Request code by address, known code hash, and block header.
|
|
||||||
pub fn code(&self, ctx: &BasicContext, req: request::Code) -> Receiver<Bytes> {
|
|
||||||
let (sender, receiver) = oneshot::channel();
|
|
||||||
|
|
||||||
// fast path for no code.
|
|
||||||
if req.code_hash == SHA3_EMPTY {
|
|
||||||
sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE)
|
|
||||||
} else {
|
|
||||||
self.dispatch(ctx, Pending::Code(req, sender));
|
|
||||||
}
|
|
||||||
|
|
||||||
receiver
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Request proof-of-execution for a transaction.
|
|
||||||
pub fn transaction_proof(&self, ctx: &BasicContext, req: request::TransactionProof) -> Receiver<Result<Executed, ExecutionError>> {
|
|
||||||
let (sender, receiver) = oneshot::channel();
|
|
||||||
|
|
||||||
self.dispatch(ctx, Pending::TxProof(req, sender));
|
|
||||||
|
|
||||||
receiver
|
|
||||||
}
|
|
||||||
|
|
||||||
// dispatch the request, with a "suitability" function to filter acceptable peers.
|
|
||||||
fn dispatch(&self, ctx: &BasicContext, pending: Pending) {
|
|
||||||
let mut builder = basic_request::RequestBuilder::default();
|
let mut builder = basic_request::RequestBuilder::default();
|
||||||
builder.push(pending.make_request())
|
|
||||||
.expect("make_request always returns fully complete request; qed");
|
|
||||||
|
|
||||||
let complete = builder.build();
|
let responses = Vec::with_capacity(requests.len());
|
||||||
|
for request in requests {
|
||||||
let kind = complete.requests()[0].kind();
|
builder.push(CheckedRequest::from(request))?;
|
||||||
for (id, peer) in self.peers.read().iter() {
|
|
||||||
if !peer.can_handle(&pending) { continue }
|
|
||||||
match ctx.request_from(*id, complete.clone()) {
|
|
||||||
Ok(req_id) => {
|
|
||||||
trace!(target: "on_demand", "{}: Assigned {:?} to peer {}",
|
|
||||||
req_id, kind, id);
|
|
||||||
|
|
||||||
self.pending_requests.write().insert(
|
|
||||||
req_id,
|
|
||||||
pending,
|
|
||||||
);
|
|
||||||
return
|
|
||||||
}
|
|
||||||
Err(net::Error::NoCredits) => {}
|
|
||||||
Err(e) =>
|
|
||||||
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
self.orphaned_requests.write().push(pending);
|
let requests = builder.build();
|
||||||
|
let net_requests = requests.clone().map_requests(|req| req.into_net_request());
|
||||||
|
let capabilities = guess_capabilities(requests.requests());
|
||||||
|
|
||||||
|
self.pending.write().push(Pending {
|
||||||
|
requests: requests,
|
||||||
|
net_requests: net_requests,
|
||||||
|
required_capabilities: capabilities,
|
||||||
|
responses: responses,
|
||||||
|
sender: sender,
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(receiver)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// dispatch pending requests, and discard those for which the corresponding
|
||||||
// dispatch orphaned requests, and discard those for which the corresponding
|
|
||||||
// receiver has been dropped.
|
// receiver has been dropped.
|
||||||
fn dispatch_orphaned(&self, ctx: &BasicContext) {
|
fn dispatch_pending(&self, ctx: &BasicContext) {
|
||||||
// wrapper future for calling `poll_cancel` on our `Senders` to preserve
|
// wrapper future for calling `poll_cancel` on our `Senders` to preserve
|
||||||
// the invariant that it's always within a task.
|
// the invariant that it's always within a task.
|
||||||
struct CheckHangup<'a, T: 'a>(&'a mut Sender<T>);
|
struct CheckHangup<'a, T: 'a>(&'a mut Sender<T>);
|
||||||
@ -356,35 +342,42 @@ impl OnDemand {
|
|||||||
CheckHangup(send).wait().expect("CheckHangup always returns ok; qed")
|
CheckHangup(send).wait().expect("CheckHangup always returns ok; qed")
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.orphaned_requests.read().is_empty() { return }
|
if self.pending.read().is_empty() { return }
|
||||||
|
let mut pending = self.pending.write();
|
||||||
|
|
||||||
let to_dispatch = ::std::mem::replace(&mut *self.orphaned_requests.write(), Vec::new());
|
// iterate over all pending requests, and check them for hang-up.
|
||||||
|
// then, try and find a peer who can serve it.
|
||||||
trace!(target: "on_demand", "Attempting to dispatch {} orphaned requests.", to_dispatch.len());
|
let peers = self.peers.read();
|
||||||
for mut orphaned in to_dispatch {
|
*pending = ::std::mem::replace(&mut *pending, Vec::new()).into_iter()
|
||||||
let hung_up = match orphaned {
|
.filter_map(|mut pending| match check_hangup(&mut pending.sender) {
|
||||||
Pending::HeaderProof(_, ref mut sender) => match *sender {
|
true => Some(pending),
|
||||||
ChtProofSender::Both(ref mut s) => check_hangup(s),
|
false => None,
|
||||||
ChtProofSender::Hash(ref mut s) => check_hangup(s),
|
})
|
||||||
ChtProofSender::ChainScore(ref mut s) => check_hangup(s),
|
.filter_map(|pending| {
|
||||||
},
|
for (peer_id, peer) in peers.iter() { // .shuffle?
|
||||||
Pending::HeaderByHash(_, ref mut sender) => check_hangup(sender),
|
if !peer.can_fulfill(&pending.required_capabilities) {
|
||||||
Pending::Block(_, ref mut sender) => check_hangup(sender),
|
continue
|
||||||
Pending::BlockReceipts(_, ref mut sender) => check_hangup(sender),
|
|
||||||
Pending::Account(_, ref mut sender) => check_hangup(sender),
|
|
||||||
Pending::Code(_, ref mut sender) => check_hangup(sender),
|
|
||||||
Pending::TxProof(_, ref mut sender) => check_hangup(sender),
|
|
||||||
};
|
|
||||||
|
|
||||||
if !hung_up { self.dispatch(ctx, orphaned) }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
match ctx.request_from(*peer_id, pending.net_requests.clone()) {
|
||||||
|
Ok(req_id) => {
|
||||||
|
self.in_transit.write().insert(req_id, pending);
|
||||||
|
return None
|
||||||
|
}
|
||||||
|
Err(net::Error::NoCredits) => {}
|
||||||
|
Err(e) => debug!(target: "on_demand", "Error dispatching request to peer: {}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(pending)
|
||||||
|
})
|
||||||
|
.collect(); // `pending` now contains all requests we couldn't dispatch.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Handler for OnDemand {
|
impl Handler for OnDemand {
|
||||||
fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) {
|
fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) {
|
||||||
self.peers.write().insert(ctx.peer(), Peer { status: status.clone(), capabilities: capabilities.clone() });
|
self.peers.write().insert(ctx.peer(), Peer { status: status.clone(), capabilities: capabilities.clone() });
|
||||||
self.dispatch_orphaned(ctx.as_basic());
|
self.dispatch_pending(ctx.as_basic());
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) {
|
fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) {
|
||||||
@ -392,16 +385,16 @@ impl Handler for OnDemand {
|
|||||||
let ctx = ctx.as_basic();
|
let ctx = ctx.as_basic();
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut orphaned = self.orphaned_requests.write();
|
let mut pending = self.pending.write();
|
||||||
for unfulfilled in unfulfilled {
|
for unfulfilled in unfulfilled {
|
||||||
if let Some(pending) = self.pending_requests.write().remove(unfulfilled) {
|
if let Some(unfulfilled) = self.in_transit.write().remove(unfulfilled) {
|
||||||
trace!(target: "on_demand", "Attempting to reassign dropped request");
|
trace!(target: "on_demand", "Attempting to reassign dropped request");
|
||||||
orphaned.push(pending);
|
pending.push(unfulfilled);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.dispatch_orphaned(ctx);
|
self.dispatch_pending(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) {
|
fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) {
|
||||||
@ -413,142 +406,68 @@ impl Handler for OnDemand {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.dispatch_orphaned(ctx.as_basic());
|
self.dispatch_pending(ctx.as_basic());
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_responses(&self, ctx: &EventContext, req_id: ReqId, responses: &[basic_request::Response]) {
|
fn on_responses(&self, ctx: &EventContext, req_id: ReqId, responses: &[basic_request::Response]) {
|
||||||
|
use request::IncompleteRequest;
|
||||||
|
|
||||||
let peer = ctx.peer();
|
let peer = ctx.peer();
|
||||||
let req = match self.pending_requests.write().remove(&req_id) {
|
let mut pending = match self.in_transit.write().remove(&req_id) {
|
||||||
Some(req) => req,
|
Some(req) => req,
|
||||||
None => return,
|
None => return,
|
||||||
};
|
};
|
||||||
|
|
||||||
let response = match responses.get(0) {
|
// for each incoming response
|
||||||
Some(response) => response,
|
// 1. ensure verification data filled.
|
||||||
None => {
|
// 2. pending.requests.supply_response
|
||||||
trace!(target: "on_demand", "Ignoring empty response for request {}", req_id);
|
// 3. if extracted on-demand response
|
||||||
self.dispatch(ctx.as_basic(), req);
|
for response in responses {
|
||||||
|
match pending.requests.supply_response(response) {
|
||||||
|
Ok(response) => pending.responses.push(response),
|
||||||
|
Err(e) => {
|
||||||
|
let peer = ctx.peer();
|
||||||
|
debug!(target: "on_demand", "Peer {} gave bad response: {:?}", peer, e);
|
||||||
|
ctx.disable_peer(peer);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if pending.requests.is_complete() {
|
||||||
|
let _ = pending.sender.send(pending.responses);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
trace!(target: "on_demand", "Handling response for request {}, kind={:?}", req_id, response.kind());
|
// update network requests (unless we're done, in which case fulfill the future.)
|
||||||
|
let mut builder = basic_request::RequestBuilder::default();
|
||||||
|
let num_answered = pending.requests.num_answered();
|
||||||
|
let mut mapping = move |idx| idx - num_answered;
|
||||||
|
|
||||||
// handle the response appropriately for the request.
|
for request in pending.requests.requests().iter().skip(num_answered) {
|
||||||
// all branches which do not return early lead to disabling of the peer
|
let mut net_req = request.clone().into_net_request();
|
||||||
// due to misbehavior.
|
|
||||||
match req {
|
|
||||||
Pending::HeaderProof(req, sender) => {
|
|
||||||
if let NetworkResponse::HeaderProof(ref response) = *response {
|
|
||||||
match req.check_response(&response.proof) {
|
|
||||||
Ok((hash, score)) => {
|
|
||||||
let mut cache = self.cache.lock();
|
|
||||||
cache.insert_block_hash(req.num(), hash);
|
|
||||||
cache.insert_chain_score(hash, score);
|
|
||||||
|
|
||||||
match sender {
|
// all back-references with request index less than `num_answered` have
|
||||||
ChtProofSender::Both(sender) => { let _ = sender.send((hash, score)); }
|
// been filled by now. all remaining requests point to nothing earlier
|
||||||
ChtProofSender::Hash(sender) => { let _ = sender.send(hash); }
|
// than the next unanswered request.
|
||||||
ChtProofSender::ChainScore(sender) => { let _ = sender.send(score); }
|
net_req.adjust_refs(&mut mapping);
|
||||||
}
|
builder.push(net_req)
|
||||||
return
|
.expect("all back-references to answered requests have been filled; qed");
|
||||||
}
|
|
||||||
Err(e) => warn!(target: "on_demand", "Error handling response for header request: {:?}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Pending::HeaderByHash(req, sender) => {
|
|
||||||
if let NetworkResponse::Headers(ref response) = *response {
|
|
||||||
match req.check_response(&response.headers) {
|
|
||||||
Ok(header) => {
|
|
||||||
self.cache.lock().insert_block_header(req.0, header.clone());
|
|
||||||
let _ = sender.send(header);
|
|
||||||
return
|
|
||||||
}
|
|
||||||
Err(e) => warn!(target: "on_demand", "Error handling response for header request: {:?}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Pending::Block(req, sender) => {
|
|
||||||
if let NetworkResponse::Body(ref response) = *response {
|
|
||||||
match req.check_response(&response.body) {
|
|
||||||
Ok(block) => {
|
|
||||||
self.cache.lock().insert_block_body(req.hash, response.body.clone());
|
|
||||||
let _ = sender.send(block);
|
|
||||||
return
|
|
||||||
}
|
|
||||||
Err(e) => warn!(target: "on_demand", "Error handling response for block request: {:?}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Pending::BlockReceipts(req, sender) => {
|
|
||||||
if let NetworkResponse::Receipts(ref response) = *response {
|
|
||||||
match req.check_response(&response.receipts) {
|
|
||||||
Ok(receipts) => {
|
|
||||||
let hash = req.0.hash();
|
|
||||||
self.cache.lock().insert_block_receipts(hash, receipts.clone());
|
|
||||||
let _ = sender.send(receipts);
|
|
||||||
return
|
|
||||||
}
|
|
||||||
Err(e) => warn!(target: "on_demand", "Error handling response for receipts request: {:?}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Pending::Account(req, sender) => {
|
|
||||||
if let NetworkResponse::Account(ref response) = *response {
|
|
||||||
match req.check_response(&response.proof) {
|
|
||||||
Ok(account) => {
|
|
||||||
let account = account.unwrap_or_else(|| {
|
|
||||||
BasicAccount {
|
|
||||||
balance: 0.into(),
|
|
||||||
nonce: self.start_nonce,
|
|
||||||
code_hash: SHA3_EMPTY,
|
|
||||||
storage_root: SHA3_NULL_RLP
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// TODO: validate against request outputs.
|
|
||||||
// needs engine + env info as part of request.
|
|
||||||
let _ = sender.send(account);
|
|
||||||
return
|
|
||||||
}
|
|
||||||
Err(e) => warn!(target: "on_demand", "Error handling response for state request: {:?}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Pending::Code(req, sender) => {
|
|
||||||
if let NetworkResponse::Code(ref response) = *response {
|
|
||||||
match req.check_response(response.code.as_slice()) {
|
|
||||||
Ok(code) => {
|
|
||||||
let _ = sender.send(code);
|
|
||||||
return
|
|
||||||
}
|
|
||||||
Err(e) => warn!(target: "on_demand", "Error handling response for code request: {:?}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Pending::TxProof(req, sender) => {
|
|
||||||
if let NetworkResponse::Execution(ref response) = *response {
|
|
||||||
match req.check_response(&response.items) {
|
|
||||||
ProvedExecution::Complete(executed) => {
|
|
||||||
let _ = sender.send(Ok(executed));
|
|
||||||
return
|
|
||||||
}
|
|
||||||
ProvedExecution::Failed(err) => {
|
|
||||||
let _ = sender.send(Err(err));
|
|
||||||
return
|
|
||||||
}
|
|
||||||
ProvedExecution::BadProof => warn!(target: "on_demand", "Error handling response for transaction proof request"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx.disable_peer(peer);
|
// update pending fields and re-queue.
|
||||||
|
let capabilities = guess_capabilities(&pending.requests.requests()[num_answered..]);
|
||||||
|
pending.net_requests = builder.build();
|
||||||
|
pending.required_capabilities = capabilities;
|
||||||
|
|
||||||
|
self.pending.write().push(pending);
|
||||||
|
self.dispatch_pending(ctx.as_basic());
|
||||||
}
|
}
|
||||||
|
|
||||||
fn tick(&self, ctx: &BasicContext) {
|
fn tick(&self, ctx: &BasicContext) {
|
||||||
self.dispatch_orphaned(ctx)
|
self.dispatch_pending(ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -587,7 +506,7 @@ mod tests {
|
|||||||
assert!(on_demand.orphaned_requests.read().len() == 1);
|
assert!(on_demand.orphaned_requests.read().len() == 1);
|
||||||
drop(result);
|
drop(result);
|
||||||
|
|
||||||
on_demand.dispatch_orphaned(&FakeContext);
|
on_demand.dispatch_pending(&FakeContext);
|
||||||
assert!(on_demand.orphaned_requests.read().is_empty());
|
assert!(on_demand.orphaned_requests.read().is_empty());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -126,6 +126,23 @@ impl From<Request> for CheckedRequest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl CheckedRequest {
|
||||||
|
/// Convert this into a network request.
|
||||||
|
pub fn into_net_request(self) -> net_request::Request {
|
||||||
|
use ::request::Request as NetRequest;
|
||||||
|
|
||||||
|
match self {
|
||||||
|
CheckedRequest::HeaderProof(_, req) => NetRequest::HeaderProof(req),
|
||||||
|
CheckedRequest::HeaderByHash(_, req) => NetRequest::Headers(req),
|
||||||
|
CheckedRequest::Receipts(_, req) => NetRequest::Receipts(req),
|
||||||
|
CheckedRequest::Body(_, req) => NetRequest::Body(req),
|
||||||
|
CheckedRequest::Account(_, req) => NetRequest::Account(req),
|
||||||
|
CheckedRequest::Code(_, req) => NetRequest::Code(req),
|
||||||
|
CheckedRequest::Execution(_, req) => NetRequest::Execution(req),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl IncompleteRequest for CheckedRequest {
|
impl IncompleteRequest for CheckedRequest {
|
||||||
type Complete = net_request::CompleteRequest;
|
type Complete = net_request::CompleteRequest;
|
||||||
type Response = net_request::Response;
|
type Response = net_request::Response;
|
||||||
@ -192,6 +209,19 @@ impl IncompleteRequest for CheckedRequest {
|
|||||||
CheckedRequest::Execution(_, req) => req.complete().map(CompleteRequest::Execution),
|
CheckedRequest::Execution(_, req) => req.complete().map(CompleteRequest::Execution),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
fn adjust_refs<F>(&mut self, mapping: F) where F: FnMut(usize) -> usize {
|
||||||
|
match *self {
|
||||||
|
CheckedRequest::HeaderProof(_, ref mut req) => req.adjust_refs(mapping),
|
||||||
|
CheckedRequest::HeaderByHash(_, ref mut req) => req.adjust_refs(mapping),
|
||||||
|
CheckedRequest::Receipts(_, ref mut req) => req.adjust_refs(mapping),
|
||||||
|
CheckedRequest::Body(_, ref mut req) => req.adjust_refs(mapping),
|
||||||
|
CheckedRequest::Account(_, ref mut req) => req.adjust_refs(mapping),
|
||||||
|
CheckedRequest::Code(_, ref mut req) => req.adjust_refs(mapping),
|
||||||
|
CheckedRequest::Execution(_, ref mut req) => req.adjust_refs(mapping),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl net_request::CheckedRequest for CheckedRequest {
|
impl net_request::CheckedRequest for CheckedRequest {
|
||||||
|
@ -87,9 +87,14 @@ impl<T: IncompleteRequest + Clone> Requests<T> {
|
|||||||
/// Get the number of answered requests.
|
/// Get the number of answered requests.
|
||||||
pub fn num_answered(&self) -> usize { self.answered }
|
pub fn num_answered(&self) -> usize { self.answered }
|
||||||
|
|
||||||
|
/// Whether the batch is complete.
|
||||||
|
pub fn is_complete(&self) -> bool {
|
||||||
|
self.answered == self.requests.len()
|
||||||
|
}
|
||||||
|
|
||||||
/// Get the next request as a filled request. Returns `None` when all requests answered.
|
/// Get the next request as a filled request. Returns `None` when all requests answered.
|
||||||
pub fn next_complete(&self) -> Option<T::Complete> {
|
pub fn next_complete(&self) -> Option<T::Complete> {
|
||||||
if self.answered == self.requests.len() {
|
if self.is_complete() {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
Some(self.requests[self.answered].clone()
|
Some(self.requests[self.answered].clone()
|
||||||
@ -97,6 +102,17 @@ impl<T: IncompleteRequest + Clone> Requests<T> {
|
|||||||
.expect("All outputs checked as invariant of `Requests` object; qed"))
|
.expect("All outputs checked as invariant of `Requests` object; qed"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Map requests from one type into another.
|
||||||
|
pub fn map_requests<F, U>(self, f: F) -> Requests<U>
|
||||||
|
where F: FnMut(T) -> U, U: IncompleteRequest
|
||||||
|
{
|
||||||
|
Requests {
|
||||||
|
outputs: self.outputs,
|
||||||
|
requests: self.requests.into_iter().map(f).collect(),
|
||||||
|
answered: self.answered,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: super::CheckedRequest> Requests<T> {
|
impl<T: super::CheckedRequest> Requests<T> {
|
||||||
@ -122,8 +138,8 @@ impl<T: super::CheckedRequest> Requests<T> {
|
|||||||
|
|
||||||
self.answered += 1;
|
self.answered += 1;
|
||||||
|
|
||||||
// fill as much of the next request as we can.
|
// fill as much of each remaining request as we can.
|
||||||
if let Some(ref mut req) = self.requests.get_mut(self.answered) {
|
for req in self.requests.iter_mut().skip(self.answered) {
|
||||||
req.fill(|req_idx, out_idx| outputs.get(&(req_idx, out_idx)).cloned().ok_or(NoSuchOutput))
|
req.fill(|req_idx, out_idx| outputs.get(&(req_idx, out_idx)).cloned().ok_or(NoSuchOutput))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -100,6 +100,12 @@ impl<T> Field<T> {
|
|||||||
_ => Err(NoSuchOutput),
|
_ => Err(NoSuchOutput),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn adjust_req<F>(&mut self, mut mapping: F) where F: FnMut(usize) -> usize {
|
||||||
|
if let Field::BackReference(ref mut req_idx, _) = *self {
|
||||||
|
*req_idx = mapping(*req_idx)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> From<T> for Field<T> {
|
impl<T> From<T> for Field<T> {
|
||||||
@ -358,6 +364,19 @@ impl IncompleteRequest for Request {
|
|||||||
Request::Execution(req) => req.complete().map(CompleteRequest::Execution),
|
Request::Execution(req) => req.complete().map(CompleteRequest::Execution),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn adjust_refs<F>(&mut self, mapping: F) where F: FnMut(usize) -> usize {
|
||||||
|
match *self {
|
||||||
|
Request::Headers(ref mut req) => req.adjust_refs(mapping),
|
||||||
|
Request::HeaderProof(ref mut req) => req.adjust_refs(mapping),
|
||||||
|
Request::Receipts(ref mut req) => req.adjust_refs(mapping),
|
||||||
|
Request::Body(ref mut req) => req.adjust_refs(mapping),
|
||||||
|
Request::Account(ref mut req) => req.adjust_refs(mapping),
|
||||||
|
Request::Storage(ref mut req) => req.adjust_refs(mapping),
|
||||||
|
Request::Code(ref mut req) => req.adjust_refs(mapping),
|
||||||
|
Request::Execution(ref mut req) => req.adjust_refs(mapping),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CheckedRequest for Request {
|
impl CheckedRequest for Request {
|
||||||
@ -536,6 +555,9 @@ pub trait IncompleteRequest: Sized {
|
|||||||
/// Attempt to convert this request into its complete variant.
|
/// Attempt to convert this request into its complete variant.
|
||||||
/// Will succeed if all fields have been filled, will fail otherwise.
|
/// Will succeed if all fields have been filled, will fail otherwise.
|
||||||
fn complete(self) -> Result<Self::Complete, NoSuchOutput>;
|
fn complete(self) -> Result<Self::Complete, NoSuchOutput>;
|
||||||
|
|
||||||
|
/// Adjust back-reference request indices.
|
||||||
|
fn adjust_refs<F>(&mut self, mapping: F) where F: FnMut(usize) -> usize;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A request which can be checked against its response for more validity.
|
/// A request which can be checked against its response for more validity.
|
||||||
@ -631,6 +653,10 @@ pub mod header {
|
|||||||
reverse: self.reverse,
|
reverse: self.reverse,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn adjust_refs<F>(&mut self, mapping: F) where F: FnMut(usize) -> usize {
|
||||||
|
self.start.adjust_req(mapping)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A complete header request.
|
/// A complete header request.
|
||||||
@ -745,6 +771,10 @@ pub mod header_proof {
|
|||||||
num: self.num.into_scalar()?,
|
num: self.num.into_scalar()?,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn adjust_refs<F>(&mut self, mapping: F) where F: FnMut(usize) -> usize {
|
||||||
|
self.num.adjust_req(mapping)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A complete header proof request.
|
/// A complete header proof request.
|
||||||
@ -849,6 +879,10 @@ pub mod block_receipts {
|
|||||||
hash: self.hash.into_scalar()?,
|
hash: self.hash.into_scalar()?,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn adjust_refs<F>(&mut self, mapping: F) where F: FnMut(usize) -> usize {
|
||||||
|
self.hash.adjust_req(mapping)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A complete block receipts request.
|
/// A complete block receipts request.
|
||||||
@ -942,6 +976,10 @@ pub mod block_body {
|
|||||||
hash: self.hash.into_scalar()?,
|
hash: self.hash.into_scalar()?,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn adjust_refs<F>(&mut self, mapping: F) where F: FnMut(usize) -> usize {
|
||||||
|
self.hash.adjust_req(mapping)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A complete block body request.
|
/// A complete block body request.
|
||||||
@ -1062,6 +1100,11 @@ pub mod account {
|
|||||||
address_hash: self.address_hash.into_scalar()?,
|
address_hash: self.address_hash.into_scalar()?,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn adjust_refs<F>(&mut self, mut mapping: F) where F: FnMut(usize) -> usize {
|
||||||
|
self.block_hash.adjust_req(&mut mapping);
|
||||||
|
self.address_hash.adjust_req(&mut mapping);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A complete request for an account.
|
/// A complete request for an account.
|
||||||
@ -1212,6 +1255,12 @@ pub mod storage {
|
|||||||
key_hash: self.key_hash.into_scalar()?,
|
key_hash: self.key_hash.into_scalar()?,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn adjust_refs<F>(&mut self, mut mapping: F) where F: FnMut(usize) -> usize {
|
||||||
|
self.block_hash.adjust_req(&mut mapping);
|
||||||
|
self.address_hash.adjust_req(&mut mapping);
|
||||||
|
self.key_hash.adjust_req(&mut mapping);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A complete request for a storage proof.
|
/// A complete request for a storage proof.
|
||||||
@ -1332,6 +1381,11 @@ pub mod contract_code {
|
|||||||
code_hash: self.code_hash.into_scalar()?,
|
code_hash: self.code_hash.into_scalar()?,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn adjust_refs<F>(&mut self, mut mapping: F) where F: FnMut(usize) -> usize {
|
||||||
|
self.block_hash.adjust_req(&mut mapping);
|
||||||
|
self.code_hash.adjust_req(&mut mapping);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A complete request.
|
/// A complete request.
|
||||||
@ -1464,6 +1518,10 @@ pub mod execution {
|
|||||||
data: self.data,
|
data: self.data,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn adjust_refs<F>(&mut self, mapping: F) where F: FnMut(usize) -> usize {
|
||||||
|
self.block_hash.adjust_req(mapping);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A complete request.
|
/// A complete request.
|
||||||
|
Loading…
Reference in New Issue
Block a user