Merge pull request #4485 from ethcore/lightrpc

Preparation for Light client RPC
This commit is contained in:
Robert Habermeier 2017-02-10 17:15:03 +01:00 committed by GitHub
commit 48ae38eaf9
34 changed files with 1463 additions and 678 deletions

1
Cargo.lock generated
View File

@ -601,6 +601,7 @@ dependencies = [
"ethcore-devtools 1.6.0", "ethcore-devtools 1.6.0",
"ethcore-io 1.6.0", "ethcore-io 1.6.0",
"ethcore-ipc 1.6.0", "ethcore-ipc 1.6.0",
"ethcore-light 1.6.0",
"ethcore-util 1.6.0", "ethcore-util 1.6.0",
"ethcrypto 0.1.0", "ethcrypto 0.1.0",
"ethjson 0.1.0", "ethjson 0.1.0",

View File

@ -32,9 +32,10 @@ use cht;
use ethcore::block_status::BlockStatus; use ethcore::block_status::BlockStatus;
use ethcore::error::BlockError; use ethcore::error::BlockError;
use ethcore::encoded;
use ethcore::header::Header;
use ethcore::ids::BlockId; use ethcore::ids::BlockId;
use ethcore::views::HeaderView; use util::{H256, U256, HeapSizeOf, Mutex, RwLock};
use util::{Bytes, H256, U256, HeapSizeOf, Mutex, RwLock};
use smallvec::SmallVec; use smallvec::SmallVec;
@ -77,9 +78,9 @@ impl HeapSizeOf for Entry {
/// Header chain. See module docs for more details. /// Header chain. See module docs for more details.
pub struct HeaderChain { pub struct HeaderChain {
genesis_header: Bytes, // special-case the genesis. genesis_header: encoded::Header, // special-case the genesis.
candidates: RwLock<BTreeMap<u64, Entry>>, candidates: RwLock<BTreeMap<u64, Entry>>,
headers: RwLock<HashMap<H256, Bytes>>, headers: RwLock<HashMap<H256, encoded::Header>>,
best_block: RwLock<BlockDescriptor>, best_block: RwLock<BlockDescriptor>,
cht_roots: Mutex<Vec<H256>>, cht_roots: Mutex<Vec<H256>>,
} }
@ -87,10 +88,12 @@ pub struct HeaderChain {
impl HeaderChain { impl HeaderChain {
/// Create a new header chain given this genesis block. /// Create a new header chain given this genesis block.
pub fn new(genesis: &[u8]) -> Self { pub fn new(genesis: &[u8]) -> Self {
use ethcore::views::HeaderView;
let g_view = HeaderView::new(genesis); let g_view = HeaderView::new(genesis);
HeaderChain { HeaderChain {
genesis_header: genesis.to_owned(), genesis_header: encoded::Header::new(genesis.to_owned()),
best_block: RwLock::new(BlockDescriptor { best_block: RwLock::new(BlockDescriptor {
hash: g_view.hash(), hash: g_view.hash(),
number: 0, number: 0,
@ -104,14 +107,11 @@ impl HeaderChain {
/// Insert a pre-verified header. /// Insert a pre-verified header.
/// ///
/// This blindly trusts that the data given to it is /// This blindly trusts that the data given to it is sensible.
/// a) valid RLP encoding of a header and pub fn insert(&self, header: Header) -> Result<(), BlockError> {
/// b) has sensible data contained within it. let hash = header.hash();
pub fn insert(&self, header: Bytes) -> Result<(), BlockError> { let number = header.number();
let view = HeaderView::new(&header); let parent_hash = *header.parent_hash();
let hash = view.hash();
let number = view.number();
let parent_hash = view.parent_hash();
// hold candidates the whole time to guard import order. // hold candidates the whole time to guard import order.
let mut candidates = self.candidates.write(); let mut candidates = self.candidates.write();
@ -119,8 +119,7 @@ impl HeaderChain {
// find parent details. // find parent details.
let parent_td = let parent_td =
if number == 1 { if number == 1 {
let g_view = HeaderView::new(&self.genesis_header); self.genesis_header.difficulty()
g_view.difficulty()
} else { } else {
candidates.get(&(number - 1)) candidates.get(&(number - 1))
.and_then(|entry| entry.candidates.iter().find(|c| c.hash == parent_hash)) .and_then(|entry| entry.candidates.iter().find(|c| c.hash == parent_hash))
@ -128,7 +127,7 @@ impl HeaderChain {
.ok_or_else(|| BlockError::UnknownParent(parent_hash))? .ok_or_else(|| BlockError::UnknownParent(parent_hash))?
}; };
let total_difficulty = parent_td + view.difficulty(); let total_difficulty = parent_td + *header.difficulty();
// insert headers and candidates entries. // insert headers and candidates entries.
candidates.entry(number).or_insert_with(|| Entry { candidates: SmallVec::new(), canonical_hash: hash }) candidates.entry(number).or_insert_with(|| Entry { candidates: SmallVec::new(), canonical_hash: hash })
@ -138,7 +137,8 @@ impl HeaderChain {
total_difficulty: total_difficulty, total_difficulty: total_difficulty,
}); });
self.headers.write().insert(hash, header.clone()); let raw = ::rlp::encode(&header).to_vec();
self.headers.write().insert(hash, encoded::Header::new(raw));
// reorganize ancestors so canonical entries are first in their // reorganize ancestors so canonical entries are first in their
// respective candidates vectors. // respective candidates vectors.
@ -211,24 +211,36 @@ impl HeaderChain {
/// Get a block header. In the case of query by number, only canonical blocks /// Get a block header. In the case of query by number, only canonical blocks
/// will be returned. /// will be returned.
pub fn get_header(&self, id: BlockId) -> Option<Bytes> { pub fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
match id { match id {
BlockId::Earliest | BlockId::Number(0) => Some(self.genesis_header.clone()), BlockId::Earliest | BlockId::Number(0) => Some(self.genesis_header.clone()),
BlockId::Latest if self.headers.read().is_empty() => Some(self.genesis_header.clone()), BlockId::Hash(hash) => self.headers.read().get(&hash).cloned(),
BlockId::Hash(hash) => self.headers.read().get(&hash).map(|x| x.to_vec()),
BlockId::Number(num) => { BlockId::Number(num) => {
if self.best_block.read().number < num { return None } if self.best_block.read().number < num { return None }
self.candidates.read().get(&num).map(|entry| entry.canonical_hash) self.candidates.read().get(&num).map(|entry| entry.canonical_hash)
.and_then(|hash| self.headers.read().get(&hash).map(|x| x.to_vec())) .and_then(|hash| self.headers.read().get(&hash).cloned())
} }
BlockId::Latest | BlockId::Pending => { BlockId::Latest | BlockId::Pending => {
let hash = self.best_block.read().hash; let hash = {
self.headers.read().get(&hash).map(|x| x.to_vec()) let best = self.best_block.read();
if best.number == 0 {
return Some(self.genesis_header.clone())
}
best.hash
};
self.headers.read().get(&hash).cloned()
} }
} }
} }
/// Get the best block's header.
pub fn best_header(&self) -> encoded::Header {
self.block_header(BlockId::Latest).expect("Header for best block always stored; qed")
}
/// Get the nth CHT root, if it's been computed. /// Get the nth CHT root, if it's been computed.
/// ///
/// CHT root 0 is from block `1..2048`. /// CHT root 0 is from block `1..2048`.
@ -305,15 +317,15 @@ mod tests {
header.set_number(i); header.set_number(i);
header.set_timestamp(rolling_timestamp); header.set_timestamp(rolling_timestamp);
header.set_difficulty(*genesis_header.difficulty() * i.into()); header.set_difficulty(*genesis_header.difficulty() * i.into());
chain.insert(::rlp::encode(&header).to_vec()).unwrap();
parent_hash = header.hash(); parent_hash = header.hash();
chain.insert(header).unwrap();
rolling_timestamp += 10; rolling_timestamp += 10;
} }
assert!(chain.get_header(BlockId::Number(10)).is_none()); assert!(chain.block_header(BlockId::Number(10)).is_none());
assert!(chain.get_header(BlockId::Number(9000)).is_some()); assert!(chain.block_header(BlockId::Number(9000)).is_some());
assert!(chain.cht_root(2).is_some()); assert!(chain.cht_root(2).is_some());
assert!(chain.cht_root(3).is_none()); assert!(chain.cht_root(3).is_none());
} }
@ -333,10 +345,10 @@ mod tests {
header.set_number(i); header.set_number(i);
header.set_timestamp(rolling_timestamp); header.set_timestamp(rolling_timestamp);
header.set_difficulty(*genesis_header.difficulty() * i.into()); header.set_difficulty(*genesis_header.difficulty() * i.into());
chain.insert(::rlp::encode(&header).to_vec()).unwrap();
parent_hash = header.hash(); parent_hash = header.hash();
chain.insert(header).unwrap();
rolling_timestamp += 10; rolling_timestamp += 10;
} }
@ -349,10 +361,10 @@ mod tests {
header.set_number(i); header.set_number(i);
header.set_timestamp(rolling_timestamp); header.set_timestamp(rolling_timestamp);
header.set_difficulty(*genesis_header.difficulty() * i.into()); header.set_difficulty(*genesis_header.difficulty() * i.into());
chain.insert(::rlp::encode(&header).to_vec()).unwrap();
parent_hash = header.hash(); parent_hash = header.hash();
chain.insert(header).unwrap();
rolling_timestamp += 10; rolling_timestamp += 10;
} }
} }
@ -370,10 +382,10 @@ mod tests {
header.set_number(i); header.set_number(i);
header.set_timestamp(rolling_timestamp); header.set_timestamp(rolling_timestamp);
header.set_difficulty(*genesis_header.difficulty() * (i * i).into()); header.set_difficulty(*genesis_header.difficulty() * (i * i).into());
chain.insert(::rlp::encode(&header).to_vec()).unwrap();
parent_hash = header.hash(); parent_hash = header.hash();
chain.insert(header).unwrap();
rolling_timestamp += 11; rolling_timestamp += 11;
} }
} }
@ -382,11 +394,23 @@ mod tests {
assert_eq!(num, 12); assert_eq!(num, 12);
while num > 0 { while num > 0 {
let header: Header = ::rlp::decode(&chain.get_header(BlockId::Number(num)).unwrap()); let header = chain.block_header(BlockId::Number(num)).unwrap();
assert_eq!(header.hash(), canon_hash); assert_eq!(header.hash(), canon_hash);
canon_hash = *header.parent_hash(); canon_hash = header.parent_hash();
num -= 1; num -= 1;
} }
} }
#[test]
fn earliest_is_latest() {
let spec = Spec::new_test();
let genesis_header = spec.genesis_header();
let chain = HeaderChain::new(&::rlp::encode(&genesis_header));
assert!(chain.block_header(BlockId::Earliest).is_some());
assert!(chain.block_header(BlockId::Latest).is_some());
assert!(chain.block_header(BlockId::Pending).is_some());
}
} }

View File

@ -21,7 +21,6 @@ use ethcore::block_status::BlockStatus;
use ethcore::client::ClientReport; use ethcore::client::ClientReport;
use ethcore::ids::BlockId; use ethcore::ids::BlockId;
use ethcore::header::Header; use ethcore::header::Header;
use ethcore::views::HeaderView;
use ethcore::verification::queue::{self, HeaderQueue}; use ethcore::verification::queue::{self, HeaderQueue};
use ethcore::transaction::{PendingTransaction, Condition as TransactionCondition}; use ethcore::transaction::{PendingTransaction, Condition as TransactionCondition};
use ethcore::blockchain_info::BlockChainInfo; use ethcore::blockchain_info::BlockChainInfo;
@ -35,7 +34,6 @@ use util::{Bytes, Mutex, RwLock};
use provider::Provider; use provider::Provider;
use request; use request;
use time;
use self::header_chain::HeaderChain; use self::header_chain::HeaderChain;
@ -109,12 +107,12 @@ impl Client {
/// Fetch a vector of all pending transactions. /// Fetch a vector of all pending transactions.
pub fn ready_transactions(&self) -> Vec<PendingTransaction> { pub fn ready_transactions(&self) -> Vec<PendingTransaction> {
let best_num = self.chain.best_block().number; let best = self.chain.best_header();
self.tx_pool.lock() self.tx_pool.lock()
.values() .values()
.filter(|t| match t.condition { .filter(|t| match t.condition {
Some(TransactionCondition::Number(ref x)) => x <= &best_num, Some(TransactionCondition::Number(x)) => x <= best.number(),
Some(TransactionCondition::Timestamp(ref x)) => *x <= time::get_time().sec as u64, Some(TransactionCondition::Timestamp(x)) => x <= best.timestamp(),
None => true, None => true,
}) })
.cloned() .cloned()
@ -131,17 +129,19 @@ impl Client {
/// Get the chain info. /// Get the chain info.
pub fn chain_info(&self) -> BlockChainInfo { pub fn chain_info(&self) -> BlockChainInfo {
let best_block = self.chain.best_block(); let best_hdr = self.chain.best_header();
let best_td = self.chain.best_block().total_difficulty;
let first_block = self.chain.first_block(); let first_block = self.chain.first_block();
let genesis_hash = self.chain.genesis_hash(); let genesis_hash = self.chain.genesis_hash();
BlockChainInfo { BlockChainInfo {
total_difficulty: best_block.total_difficulty, total_difficulty: best_td,
pending_total_difficulty: best_block.total_difficulty + self.queue.total_difficulty(), pending_total_difficulty: best_td + self.queue.total_difficulty(),
genesis_hash: genesis_hash, genesis_hash: genesis_hash,
best_block_hash: best_block.hash, best_block_hash: best_hdr.hash(),
best_block_number: best_block.number, best_block_number: best_hdr.number(),
best_block_timestamp: HeaderView::new(&self.chain.get_header(BlockId::Latest).expect("Latest hash is always in the chain")).timestamp(), best_block_timestamp: best_hdr.timestamp(),
ancient_block_hash: if first_block.is_some() { Some(genesis_hash) } else { None }, ancient_block_hash: if first_block.is_some() { Some(genesis_hash) } else { None },
ancient_block_number: if first_block.is_some() { Some(0) } else { None }, ancient_block_number: if first_block.is_some() { Some(0) } else { None },
first_block_hash: first_block.as_ref().map(|first| first.hash), first_block_hash: first_block.as_ref().map(|first| first.hash),
@ -155,8 +155,8 @@ impl Client {
} }
/// Get a block header by Id. /// Get a block header by Id.
pub fn get_header(&self, id: BlockId) -> Option<Bytes> { pub fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
self.chain.get_header(id) self.chain.block_header(id)
} }
/// Flush the header queue. /// Flush the header queue.
@ -180,7 +180,7 @@ impl Client {
for verified_header in self.queue.drain(MAX) { for verified_header in self.queue.drain(MAX) {
let (num, hash) = (verified_header.number(), verified_header.hash()); let (num, hash) = (verified_header.number(), verified_header.hash());
match self.chain.insert(::rlp::encode(&verified_header).to_vec()) { match self.chain.insert(verified_header) {
Ok(()) => { Ok(()) => {
good.push(hash); good.push(hash);
self.report.write().blocks_imported += 1; self.report.write().blocks_imported += 1;
@ -252,7 +252,7 @@ impl Provider for Client {
} }
fn block_header(&self, id: BlockId) -> Option<encoded::Header> { fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
self.chain.get_header(id).map(encoded::Header::new) Client::block_header(self, id)
} }
fn block_body(&self, _id: BlockId) -> Option<encoded::Body> { fn block_body(&self, _id: BlockId) -> Option<encoded::Body> {

View File

@ -25,52 +25,17 @@ use ethcore::encoded;
use ethcore::receipt::Receipt; use ethcore::receipt::Receipt;
use futures::{Async, Poll, Future}; use futures::{Async, Poll, Future};
use futures::sync::oneshot; use futures::sync::oneshot::{self, Sender, Receiver};
use network::PeerId; use network::PeerId;
use rlp::{RlpStream, Stream};
use util::{Bytes, RwLock, U256};
use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP};
use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId}; use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId};
use util::{Bytes, RwLock, U256};
use types::les_request::{self as les_request, Request as LesRequest}; use types::les_request::{self as les_request, Request as LesRequest};
pub mod request; pub mod request;
/// Errors which can occur while trying to fulfill a request.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Error {
/// Request was canceled.
Canceled,
/// No suitable peers available to serve the request.
NoPeersAvailable,
/// Invalid request.
InvalidRequest,
/// Request timed out.
TimedOut,
}
impl From<oneshot::Canceled> for Error {
fn from(_: oneshot::Canceled) -> Self {
Error::Canceled
}
}
/// Future which awaits a response to an on-demand request.
pub struct Response<T>(oneshot::Receiver<Result<T, Error>>);
impl<T> Future for Response<T> {
type Item = T;
type Error = Error;
fn poll(&mut self) -> Poll<T, Error> {
match self.0.poll().map_err(Into::into) {
Ok(Async::Ready(val)) => val.map(Async::Ready),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e),
}
}
}
type Sender<T> = oneshot::Sender<Result<T, Error>>;
// relevant peer info. // relevant peer info.
struct Peer { struct Peer {
status: Status, status: Status,
@ -84,6 +49,7 @@ enum Pending {
Block(request::Body, Sender<encoded::Block>), Block(request::Body, Sender<encoded::Block>),
BlockReceipts(request::BlockReceipts, Sender<Vec<Receipt>>), BlockReceipts(request::BlockReceipts, Sender<Vec<Receipt>>),
Account(request::Account, Sender<BasicAccount>), Account(request::Account, Sender<BasicAccount>),
Code(request::Code, Sender<Bytes>),
} }
/// On demand request service. See module docs for more details. /// On demand request service. See module docs for more details.
@ -92,6 +58,7 @@ enum Pending {
pub struct OnDemand { pub struct OnDemand {
peers: RwLock<HashMap<PeerId, Peer>>, peers: RwLock<HashMap<PeerId, Peer>>,
pending_requests: RwLock<HashMap<ReqId, Pending>>, pending_requests: RwLock<HashMap<ReqId, Pending>>,
orphaned_requests: RwLock<Vec<Pending>>,
} }
impl Default for OnDemand { impl Default for OnDemand {
@ -99,6 +66,7 @@ impl Default for OnDemand {
OnDemand { OnDemand {
peers: RwLock::new(HashMap::new()), peers: RwLock::new(HashMap::new()),
pending_requests: RwLock::new(HashMap::new()), pending_requests: RwLock::new(HashMap::new()),
orphaned_requests: RwLock::new(Vec::new()),
} }
} }
} }
@ -106,32 +74,27 @@ impl Default for OnDemand {
impl OnDemand { impl OnDemand {
/// Request a header by block number and CHT root hash. /// Request a header by block number and CHT root hash.
/// Returns the header and the total difficulty. /// Returns the header and the total difficulty.
pub fn header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Response<(encoded::Header, U256)> { pub fn header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver<(encoded::Header, U256)> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
self.dispatch_header_by_number(ctx, req, sender); self.dispatch_header_by_number(ctx, req, sender);
Response(receiver) receiver
} }
// dispatch the request, completing the request if no peers available. // dispatch the request, completing the request if no peers available.
fn dispatch_header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber, sender: Sender<(encoded::Header, U256)>) { fn dispatch_header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber, sender: Sender<(encoded::Header, U256)>) {
let num = req.num; let num = req.num();
let cht_num = match ::cht::block_to_cht_number(req.num) { let cht_num = req.cht_num();
Some(cht_num) => cht_num,
None => {
warn!(target: "on_demand", "Attempted to dispatch invalid header proof: req.num == 0");
sender.complete(Err(Error::InvalidRequest));
return;
}
};
let les_req = LesRequest::HeaderProofs(les_request::HeaderProofs { let les_req = LesRequest::HeaderProofs(les_request::HeaderProofs {
requests: vec![les_request::HeaderProof { requests: vec![les_request::HeaderProof {
cht_number: cht_num, cht_number: cht_num,
block_number: req.num, block_number: num,
from_level: 0, from_level: 0,
}], }],
}); });
let pending = Pending::HeaderByNumber(req, sender);
// we're looking for a peer with serveHeaders who's far enough along in the // we're looking for a peer with serveHeaders who's far enough along in the
// chain. // chain.
for (id, peer) in self.peers.read().iter() { for (id, peer) in self.peers.read().iter() {
@ -141,7 +104,7 @@ impl OnDemand {
trace!(target: "on_demand", "Assigning request to peer {}", id); trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert( self.pending_requests.write().insert(
req_id, req_id,
Pending::HeaderByNumber(req, sender) pending,
); );
return return
}, },
@ -151,18 +114,17 @@ impl OnDemand {
} }
} }
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request"); trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable)); self.orphaned_requests.write().push(pending)
} }
/// Request a header by hash. This is less accurate than by-number because we don't know /// Request a header by hash. This is less accurate than by-number because we don't know
/// where in the chain this header lies, and therefore can't find a peer who is supposed to have /// where in the chain this header lies, and therefore can't find a peer who is supposed to have
/// it as easily. /// it as easily.
pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Response<encoded::Header> { pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Receiver<encoded::Header> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
self.dispatch_header_by_hash(ctx, req, sender); self.dispatch_header_by_hash(ctx, req, sender);
Response(receiver) receiver
} }
fn dispatch_header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash, sender: Sender<encoded::Header>) { fn dispatch_header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash, sender: Sender<encoded::Header>) {
@ -181,16 +143,17 @@ impl OnDemand {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let mut rng = ::rand::thread_rng(); let mut rng = ::rand::thread_rng();
::rand::Rng::shuffle(&mut rng, &mut potential_peers); ::rand::Rng::shuffle(&mut rng, &mut potential_peers);
let pending = Pending::HeaderByHash(req, sender);
for id in potential_peers { for id in potential_peers {
match ctx.request_from(id, les_req.clone()) { match ctx.request_from(id, les_req.clone()) {
Ok(req_id) => { Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id); trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert( self.pending_requests.write().insert(
req_id, req_id,
Pending::HeaderByHash(req, sender), pending,
); );
return return
} }
@ -199,18 +162,28 @@ impl OnDemand {
} }
} }
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request"); trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable)); self.orphaned_requests.write().push(pending)
} }
/// Request a block, given its header. Block bodies are requestable by hash only, /// Request a block, given its header. Block bodies are requestable by hash only,
/// and the header is required anyway to verify and complete the block body /// and the header is required anyway to verify and complete the block body
/// -- this just doesn't obscure the network query. /// -- this just doesn't obscure the network query.
pub fn block(&self, ctx: &BasicContext, req: request::Body) -> Response<encoded::Block> { pub fn block(&self, ctx: &BasicContext, req: request::Body) -> Receiver<encoded::Block> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
// fast path for empty body.
if req.header.transactions_root() == SHA3_NULL_RLP && req.header.uncles_hash() == SHA3_EMPTY_LIST_RLP {
let mut stream = RlpStream::new_list(3);
stream.append_raw(&req.header.into_inner(), 1);
stream.begin_list(0);
stream.begin_list(0);
sender.complete(encoded::Block::new(stream.out()))
} else {
self.dispatch_block(ctx, req, sender); self.dispatch_block(ctx, req, sender);
Response(receiver) }
receiver
} }
fn dispatch_block(&self, ctx: &BasicContext, req: request::Body, sender: Sender<encoded::Block>) { fn dispatch_block(&self, ctx: &BasicContext, req: request::Body, sender: Sender<encoded::Block>) {
@ -218,6 +191,7 @@ impl OnDemand {
let les_req = LesRequest::Bodies(les_request::Bodies { let les_req = LesRequest::Bodies(les_request::Bodies {
block_hashes: vec![req.hash], block_hashes: vec![req.hash],
}); });
let pending = Pending::Block(req, sender);
// we're looking for a peer with serveChainSince(num) // we're looking for a peer with serveChainSince(num)
for (id, peer) in self.peers.read().iter() { for (id, peer) in self.peers.read().iter() {
@ -227,7 +201,7 @@ impl OnDemand {
trace!(target: "on_demand", "Assigning request to peer {}", id); trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert( self.pending_requests.write().insert(
req_id, req_id,
Pending::Block(req, sender) pending,
); );
return return
} }
@ -237,17 +211,23 @@ impl OnDemand {
} }
} }
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request"); trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable)); self.orphaned_requests.write().push(pending)
} }
/// Request the receipts for a block. The header serves two purposes: /// Request the receipts for a block. The header serves two purposes:
/// provide the block hash to fetch receipts for, and for verification of the receipts root. /// provide the block hash to fetch receipts for, and for verification of the receipts root.
pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Response<Vec<Receipt>> { pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Receiver<Vec<Receipt>> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
// fast path for empty receipts.
if req.0.receipts_root() == SHA3_NULL_RLP {
sender.complete(Vec::new())
} else {
self.dispatch_block_receipts(ctx, req, sender); self.dispatch_block_receipts(ctx, req, sender);
Response(receiver) }
receiver
} }
fn dispatch_block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts, sender: Sender<Vec<Receipt>>) { fn dispatch_block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts, sender: Sender<Vec<Receipt>>) {
@ -255,6 +235,7 @@ impl OnDemand {
let les_req = LesRequest::Receipts(les_request::Receipts { let les_req = LesRequest::Receipts(les_request::Receipts {
block_hashes: vec![req.0.hash()], block_hashes: vec![req.0.hash()],
}); });
let pending = Pending::BlockReceipts(req, sender);
// we're looking for a peer with serveChainSince(num) // we're looking for a peer with serveChainSince(num)
for (id, peer) in self.peers.read().iter() { for (id, peer) in self.peers.read().iter() {
@ -264,7 +245,7 @@ impl OnDemand {
trace!(target: "on_demand", "Assigning request to peer {}", id); trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert( self.pending_requests.write().insert(
req_id, req_id,
Pending::BlockReceipts(req, sender) pending,
); );
return return
} }
@ -274,17 +255,16 @@ impl OnDemand {
} }
} }
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request"); trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable)); self.orphaned_requests.write().push(pending)
} }
/// Request an account by address and block header -- which gives a hash to query and a state root /// Request an account by address and block header -- which gives a hash to query and a state root
/// to verify against. /// to verify against.
pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Response<BasicAccount> { pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Receiver<BasicAccount> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
self.dispatch_account(ctx, req, sender); self.dispatch_account(ctx, req, sender);
Response(receiver) receiver
} }
fn dispatch_account(&self, ctx: &BasicContext, req: request::Account, sender: Sender<BasicAccount>) { fn dispatch_account(&self, ctx: &BasicContext, req: request::Account, sender: Sender<BasicAccount>) {
@ -297,6 +277,7 @@ impl OnDemand {
from_level: 0, from_level: 0,
}], }],
}); });
let pending = Pending::Account(req, sender);
// we're looking for a peer with serveStateSince(num) // we're looking for a peer with serveStateSince(num)
for (id, peer) in self.peers.read().iter() { for (id, peer) in self.peers.read().iter() {
@ -306,7 +287,7 @@ impl OnDemand {
trace!(target: "on_demand", "Assigning request to peer {}", id); trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert( self.pending_requests.write().insert(
req_id, req_id,
Pending::Account(req, sender) pending,
); );
return return
} }
@ -316,38 +297,125 @@ impl OnDemand {
} }
} }
// TODO: retrying
trace!(target: "on_demand", "No suitable peer for request"); trace!(target: "on_demand", "No suitable peer for request");
sender.complete(Err(Error::NoPeersAvailable)); self.orphaned_requests.write().push(pending)
}
/// Request code by address, known code hash, and block header.
pub fn code(&self, ctx: &BasicContext, req: request::Code) -> Receiver<Bytes> {
let (sender, receiver) = oneshot::channel();
// fast path for no code.
if req.code_hash == ::util::sha3::SHA3_EMPTY {
sender.complete(Vec::new())
} else {
self.dispatch_code(ctx, req, sender);
}
receiver
}
fn dispatch_code(&self, ctx: &BasicContext, req: request::Code, sender: Sender<Bytes>) {
let num = req.block_id.1;
let les_req = LesRequest::Codes(les_request::ContractCodes {
code_requests: vec![les_request::ContractCode {
block_hash: req.block_id.0,
account_key: ::util::Hashable::sha3(&req.address),
}]
});
let pending = Pending::Code(req, sender);
// we're looking for a peer with serveStateSince(num)
for (id, peer) in self.peers.read().iter() {
if peer.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= num) {
match ctx.request_from(*id, les_req.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Assigning request to peer {}", id);
self.pending_requests.write().insert(
req_id,
pending
);
return
}
Err(e) =>
trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e),
}
}
}
trace!(target: "on_demand", "No suitable peer for request");
self.orphaned_requests.write().push(pending)
}
// dispatch orphaned requests, and discard those for which the corresponding
// receiver has been dropped.
fn dispatch_orphaned(&self, ctx: &BasicContext) {
// wrapper future for calling `poll_cancel` on our `Senders` to preserve
// the invariant that it's always within a task.
struct CheckHangup<'a, T: 'a>(&'a mut Sender<T>);
impl<'a, T: 'a> Future for CheckHangup<'a, T> {
type Item = bool;
type Error = ();
fn poll(&mut self) -> Poll<bool, ()> {
Ok(Async::Ready(match self.0.poll_cancel() {
Ok(Async::NotReady) => false, // hasn't hung up.
_ => true, // has hung up.
}))
}
}
// check whether a sender's hung up (using `wait` to preserve the task invariant)
// returns true if has hung up, false otherwise.
fn check_hangup<T>(send: &mut Sender<T>) -> bool {
CheckHangup(send).wait().expect("CheckHangup always returns ok; qed")
}
if self.orphaned_requests.read().is_empty() { return }
let to_dispatch = ::std::mem::replace(&mut *self.orphaned_requests.write(), Vec::new());
for orphaned in to_dispatch {
match orphaned {
Pending::HeaderByNumber(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_header_by_number(ctx, req, sender) },
Pending::HeaderByHash(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_header_by_hash(ctx, req, sender) },
Pending::Block(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_block(ctx, req, sender) },
Pending::BlockReceipts(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_block_receipts(ctx, req, sender) },
Pending::Account(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_account(ctx, req, sender) },
Pending::Code(req, mut sender) =>
if !check_hangup(&mut sender) { self.dispatch_code(ctx, req, sender) },
}
}
} }
} }
impl Handler for OnDemand { impl Handler for OnDemand {
fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) { fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) {
self.peers.write().insert(ctx.peer(), Peer { status: status.clone(), capabilities: capabilities.clone() }); self.peers.write().insert(ctx.peer(), Peer { status: status.clone(), capabilities: capabilities.clone() });
self.dispatch_orphaned(ctx.as_basic());
} }
fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) { fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) {
self.peers.write().remove(&ctx.peer()); self.peers.write().remove(&ctx.peer());
let ctx = ctx.as_basic(); let ctx = ctx.as_basic();
{
let mut orphaned = self.orphaned_requests.write();
for unfulfilled in unfulfilled { for unfulfilled in unfulfilled {
if let Some(pending) = self.pending_requests.write().remove(unfulfilled) { if let Some(pending) = self.pending_requests.write().remove(unfulfilled) {
trace!(target: "on_demand", "Attempting to reassign dropped request"); trace!(target: "on_demand", "Attempting to reassign dropped request");
match pending { orphaned.push(pending);
Pending::HeaderByNumber(req, sender)
=> self.dispatch_header_by_number(ctx, req, sender),
Pending::HeaderByHash(req, sender)
=> self.dispatch_header_by_hash(ctx, req, sender),
Pending::Block(req, sender)
=> self.dispatch_block(ctx, req, sender),
Pending::BlockReceipts(req, sender)
=> self.dispatch_block_receipts(ctx, req, sender),
Pending::Account(req, sender)
=> self.dispatch_account(ctx, req, sender),
} }
} }
} }
self.dispatch_orphaned(ctx);
} }
fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) { fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) {
@ -356,6 +424,8 @@ impl Handler for OnDemand {
peer.status.update_from(&announcement); peer.status.update_from(&announcement);
peer.capabilities.update_from(&announcement); peer.capabilities.update_from(&announcement);
} }
self.dispatch_orphaned(ctx.as_basic());
} }
fn on_header_proofs(&self, ctx: &EventContext, req_id: ReqId, proofs: &[(Bytes, Vec<Bytes>)]) { fn on_header_proofs(&self, ctx: &EventContext, req_id: ReqId, proofs: &[(Bytes, Vec<Bytes>)]) {
@ -370,7 +440,7 @@ impl Handler for OnDemand {
if let Some(&(ref header, ref proof)) = proofs.get(0) { if let Some(&(ref header, ref proof)) = proofs.get(0) {
match req.check_response(header, proof) { match req.check_response(header, proof) {
Ok(header) => { Ok(header) => {
sender.complete(Ok(header)); sender.complete(header);
return return
} }
Err(e) => { Err(e) => {
@ -398,7 +468,7 @@ impl Handler for OnDemand {
if let Some(ref header) = headers.get(0) { if let Some(ref header) = headers.get(0) {
match req.check_response(header) { match req.check_response(header) {
Ok(header) => { Ok(header) => {
sender.complete(Ok(header)); sender.complete(header);
return return
} }
Err(e) => { Err(e) => {
@ -426,7 +496,7 @@ impl Handler for OnDemand {
if let Some(ref block) = bodies.get(0) { if let Some(ref block) = bodies.get(0) {
match req.check_response(block) { match req.check_response(block) {
Ok(block) => { Ok(block) => {
sender.complete(Ok(block)); sender.complete(block);
return return
} }
Err(e) => { Err(e) => {
@ -454,7 +524,7 @@ impl Handler for OnDemand {
if let Some(ref receipts) = receipts.get(0) { if let Some(ref receipts) = receipts.get(0) {
match req.check_response(receipts) { match req.check_response(receipts) {
Ok(receipts) => { Ok(receipts) => {
sender.complete(Ok(receipts)); sender.complete(receipts);
return return
} }
Err(e) => { Err(e) => {
@ -482,7 +552,7 @@ impl Handler for OnDemand {
if let Some(ref proof) = proofs.get(0) { if let Some(ref proof) = proofs.get(0) {
match req.check_response(proof) { match req.check_response(proof) {
Ok(proof) => { Ok(proof) => {
sender.complete(Ok(proof)); sender.complete(proof);
return return
} }
Err(e) => { Err(e) => {
@ -497,6 +567,38 @@ impl Handler for OnDemand {
_ => panic!("Only account request fetches state proof; qed"), _ => panic!("Only account request fetches state proof; qed"),
} }
} }
fn on_code(&self, ctx: &EventContext, req_id: ReqId, codes: &[Bytes]) {
let peer = ctx.peer();
let req = match self.pending_requests.write().remove(&req_id) {
Some(req) => req,
None => return,
};
match req {
Pending::Code(req, sender) => {
if let Some(code) = codes.get(0) {
match req.check_response(code.as_slice()) {
Ok(()) => {
sender.complete(code.clone());
return
}
Err(e) => {
warn!("Error handling response for code request: {:?}", e);
ctx.disable_peer(peer);
}
}
self.dispatch_code(ctx.as_basic(), req, sender);
}
}
_ => panic!("Only code request fetches code; qed"),
}
}
fn tick(&self, ctx: &BasicContext) {
self.dispatch_orphaned(ctx)
}
} }
#[cfg(test)] #[cfg(test)]
@ -505,7 +607,6 @@ mod tests {
use net::{Announcement, BasicContext, ReqId, Error as LesError}; use net::{Announcement, BasicContext, ReqId, Error as LesError};
use request::{Request as LesRequest, Kind as LesRequestKind}; use request::{Request as LesRequest, Kind as LesRequestKind};
use network::{PeerId, NodeId}; use network::{PeerId, NodeId};
use futures::Future;
use util::H256; use util::H256;
struct FakeContext; struct FakeContext;
@ -522,10 +623,14 @@ mod tests {
} }
#[test] #[test]
fn no_peers() { fn detects_hangup() {
let on_demand = OnDemand::default(); let on_demand = OnDemand::default();
let result = on_demand.header_by_hash(&FakeContext, request::HeaderByHash(H256::default())); let result = on_demand.header_by_hash(&FakeContext, request::HeaderByHash(H256::default()));
assert_eq!(result.wait().unwrap_err(), Error::NoPeersAvailable); assert!(on_demand.orphaned_requests.read().len() == 1);
drop(result);
on_demand.dispatch_orphaned(&FakeContext);
assert!(on_demand.orphaned_requests.read().is_empty());
} }
} }

View File

@ -37,7 +37,7 @@ pub enum Error {
BadProof, BadProof,
/// Wrong header number. /// Wrong header number.
WrongNumber(u64, u64), WrongNumber(u64, u64),
/// Wrong header hash. /// Wrong hash.
WrongHash(H256, H256), WrongHash(H256, H256),
/// Wrong trie root. /// Wrong trie root.
WrongTrieRoot(H256, H256), WrongTrieRoot(H256, H256),
@ -59,12 +59,33 @@ impl From<Box<TrieError>> for Error {
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeaderByNumber { pub struct HeaderByNumber {
/// The header's number. /// The header's number.
pub num: u64, num: u64,
/// The cht number for the given block number.
cht_num: u64,
/// The root of the CHT containing this header. /// The root of the CHT containing this header.
pub cht_root: H256, cht_root: H256,
} }
impl HeaderByNumber { impl HeaderByNumber {
/// Construct a new header-by-number request. Fails if the given number is 0.
/// Provide the expected CHT root to compare against.
pub fn new(num: u64, cht_root: H256) -> Option<Self> {
::cht::block_to_cht_number(num).map(|cht_num| HeaderByNumber {
num: num,
cht_num: cht_num,
cht_root: cht_root,
})
}
/// Access the requested block number.
pub fn num(&self) -> u64 { self.num }
/// Access the CHT number.
pub fn cht_num(&self) -> u64 { self.cht_num }
/// Access the expected CHT root.
pub fn cht_root(&self) -> H256 { self.cht_root }
/// Check a response with a header and cht proof. /// Check a response with a header and cht proof.
pub fn check_response(&self, header: &[u8], proof: &[Bytes]) -> Result<(encoded::Header, U256), Error> { pub fn check_response(&self, header: &[u8], proof: &[Bytes]) -> Result<(encoded::Header, U256), Error> {
let (expected_hash, td) = match ::cht::check_proof(proof, self.num, self.cht_root) { let (expected_hash, td) = match ::cht::check_proof(proof, self.num, self.cht_root) {
@ -106,6 +127,15 @@ pub struct Body {
} }
impl Body { impl Body {
/// Create a request for a block body from a given header.
pub fn new(header: encoded::Header) -> Self {
let hash = header.hash();
Body {
header: header,
hash: hash,
}
}
/// Check a response for this block body. /// Check a response for this block body.
pub fn check_response(&self, body: &[u8]) -> Result<encoded::Block, Error> { pub fn check_response(&self, body: &[u8]) -> Result<encoded::Block, Error> {
let body_view = UntrustedRlp::new(&body); let body_view = UntrustedRlp::new(&body);
@ -179,6 +209,28 @@ impl Account {
} }
} }
/// Request for account code.
pub struct Code {
/// Block hash, number pair.
pub block_id: (H256, u64),
/// Address requested.
pub address: Address,
/// Account's code hash.
pub code_hash: H256,
}
impl Code {
/// Check a response with code against the code hash.
pub fn check_response(&self, code: &[u8]) -> Result<(), Error> {
let found_hash = code.sha3();
if found_hash == self.code_hash {
Ok(())
} else {
Err(Error::WrongHash(self.code_hash, found_hash))
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -191,6 +243,11 @@ mod tests {
use ethcore::encoded; use ethcore::encoded;
use ethcore::receipt::Receipt; use ethcore::receipt::Receipt;
#[test]
fn no_invalid_header_by_number() {
assert!(HeaderByNumber::new(0, Default::default()).is_none())
}
#[test] #[test]
fn check_header_by_number() { fn check_header_by_number() {
use ::cht; use ::cht;
@ -213,10 +270,7 @@ mod tests {
}; };
let proof = cht.prove(10_000, 0).unwrap().unwrap(); let proof = cht.prove(10_000, 0).unwrap().unwrap();
let req = HeaderByNumber { let req = HeaderByNumber::new(10_000, cht.root()).unwrap();
num: 10_000,
cht_root: cht.root(),
};
let raw_header = test_client.block_header(::ethcore::ids::BlockId::Number(10_000)).unwrap(); let raw_header = test_client.block_header(::ethcore::ids::BlockId::Number(10_000)).unwrap();
@ -319,4 +373,17 @@ mod tests {
assert!(req.check_response(&proof[..]).is_ok()); assert!(req.check_response(&proof[..]).is_ok());
} }
#[test]
fn check_code() {
let code = vec![1u8; 256];
let req = Code {
block_id: (Default::default(), 2),
address: Default::default(),
code_hash: ::util::Hashable::sha3(&code),
};
assert!(req.check_response(&code).is_ok());
assert!(req.check_response(&[]).is_err());
}
} }

View File

@ -28,7 +28,7 @@ use header::{BlockNumber, Header as FullHeader};
use transaction::UnverifiedTransaction; use transaction::UnverifiedTransaction;
use views; use views;
use util::{Address, Hashable, H256, H2048, U256}; use util::{Address, Hashable, H256, H2048, U256, HeapSizeOf};
use rlp::{Rlp, View}; use rlp::{Rlp, View};
/// Owning header view. /// Owning header view.
@ -36,6 +36,10 @@ use rlp::{Rlp, View};
#[cfg_attr(feature = "ipc", binary)] #[cfg_attr(feature = "ipc", binary)]
pub struct Header(Vec<u8>); pub struct Header(Vec<u8>);
impl HeapSizeOf for Header {
fn heap_size_of_children(&self) -> usize { self.0.heap_size_of_children() }
}
impl Header { impl Header {
/// Create a new owning header view. /// Create a new owning header view.
/// Expects the data to be an RLP-encoded header -- any other case will likely lead to /// Expects the data to be an RLP-encoded header -- any other case will likely lead to
@ -116,6 +120,10 @@ impl Hashable for Header {
#[cfg_attr(feature = "ipc", binary)] #[cfg_attr(feature = "ipc", binary)]
pub struct Body(Vec<u8>); pub struct Body(Vec<u8>);
impl HeapSizeOf for Body {
fn heap_size_of_children(&self) -> usize { self.0.heap_size_of_children() }
}
impl Body { impl Body {
/// Create a new owning block body view. The raw bytes passed in must be an rlp-encoded block /// Create a new owning block body view. The raw bytes passed in must be an rlp-encoded block
/// body. /// body.
@ -172,6 +180,10 @@ impl Body {
#[cfg_attr(feature = "ipc", binary)] #[cfg_attr(feature = "ipc", binary)]
pub struct Block(Vec<u8>); pub struct Block(Vec<u8>);
impl HeapSizeOf for Block {
fn heap_size_of_children(&self) -> usize { self.0.heap_size_of_children() }
}
impl Block { impl Block {
/// Create a new owning block view. The raw bytes passed in must be an rlp-encoded block. /// Create a new owning block view. The raw bytes passed in must be an rlp-encoded block.
pub fn new(raw: Vec<u8>) -> Self { Block(raw) } pub fn new(raw: Vec<u8>) -> Self { Block(raw) }

View File

@ -472,6 +472,12 @@ impl PendingTransaction {
} }
} }
impl Deref for PendingTransaction {
type Target = SignedTransaction;
fn deref(&self) -> &SignedTransaction { &self.transaction }
}
impl From<SignedTransaction> for PendingTransaction { impl From<SignedTransaction> for PendingTransaction {
fn from(t: SignedTransaction) -> Self { fn from(t: SignedTransaction) -> Self {
PendingTransaction { PendingTransaction {

View File

@ -28,6 +28,7 @@ use ethcore::miner::{Miner, ExternalMiner};
use ethcore::snapshot::SnapshotService; use ethcore::snapshot::SnapshotService;
use ethcore_rpc::{Metadata, NetworkSettings}; use ethcore_rpc::{Metadata, NetworkSettings};
use ethcore_rpc::informant::{Middleware, RpcStats, ClientNotifier}; use ethcore_rpc::informant::{Middleware, RpcStats, ClientNotifier};
use ethcore_rpc::dispatch::FullDispatcher;
use ethsync::{ManageNetwork, SyncProvider}; use ethsync::{ManageNetwork, SyncProvider};
use hash_fetch::fetch::Client as FetchClient; use hash_fetch::fetch::Client as FetchClient;
use jsonrpc_core::{MetaIoHandler}; use jsonrpc_core::{MetaIoHandler};
@ -176,10 +177,11 @@ macro_rules! add_signing_methods {
{ {
let handler = &mut $handler; let handler = &mut $handler;
let deps = &$deps; let deps = &$deps;
let dispatcher = FullDispatcher::new(Arc::downgrade(&deps.client), Arc::downgrade(&deps.miner));
if deps.signer_service.is_enabled() { if deps.signer_service.is_enabled() {
handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, &deps.client, &deps.miner, &deps.secret_store))) handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, dispatcher, &deps.secret_store)))
} else { } else {
handler.extend_with($namespace::to_delegate(SigningUnsafeClient::new(&deps.client, &deps.secret_store, &deps.miner))) handler.extend_with($namespace::to_delegate(SigningUnsafeClient::new(&deps.secret_store, dispatcher)))
} }
} }
} }
@ -194,6 +196,8 @@ pub fn setup_rpc(stats: Arc<RpcStats>, deps: Arc<Dependencies>, apis: ApiSet) ->
// it's turned into vector, cause ont of the cases requires &[] // it's turned into vector, cause ont of the cases requires &[]
let apis = apis.list_apis().into_iter().collect::<Vec<_>>(); let apis = apis.list_apis().into_iter().collect::<Vec<_>>();
let dispatcher = FullDispatcher::new(Arc::downgrade(&deps.client), Arc::downgrade(&deps.miner));
for api in &apis { for api in &apis {
match *api { match *api {
Api::Web3 => { Api::Web3 => {
@ -223,10 +227,10 @@ pub fn setup_rpc(stats: Arc<RpcStats>, deps: Arc<Dependencies>, apis: ApiSet) ->
add_signing_methods!(EthSigning, handler, deps); add_signing_methods!(EthSigning, handler, deps);
}, },
Api::Personal => { Api::Personal => {
handler.extend_with(PersonalClient::new(&deps.secret_store, &deps.client, &deps.miner, deps.geth_compatibility).to_delegate()); handler.extend_with(PersonalClient::new(&deps.secret_store, dispatcher.clone(), deps.geth_compatibility).to_delegate());
}, },
Api::Signer => { Api::Signer => {
handler.extend_with(SignerClient::new(&deps.secret_store, &deps.client, &deps.miner, &deps.signer_service).to_delegate()); handler.extend_with(SignerClient::new(&deps.secret_store, dispatcher.clone(), &deps.signer_service).to_delegate());
}, },
Api::Parity => { Api::Parity => {
let signer = match deps.signer_service.is_enabled() { let signer = match deps.signer_service.is_enabled() {

View File

@ -33,6 +33,7 @@ ethash = { path = "../ethash" }
ethsync = { path = "../sync" } ethsync = { path = "../sync" }
ethjson = { path = "../json" } ethjson = { path = "../json" }
ethcore-devtools = { path = "../devtools" } ethcore-devtools = { path = "../devtools" }
ethcore-light = { path = "../ethcore/light" }
parity-updater = { path = "../updater" } parity-updater = { path = "../updater" }
rlp = { path = "../util/rlp" } rlp = { path = "../util/rlp" }
fetch = { path = "../util/fetch" } fetch = { path = "../util/fetch" }

View File

@ -33,6 +33,7 @@ extern crate ethcrypto as crypto;
extern crate ethstore; extern crate ethstore;
extern crate ethsync; extern crate ethsync;
extern crate ethash; extern crate ethash;
extern crate ethcore_light as light;
extern crate transient_hashmap; extern crate transient_hashmap;
extern crate jsonrpc_ipc_server as ipc; extern crate jsonrpc_ipc_server as ipc;
extern crate ethcore_ipc; extern crate ethcore_ipc;
@ -64,7 +65,7 @@ use jsonrpc_core::reactor::RpcHandler;
pub use ipc::{Server as IpcServer, Error as IpcServerError}; pub use ipc::{Server as IpcServer, Error as IpcServerError};
pub use jsonrpc_http_server::{ServerBuilder, Server, RpcServerError}; pub use jsonrpc_http_server::{ServerBuilder, Server, RpcServerError};
pub mod v1; pub mod v1;
pub use v1::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings, Metadata, Origin, informant}; pub use v1::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings, Metadata, Origin, informant, dispatch};
pub use v1::block_import::is_major_importing; pub use v1::block_import::is_major_importing;
/// Start http server asynchronously and returns result with `Server` handle on success or an error. /// Start http server asynchronously and returns result with `Server` handle on success or an error.

View File

@ -14,11 +14,15 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Utilities and helpers for transaction dispatch.
use std::fmt::Debug; use std::fmt::Debug;
use std::ops::Deref; use std::ops::Deref;
use std::sync::Weak;
use futures::{future, Future, BoxFuture};
use rlp::{self, Stream}; use rlp::{self, Stream};
use util::{Address, H520, H256, U256, Uint, Bytes}; use util::{Address, H520, H256, U256, Uint, Bytes};
use util::bytes::ToPretty;
use util::sha3::Hashable; use util::sha3::Hashable;
use ethkey::Signature; use ethkey::Signature;
@ -38,20 +42,154 @@ use v1::types::{
DecryptRequest as RpcDecryptRequest, DecryptRequest as RpcDecryptRequest,
}; };
/// Has the capability to dispatch, sign, and decrypt.
///
/// Requires a clone implementation, with the implication that it be cheap;
/// usually just bumping a reference count or two.
pub trait Dispatcher: Send + Sync + Clone {
// TODO: when ATC exist, use zero-cost
// type Out<T>: IntoFuture<T, Error>
/// Fill optional fields of a transaction request, fetching gas price but not nonce.
fn fill_optional_fields(&self, request: TransactionRequest, default_sender: Address)
-> BoxFuture<FilledTransactionRequest, Error>;
/// Sign the given transaction request without dispatching, fetching appropriate nonce.
fn sign(&self, accounts: &AccountProvider, filled: FilledTransactionRequest, password: SignWith)
-> BoxFuture<WithToken<SignedTransaction>, Error>;
/// "Dispatch" a local transaction.
fn dispatch_transaction(&self, signed_transaction: PendingTransaction) -> Result<H256, Error>;
}
/// A dispatcher which uses references to a client and miner in order to sign
/// requests locally.
#[derive(Debug)]
pub struct FullDispatcher<C, M> {
client: Weak<C>,
miner: Weak<M>,
}
impl<C, M> FullDispatcher<C, M> {
/// Create a `FullDispatcher` from weak references to a client and miner.
pub fn new(client: Weak<C>, miner: Weak<M>) -> Self {
FullDispatcher {
client: client,
miner: miner,
}
}
}
impl<C, M> Clone for FullDispatcher<C, M> {
fn clone(&self) -> Self {
FullDispatcher {
client: self.client.clone(),
miner: self.miner.clone(),
}
}
}
impl<C: MiningBlockChainClient, M: MinerService> Dispatcher for FullDispatcher<C, M> {
fn fill_optional_fields(&self, request: TransactionRequest, default_sender: Address)
-> BoxFuture<FilledTransactionRequest, Error>
{
let (client, miner) = (take_weakf!(self.client), take_weakf!(self.miner));
let request = request;
future::ok(FilledTransactionRequest {
from: request.from.unwrap_or(default_sender),
used_default_from: request.from.is_none(),
to: request.to,
nonce: request.nonce,
gas_price: request.gas_price.unwrap_or_else(|| default_gas_price(&*client, &*miner)),
gas: request.gas.unwrap_or_else(|| miner.sensible_gas_limit()),
value: request.value.unwrap_or_else(|| 0.into()),
data: request.data.unwrap_or_else(Vec::new),
condition: request.condition,
}).boxed()
}
fn sign(&self, accounts: &AccountProvider, filled: FilledTransactionRequest, password: SignWith)
-> BoxFuture<WithToken<SignedTransaction>, Error>
{
let (client, miner) = (take_weakf!(self.client), take_weakf!(self.miner));
let network_id = client.signing_network_id();
let address = filled.from;
future::ok({
let t = Transaction {
nonce: filled.nonce
.or_else(|| miner
.last_nonce(&filled.from)
.map(|nonce| nonce + U256::one()))
.unwrap_or_else(|| client.latest_nonce(&filled.from)),
action: filled.to.map_or(Action::Create, Action::Call),
gas: filled.gas,
gas_price: filled.gas_price,
value: filled.value,
data: filled.data,
};
let hash = t.hash(network_id);
if accounts.is_hardware_address(address) {
let mut stream = rlp::RlpStream::new();
t.rlp_append_unsigned_transaction(&mut stream, network_id);
let signature = try_bf!(
accounts.sign_with_hardware(address, &stream.as_raw())
.map_err(|e| {
debug!(target: "miner", "Error signing transaction with hardware wallet: {}", e);
errors::account("Error signing transaction with hardware wallet", e)
})
);
let signed = try_bf!(
SignedTransaction::new(t.with_signature(signature, network_id))
.map_err(|e| {
debug!(target: "miner", "Hardware wallet has produced invalid signature: {}", e);
errors::account("Invalid signature generated", e)
})
);
WithToken::No(signed)
} else {
let signature = try_bf!(signature(accounts, address, hash, password));
signature.map(|sig| {
SignedTransaction::new(t.with_signature(sig, network_id))
.expect("Transaction was signed by AccountsProvider; it never produces invalid signatures; qed")
})
}
}).boxed()
}
fn dispatch_transaction(&self, signed_transaction: PendingTransaction) -> Result<H256, Error> {
let hash = signed_transaction.transaction.hash();
take_weak!(self.miner).import_own_transaction(&*take_weak!(self.client), signed_transaction)
.map_err(errors::from_transaction_error)
.map(|_| hash)
}
}
/// default MAC to use.
pub const DEFAULT_MAC: [u8; 2] = [0, 0]; pub const DEFAULT_MAC: [u8; 2] = [0, 0];
type AccountToken = String; /// Single-use account token.
pub type AccountToken = String;
/// Values used to unlock accounts for signing.
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
pub enum SignWith { pub enum SignWith {
/// Nothing -- implies the account is already unlocked.
Nothing, Nothing,
/// Unlock with password.
Password(String), Password(String),
/// Unlock with single-use token.
Token(AccountToken), Token(AccountToken),
} }
/// A value, potentially accompanied by a signing token.
#[derive(Debug)] #[derive(Debug)]
pub enum WithToken<T: Debug> { pub enum WithToken<T: Debug> {
/// No token.
No(T), No(T),
/// With token.
Yes(T, AccountToken), Yes(T, AccountToken),
} }
@ -67,6 +205,7 @@ impl<T: Debug> Deref for WithToken<T> {
} }
impl<T: Debug> WithToken<T> { impl<T: Debug> WithToken<T> {
/// Map the value with the given closure, preserving the token.
pub fn map<S, F>(self, f: F) -> WithToken<S> where pub fn map<S, F>(self, f: F) -> WithToken<S> where
S: Debug, S: Debug,
F: FnOnce(T) -> S, F: FnOnce(T) -> S,
@ -77,12 +216,21 @@ impl<T: Debug> WithToken<T> {
} }
} }
/// Convert into inner value, ignoring possible token.
pub fn into_value(self) -> T { pub fn into_value(self) -> T {
match self { match self {
WithToken::No(v) => v, WithToken::No(v) => v,
WithToken::Yes(v, _) => v, WithToken::Yes(v, _) => v,
} }
} }
/// Convert the `WithToken` into a tuple.
pub fn into_tuple(self) -> (T, Option<AccountToken>) {
match self {
WithToken::No(v) => (v, None),
WithToken::Yes(v, token) => (v, Some(token))
}
}
} }
impl<T: Debug> From<(T, AccountToken)> for WithToken<T> { impl<T: Debug> From<(T, AccountToken)> for WithToken<T> {
@ -91,30 +239,49 @@ impl<T: Debug> From<(T, AccountToken)> for WithToken<T> {
} }
} }
pub fn execute<C, M>(client: &C, miner: &M, accounts: &AccountProvider, payload: ConfirmationPayload, pass: SignWith) -> Result<WithToken<ConfirmationResponse>, Error> impl<T: Debug> From<(T, Option<AccountToken>)> for WithToken<T> {
where C: MiningBlockChainClient, M: MinerService fn from(tuple: (T, Option<AccountToken>)) -> Self {
{ match tuple.1 {
Some(token) => WithToken::Yes(tuple.0, token),
None => WithToken::No(tuple.0),
}
}
}
/// Execute a confirmation payload.
pub fn execute<D: Dispatcher + 'static>(
dispatcher: D,
accounts: &AccountProvider,
payload: ConfirmationPayload,
pass: SignWith
) -> BoxFuture<WithToken<ConfirmationResponse>, Error> {
match payload { match payload {
ConfirmationPayload::SendTransaction(request) => { ConfirmationPayload::SendTransaction(request) => {
sign_and_dispatch(client, miner, accounts, request, pass) let condition = request.condition.clone().map(Into::into);
.map(|result| result dispatcher.sign(accounts, request, pass)
.map(move |v| v.map(move |tx| PendingTransaction::new(tx, condition)))
.map(WithToken::into_tuple)
.map(|(tx, token)| (tx, token, dispatcher))
.and_then(|(tx, tok, dispatcher)| {
dispatcher.dispatch_transaction(tx)
.map(RpcH256::from) .map(RpcH256::from)
.map(ConfirmationResponse::SendTransaction) .map(ConfirmationResponse::SendTransaction)
) .map(move |h| WithToken::from((h, tok)))
}).boxed()
}, },
ConfirmationPayload::SignTransaction(request) => { ConfirmationPayload::SignTransaction(request) => {
sign_no_dispatch(client, miner, accounts, request, pass) dispatcher.sign(accounts, request, pass)
.map(|result| result .map(|result| result
.map(RpcRichRawTransaction::from) .map(RpcRichRawTransaction::from)
.map(ConfirmationResponse::SignTransaction) .map(ConfirmationResponse::SignTransaction)
) ).boxed()
}, },
ConfirmationPayload::Signature(address, mut data) => { ConfirmationPayload::Signature(address, mut data) => {
let mut message_data = let mut message_data =
format!("\x19Ethereum Signed Message:\n{}", data.len()) format!("\x19Ethereum Signed Message:\n{}", data.len())
.into_bytes(); .into_bytes();
message_data.append(&mut data); message_data.append(&mut data);
signature(accounts, address, message_data.sha3(), pass) let res = signature(accounts, address, message_data.sha3(), pass)
.map(|result| result .map(|result| result
.map(|rsv| { .map(|rsv| {
let mut vrs = [0u8; 65]; let mut vrs = [0u8; 65];
@ -126,14 +293,16 @@ pub fn execute<C, M>(client: &C, miner: &M, accounts: &AccountProvider, payload:
}) })
.map(RpcH520::from) .map(RpcH520::from)
.map(ConfirmationResponse::Signature) .map(ConfirmationResponse::Signature)
) );
future::done(res).boxed()
}, },
ConfirmationPayload::Decrypt(address, data) => { ConfirmationPayload::Decrypt(address, data) => {
decrypt(accounts, address, data, pass) let res = decrypt(accounts, address, data, pass)
.map(|result| result .map(|result| result
.map(RpcBytes) .map(RpcBytes)
.map(ConfirmationResponse::Decrypt) .map(ConfirmationResponse::Decrypt)
) );
future::done(res).boxed()
}, },
} }
} }
@ -160,120 +329,34 @@ fn decrypt(accounts: &AccountProvider, address: Address, msg: Bytes, password: S
}) })
} }
pub fn dispatch_transaction<C, M>(client: &C, miner: &M, signed_transaction: PendingTransaction) -> Result<H256, Error> /// Extract the default gas price from a client and miner.
where C: MiningBlockChainClient, M: MinerService {
let hash = signed_transaction.transaction.hash();
miner.import_own_transaction(client, signed_transaction)
.map_err(errors::from_transaction_error)
.map(|_| hash)
}
pub fn sign_no_dispatch<C, M>(client: &C, miner: &M, accounts: &AccountProvider, filled: FilledTransactionRequest, password: SignWith) -> Result<WithToken<SignedTransaction>, Error>
where C: MiningBlockChainClient, M: MinerService {
let network_id = client.signing_network_id();
let address = filled.from;
let signed_transaction = {
let t = Transaction {
nonce: filled.nonce
.or_else(|| miner
.last_nonce(&filled.from)
.map(|nonce| nonce + U256::one()))
.unwrap_or_else(|| client.latest_nonce(&filled.from)),
action: filled.to.map_or(Action::Create, Action::Call),
gas: filled.gas,
gas_price: filled.gas_price,
value: filled.value,
data: filled.data,
};
let hash = t.hash(network_id);
if accounts.is_hardware_address(address) {
let mut stream = rlp::RlpStream::new();
t.rlp_append_unsigned_transaction(&mut stream, network_id);
let signature = accounts.sign_with_hardware(address, &stream.as_raw())
.map_err(|e| {
debug!(target: "miner", "Error signing transaction with hardware wallet: {}", e);
errors::account("Error signing transaction with hardware wallet", e)
})?;
WithToken::No(SignedTransaction::new(t.with_signature(signature, network_id))
.map_err(|e| {
debug!(target: "miner", "Hardware wallet has produced invalid signature: {}", e);
errors::account("Invalid signature generated", e)
})?)
} else {
let signature = signature(accounts, address, hash, password)?;
signature.map(|sig| {
SignedTransaction::new(t.with_signature(sig, network_id))
.expect("Transaction was signed by AccountsProvider; it never produces invalid signatures; qed")
})
}
};
Ok(signed_transaction)
}
pub fn sign_and_dispatch<C, M>(client: &C, miner: &M, accounts: &AccountProvider, filled: FilledTransactionRequest, password: SignWith) -> Result<WithToken<H256>, Error>
where C: MiningBlockChainClient, M: MinerService
{
let network_id = client.signing_network_id();
let condition = filled.condition.clone();
let signed_transaction = sign_no_dispatch(client, miner, accounts, filled, password)?;
let (signed_transaction, token) = match signed_transaction {
WithToken::No(signed_transaction) => (signed_transaction, None),
WithToken::Yes(signed_transaction, token) => (signed_transaction, Some(token)),
};
trace!(target: "miner", "send_transaction: dispatching tx: {} for network ID {:?}", rlp::encode(&signed_transaction).to_vec().pretty(), network_id);
let pending_transaction = PendingTransaction::new(signed_transaction, condition.map(Into::into));
dispatch_transaction(&*client, &*miner, pending_transaction).map(|hash| {
match token {
Some(ref token) => WithToken::Yes(hash, token.clone()),
None => WithToken::No(hash),
}
})
}
pub fn fill_optional_fields<C, M>(request: TransactionRequest, default_sender: Address, client: &C, miner: &M) -> FilledTransactionRequest
where C: MiningBlockChainClient, M: MinerService
{
FilledTransactionRequest {
from: request.from.unwrap_or(default_sender),
used_default_from: request.from.is_none(),
to: request.to,
nonce: request.nonce,
gas_price: request.gas_price.unwrap_or_else(|| default_gas_price(client, miner)),
gas: request.gas.unwrap_or_else(|| miner.sensible_gas_limit()),
value: request.value.unwrap_or_else(|| 0.into()),
data: request.data.unwrap_or_else(Vec::new),
condition: request.condition,
}
}
pub fn default_gas_price<C, M>(client: &C, miner: &M) -> U256 pub fn default_gas_price<C, M>(client: &C, miner: &M) -> U256
where C: MiningBlockChainClient, M: MinerService where C: MiningBlockChainClient, M: MinerService
{ {
client.gas_price_median(100).unwrap_or_else(|| miner.sensible_gas_price()) client.gas_price_median(100).unwrap_or_else(|| miner.sensible_gas_price())
} }
pub fn from_rpc<C, M>(payload: RpcConfirmationPayload, default_account: Address, client: &C, miner: &M) -> ConfirmationPayload /// Convert RPC confirmation payload to signer confirmation payload.
where C: MiningBlockChainClient, M: MinerService { /// May need to resolve in the future to fetch things like gas price.
pub fn from_rpc<D>(payload: RpcConfirmationPayload, default_account: Address, dispatcher: &D) -> BoxFuture<ConfirmationPayload, Error>
where D: Dispatcher
{
match payload { match payload {
RpcConfirmationPayload::SendTransaction(request) => { RpcConfirmationPayload::SendTransaction(request) => {
ConfirmationPayload::SendTransaction(fill_optional_fields(request.into(), default_account, client, miner)) dispatcher.fill_optional_fields(request.into(), default_account)
.map(ConfirmationPayload::SendTransaction)
.boxed()
}, },
RpcConfirmationPayload::SignTransaction(request) => { RpcConfirmationPayload::SignTransaction(request) => {
ConfirmationPayload::SignTransaction(fill_optional_fields(request.into(), default_account, client, miner)) dispatcher.fill_optional_fields(request.into(), default_account)
.map(ConfirmationPayload::SignTransaction)
.boxed()
}, },
RpcConfirmationPayload::Decrypt(RpcDecryptRequest { address, msg }) => { RpcConfirmationPayload::Decrypt(RpcDecryptRequest { address, msg }) => {
ConfirmationPayload::Decrypt(address.into(), msg.into()) future::ok(ConfirmationPayload::Decrypt(address.into(), msg.into())).boxed()
}, },
RpcConfirmationPayload::Signature(RpcSignRequest { address, data }) => { RpcConfirmationPayload::Signature(RpcSignRequest { address, data }) => {
ConfirmationPayload::Signature(address.into(), data.into()) future::ok(ConfirmationPayload::Signature(address.into(), data.into())).boxed()
}, },
} }
} }

View File

@ -28,6 +28,7 @@ mod requests;
mod signer; mod signer;
mod signing_queue; mod signing_queue;
pub use self::dispatch::{Dispatcher, FullDispatcher};
pub use self::network_settings::NetworkSettings; pub use self::network_settings::NetworkSettings;
pub use self::poll_manager::PollManager; pub use self::poll_manager::PollManager;
pub use self::poll_filter::{PollFilter, limit_logs}; pub use self::poll_filter::{PollFilter, limit_logs};

View File

@ -22,7 +22,7 @@ use std::thread;
use std::time::{Instant, Duration}; use std::time::{Instant, Duration};
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use futures::{self, BoxFuture, Future}; use futures::{self, future, BoxFuture, Future};
use rlp::{self, UntrustedRlp, View}; use rlp::{self, UntrustedRlp, View};
use time::get_time; use time::get_time;
use util::{H160, H256, Address, FixedHash, U256, H64, Uint}; use util::{H160, H256, Address, FixedHash, U256, H64, Uint};
@ -38,7 +38,7 @@ use ethcore::filter::Filter as EthcoreFilter;
use ethcore::header::{Header as BlockHeader, BlockNumber as EthBlockNumber}; use ethcore::header::{Header as BlockHeader, BlockNumber as EthBlockNumber};
use ethcore::log_entry::LogEntry; use ethcore::log_entry::LogEntry;
use ethcore::miner::{MinerService, ExternalMinerService}; use ethcore::miner::{MinerService, ExternalMinerService};
use ethcore::transaction::{Transaction as EthTransaction, SignedTransaction, PendingTransaction, Action}; use ethcore::transaction::{Transaction as EthTransaction, SignedTransaction, Action};
use ethcore::snapshot::SnapshotService; use ethcore::snapshot::SnapshotService;
use ethsync::{SyncProvider}; use ethsync::{SyncProvider};
@ -46,7 +46,7 @@ use jsonrpc_core::Error;
use jsonrpc_macros::Trailing; use jsonrpc_macros::Trailing;
use v1::helpers::{CallRequest as CRequest, errors, limit_logs}; use v1::helpers::{CallRequest as CRequest, errors, limit_logs};
use v1::helpers::dispatch::{dispatch_transaction, default_gas_price}; use v1::helpers::dispatch::{Dispatcher, FullDispatcher, default_gas_price};
use v1::helpers::block_import::is_major_importing; use v1::helpers::block_import::is_major_importing;
use v1::traits::Eth; use v1::traits::Eth;
use v1::types::{ use v1::types::{
@ -144,7 +144,7 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM> EthClient<C, SN, S, M, EM> where
logs_bloom: view.log_bloom().into(), logs_bloom: view.log_bloom().into(),
timestamp: view.timestamp().into(), timestamp: view.timestamp().into(),
difficulty: view.difficulty().into(), difficulty: view.difficulty().into(),
total_difficulty: total_difficulty.into(), total_difficulty: Some(total_difficulty.into()),
seal_fields: view.seal().into_iter().map(Into::into).collect(), seal_fields: view.seal().into_iter().map(Into::into).collect(),
uncles: block.uncle_hashes().into_iter().map(Into::into).collect(), uncles: block.uncle_hashes().into_iter().map(Into::into).collect(),
transactions: match include_txs { transactions: match include_txs {
@ -194,7 +194,7 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM> EthClient<C, SN, S, M, EM> where
logs_bloom: uncle.log_bloom().clone().into(), logs_bloom: uncle.log_bloom().clone().into(),
timestamp: uncle.timestamp().into(), timestamp: uncle.timestamp().into(),
difficulty: uncle.difficulty().clone().into(), difficulty: uncle.difficulty().clone().into(),
total_difficulty: (uncle.difficulty().clone() + parent_difficulty).into(), total_difficulty: Some((uncle.difficulty().clone() + parent_difficulty).into()),
receipts_root: uncle.receipts_root().clone().into(), receipts_root: uncle.receipts_root().clone().into(),
extra_data: uncle.extra_data().clone().into(), extra_data: uncle.extra_data().clone().into(),
seal_fields: uncle.seal().into_iter().cloned().map(Into::into).collect(), seal_fields: uncle.seal().into_iter().cloned().map(Into::into).collect(),
@ -356,113 +356,119 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM> Eth for EthClient<C, SN, S, M, EM> where
Ok(RpcU256::from(take_weak!(self.client).chain_info().best_block_number)) Ok(RpcU256::from(take_weak!(self.client).chain_info().best_block_number))
} }
fn balance(&self, address: RpcH160, num: Trailing<BlockNumber>) -> Result<RpcU256, Error> { fn balance(&self, address: RpcH160, num: Trailing<BlockNumber>) -> BoxFuture<RpcU256, Error> {
let address = address.into(); let address = address.into();
match num.0 {
BlockNumber::Pending => Ok(take_weak!(self.miner).balance(&*take_weak!(self.client), &address).into()),
id => {
let client = take_weak!(self.client);
check_known(&*client, id.clone())?; let res = match num.0.clone() {
BlockNumber::Pending => Ok(take_weakf!(self.miner).balance(&*take_weakf!(self.client), &address).into()),
id => {
let client = take_weakf!(self.client);
try_bf!(check_known(&*client, id.clone()));
match client.balance(&address, id.into()) { match client.balance(&address, id.into()) {
Some(balance) => Ok(balance.into()), Some(balance) => Ok(balance.into()),
None => Err(errors::state_pruned()), None => Err(errors::state_pruned()),
} }
} }
} };
future::done(res).boxed()
} }
fn storage_at(&self, address: RpcH160, pos: RpcU256, num: Trailing<BlockNumber>) -> Result<RpcH256, Error> { fn storage_at(&self, address: RpcH160, pos: RpcU256, num: Trailing<BlockNumber>) -> BoxFuture<RpcH256, Error> {
let address: Address = RpcH160::into(address); let address: Address = RpcH160::into(address);
let position: U256 = RpcU256::into(pos); let position: U256 = RpcU256::into(pos);
match num.0 {
BlockNumber::Pending => Ok(take_weak!(self.miner).storage_at(&*take_weak!(self.client), &address, &H256::from(position)).into()),
id => {
let client = take_weak!(self.client);
check_known(&*client, id.clone())?; let res = match num.0.clone() {
BlockNumber::Pending => Ok(take_weakf!(self.miner).storage_at(&*take_weakf!(self.client), &address, &H256::from(position)).into()),
id => {
let client = take_weakf!(self.client);
try_bf!(check_known(&*client, id.clone()));
match client.storage_at(&address, &H256::from(position), id.into()) { match client.storage_at(&address, &H256::from(position), id.into()) {
Some(s) => Ok(s.into()), Some(s) => Ok(s.into()),
None => Err(errors::state_pruned()), None => Err(errors::state_pruned()),
} }
} }
} };
future::done(res).boxed()
} }
fn transaction_count(&self, address: RpcH160, num: Trailing<BlockNumber>) -> Result<RpcU256, Error> { fn transaction_count(&self, address: RpcH160, num: Trailing<BlockNumber>) -> BoxFuture<RpcU256, Error> {
let address: Address = RpcH160::into(address); let address: Address = RpcH160::into(address);
match num.0 { let res = match num.0.clone() {
BlockNumber::Pending => Ok(take_weak!(self.miner).nonce(&*take_weak!(self.client), &address).into()), BlockNumber::Pending => Ok(take_weakf!(self.miner).nonce(&*take_weakf!(self.client), &address).into()),
id => { id => {
let client = take_weak!(self.client); let client = take_weakf!(self.client);
check_known(&*client, id.clone())?; try_bf!(check_known(&*client, id.clone()));
match client.nonce(&address, id.into()) { match client.nonce(&address, id.into()) {
Some(nonce) => Ok(nonce.into()), Some(nonce) => Ok(nonce.into()),
None => Err(errors::state_pruned()), None => Err(errors::state_pruned()),
} }
} }
} };
future::done(res).boxed()
} }
fn block_transaction_count_by_hash(&self, hash: RpcH256) -> Result<Option<RpcU256>, Error> { fn block_transaction_count_by_hash(&self, hash: RpcH256) -> BoxFuture<Option<RpcU256>, Error> {
Ok( future::ok(take_weakf!(self.client).block(BlockId::Hash(hash.into()))
take_weak!(self.client).block(BlockId::Hash(hash.into())) .map(|block| block.transactions_count().into())).boxed()
.map(|block| block.transactions_count().into())
)
} }
fn block_transaction_count_by_number(&self, num: BlockNumber) -> Result<Option<RpcU256>, Error> { fn block_transaction_count_by_number(&self, num: BlockNumber) -> BoxFuture<Option<RpcU256>, Error> {
match num { future::ok(match num {
BlockNumber::Pending => Ok(Some( BlockNumber::Pending => Some(
take_weak!(self.miner).status().transactions_in_pending_block.into() take_weakf!(self.miner).status().transactions_in_pending_block.into()
)),
_ => Ok(
take_weak!(self.client).block(num.into())
.map(|block| block.transactions_count().into())
)
}
}
fn block_uncles_count_by_hash(&self, hash: RpcH256) -> Result<Option<RpcU256>, Error> {
Ok(
take_weak!(self.client).block(BlockId::Hash(hash.into()))
.map(|block| block.uncles_count().into())
)
}
fn block_uncles_count_by_number(&self, num: BlockNumber) -> Result<Option<RpcU256>, Error> {
match num {
BlockNumber::Pending => Ok(Some(0.into())),
_ => Ok(
take_weak!(self.client).block(num.into())
.map(|block| block.uncles_count().into())
), ),
} _ =>
take_weakf!(self.client).block(num.into())
.map(|block| block.transactions_count().into())
}).boxed()
} }
fn code_at(&self, address: RpcH160, num: Trailing<BlockNumber>) -> Result<Bytes, Error> { fn block_uncles_count_by_hash(&self, hash: RpcH256) -> BoxFuture<Option<RpcU256>, Error> {
future::ok(take_weakf!(self.client).block(BlockId::Hash(hash.into()))
.map(|block| block.uncles_count().into()))
.boxed()
}
fn block_uncles_count_by_number(&self, num: BlockNumber) -> BoxFuture<Option<RpcU256>, Error> {
future::ok(match num {
BlockNumber::Pending => Some(0.into()),
_ => take_weakf!(self.client).block(num.into())
.map(|block| block.uncles_count().into()
),
}).boxed()
}
fn code_at(&self, address: RpcH160, num: Trailing<BlockNumber>) -> BoxFuture<Bytes, Error> {
let address: Address = RpcH160::into(address); let address: Address = RpcH160::into(address);
match num.0 {
BlockNumber::Pending => Ok(take_weak!(self.miner).code(&*take_weak!(self.client), &address).map_or_else(Bytes::default, Bytes::new)),
id => {
let client = take_weak!(self.client);
check_known(&*client, id.clone())?; let res = match num.0.clone() {
BlockNumber::Pending => Ok(take_weakf!(self.miner).code(&*take_weakf!(self.client), &address).map_or_else(Bytes::default, Bytes::new)),
id => {
let client = take_weakf!(self.client);
try_bf!(check_known(&*client, id.clone()));
match client.code(&address, id.into()) { match client.code(&address, id.into()) {
Some(code) => Ok(code.map_or_else(Bytes::default, Bytes::new)), Some(code) => Ok(code.map_or_else(Bytes::default, Bytes::new)),
None => Err(errors::state_pruned()), None => Err(errors::state_pruned()),
} }
} }
} };
future::done(res).boxed()
} }
fn block_by_hash(&self, hash: RpcH256, include_txs: bool) -> Result<Option<RichBlock>, Error> { fn block_by_hash(&self, hash: RpcH256, include_txs: bool) -> BoxFuture<Option<RichBlock>, Error> {
self.block(BlockId::Hash(hash.into()), include_txs) future::done(self.block(BlockId::Hash(hash.into()), include_txs)).boxed()
} }
fn block_by_number(&self, num: BlockNumber, include_txs: bool) -> Result<Option<RichBlock>, Error> { fn block_by_number(&self, num: BlockNumber, include_txs: bool) -> BoxFuture<Option<RichBlock>, Error> {
self.block(num.into(), include_txs) future::done(self.block(num.into(), include_txs)).boxed()
} }
fn transaction_by_hash(&self, hash: RpcH256) -> Result<Option<Transaction>, Error> { fn transaction_by_hash(&self, hash: RpcH256) -> Result<Option<Transaction>, Error> {
@ -603,7 +609,8 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM> Eth for EthClient<C, SN, S, M, EM> where
.map_err(errors::from_rlp_error) .map_err(errors::from_rlp_error)
.and_then(|tx| SignedTransaction::new(tx).map_err(errors::from_transaction_error)) .and_then(|tx| SignedTransaction::new(tx).map_err(errors::from_transaction_error))
.and_then(|signed_transaction| { .and_then(|signed_transaction| {
dispatch_transaction(&*take_weak!(self.client), &*take_weak!(self.miner), PendingTransaction::new(signed_transaction, None)) FullDispatcher::new(self.client.clone(), self.miner.clone())
.dispatch_transaction(signed_transaction.into())
}) })
.map(Into::into) .map(Into::into)
} }

View File

@ -0,0 +1,373 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Eth RPC interface for the light client.
// TODO: remove when complete.
#![allow(unused_imports, unused_variables)]
use std::sync::Arc;
use jsonrpc_core::Error;
use jsonrpc_macros::Trailing;
use light::client::Client as LightClient;
use light::cht;
use light::on_demand::{request, OnDemand};
use ethcore::account_provider::{AccountProvider, DappId};
use ethcore::basic_account::BasicAccount;
use ethcore::encoded;
use ethcore::ids::BlockId;
use ethsync::LightSync;
use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP};
use util::U256;
use futures::{future, Future, BoxFuture};
use futures::sync::oneshot;
use v1::helpers::{CallRequest as CRequest, errors, limit_logs};
use v1::helpers::block_import::is_major_importing;
use v1::traits::Eth;
use v1::types::{
RichBlock, Block, BlockTransactions, BlockNumber, Bytes, SyncStatus, SyncInfo,
Transaction, CallRequest, Index, Filter, Log, Receipt, Work,
H64 as RpcH64, H256 as RpcH256, H160 as RpcH160, U256 as RpcU256,
};
use v1::metadata::Metadata;
use util::Address;
/// Light client `ETH` RPC.
pub struct EthClient {
sync: Arc<LightSync>,
client: Arc<LightClient>,
on_demand: Arc<OnDemand>,
accounts: Arc<AccountProvider>,
}
// helper for internal error: no network context.
fn err_no_context() -> Error {
errors::internal("network service detached", "")
}
// helper for internal error: on demand sender cancelled.
fn err_premature_cancel(_cancel: oneshot::Canceled) -> Error {
errors::internal("on-demand sender prematurely cancelled", "")
}
impl EthClient {
/// Create a new `EthClient` with a handle to the light sync instance, client,
/// and on-demand request service, which is assumed to be attached as a handler.
pub fn new(
sync: Arc<LightSync>,
client: Arc<LightClient>,
on_demand: Arc<OnDemand>,
accounts: Arc<AccountProvider>,
) -> Self {
EthClient {
sync: sync,
client: client,
on_demand: on_demand,
accounts: accounts,
}
}
/// Get a block header from the on demand service or client, or error.
fn header(&self, id: BlockId) -> BoxFuture<Option<encoded::Header>, Error> {
if let Some(h) = self.client.block_header(id) {
return future::ok(Some(h)).boxed()
}
let maybe_future = match id {
BlockId::Number(n) => {
let cht_root = cht::block_to_cht_number(n).and_then(|cn| self.client.cht_root(cn as usize));
match cht_root {
None => return future::ok(None).boxed(),
Some(root) => {
let req = request::HeaderByNumber::new(n, root)
.expect("only fails for 0; client always stores genesis; client already queried; qed");
self.sync.with_context(|ctx|
self.on_demand.header_by_number(ctx, req)
.map(|(h, _)| Some(h))
.map_err(err_premature_cancel)
.boxed()
)
}
}
}
BlockId::Hash(h) => {
self.sync.with_context(|ctx|
self.on_demand.header_by_hash(ctx, request::HeaderByHash(h))
.then(|res| future::done(match res {
Ok(h) => Ok(Some(h)),
Err(e) => Err(err_premature_cancel(e)),
}))
.boxed()
)
}
_ => None, // latest, earliest, and pending will have all already returned.
};
// todo: cache returned values (header, TD)
match maybe_future {
Some(recv) => recv,
None => future::err(err_no_context()).boxed()
}
}
// helper for getting account info at a given block.
fn account(&self, address: Address, id: BlockId) -> BoxFuture<Option<BasicAccount>, Error> {
let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone());
self.header(id).and_then(move |header| {
let header = match header {
None => return future::ok(None).boxed(),
Some(hdr) => hdr,
};
sync.with_context(|ctx| on_demand.account(ctx, request::Account {
header: header,
address: address,
}).map(Some))
.map(|x| x.map_err(err_premature_cancel).boxed())
.unwrap_or_else(|| future::err(err_no_context()).boxed())
}).boxed()
}
}
impl Eth for EthClient {
type Metadata = Metadata;
fn protocol_version(&self) -> Result<String, Error> {
Ok(format!("{}", ::light::net::MAX_PROTOCOL_VERSION))
}
fn syncing(&self) -> Result<SyncStatus, Error> {
rpc_unimplemented!()
}
fn author(&self, _meta: Self::Metadata) -> BoxFuture<RpcH160, Error> {
future::ok(Default::default()).boxed()
}
fn is_mining(&self) -> Result<bool, Error> {
Ok(false)
}
fn hashrate(&self) -> Result<RpcU256, Error> {
Ok(Default::default())
}
fn gas_price(&self) -> Result<RpcU256, Error> {
Ok(Default::default())
}
fn accounts(&self, meta: Metadata) -> BoxFuture<Vec<RpcH160>, Error> {
let dapp: DappId = meta.dapp_id.unwrap_or_default().into();
let accounts = self.accounts
.note_dapp_used(dapp.clone())
.and_then(|_| self.accounts.dapps_addresses(dapp))
.map_err(|e| errors::internal("Could not fetch accounts.", e))
.map(|accs| accs.into_iter().map(Into::<RpcH160>::into).collect());
future::done(accounts).boxed()
}
fn block_number(&self) -> Result<RpcU256, Error> {
Ok(self.client.chain_info().best_block_number.into())
}
fn balance(&self, address: RpcH160, num: Trailing<BlockNumber>) -> BoxFuture<RpcU256, Error> {
self.account(address.into(), num.0.into())
.map(|acc| acc.map_or(0.into(), |a| a.balance).into()).boxed()
}
fn storage_at(&self, _address: RpcH160, _key: RpcU256, _num: Trailing<BlockNumber>) -> BoxFuture<RpcH256, Error> {
future::err(errors::unimplemented(None)).boxed()
}
fn block_by_hash(&self, hash: RpcH256, include_txs: bool) -> BoxFuture<Option<RichBlock>, Error> {
future::err(errors::unimplemented(None)).boxed()
}
fn block_by_number(&self, num: BlockNumber, include_txs: bool) -> BoxFuture<Option<RichBlock>, Error> {
future::err(errors::unimplemented(None)).boxed()
}
fn transaction_count(&self, address: RpcH160, num: Trailing<BlockNumber>) -> BoxFuture<RpcU256, Error> {
self.account(address.into(), num.0.into())
.map(|acc| acc.map_or(0.into(), |a| a.nonce).into()).boxed()
}
fn block_transaction_count_by_hash(&self, hash: RpcH256) -> BoxFuture<Option<RpcU256>, Error> {
let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone());
self.header(BlockId::Hash(hash.into())).and_then(move |hdr| {
let hdr = match hdr {
None => return future::ok(None).boxed(),
Some(hdr) => hdr,
};
if hdr.transactions_root() == SHA3_NULL_RLP {
future::ok(Some(U256::from(0).into())).boxed()
} else {
sync.with_context(|ctx| on_demand.block(ctx, request::Body::new(hdr)))
.map(|x| x.map(|b| Some(U256::from(b.transactions_count()).into())))
.map(|x| x.map_err(err_premature_cancel).boxed())
.unwrap_or_else(|| future::err(err_no_context()).boxed())
}
}).boxed()
}
fn block_transaction_count_by_number(&self, num: BlockNumber) -> BoxFuture<Option<RpcU256>, Error> {
let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone());
self.header(num.into()).and_then(move |hdr| {
let hdr = match hdr {
None => return future::ok(None).boxed(),
Some(hdr) => hdr,
};
if hdr.transactions_root() == SHA3_NULL_RLP {
future::ok(Some(U256::from(0).into())).boxed()
} else {
sync.with_context(|ctx| on_demand.block(ctx, request::Body::new(hdr)))
.map(|x| x.map(|b| Some(U256::from(b.transactions_count()).into())))
.map(|x| x.map_err(err_premature_cancel).boxed())
.unwrap_or_else(|| future::err(err_no_context()).boxed())
}
}).boxed()
}
fn block_uncles_count_by_hash(&self, hash: RpcH256) -> BoxFuture<Option<RpcU256>, Error> {
let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone());
self.header(BlockId::Hash(hash.into())).and_then(move |hdr| {
let hdr = match hdr {
None => return future::ok(None).boxed(),
Some(hdr) => hdr,
};
if hdr.uncles_hash() == SHA3_EMPTY_LIST_RLP {
future::ok(Some(U256::from(0).into())).boxed()
} else {
sync.with_context(|ctx| on_demand.block(ctx, request::Body::new(hdr)))
.map(|x| x.map(|b| Some(U256::from(b.uncles_count()).into())))
.map(|x| x.map_err(err_premature_cancel).boxed())
.unwrap_or_else(|| future::err(err_no_context()).boxed())
}
}).boxed()
}
fn block_uncles_count_by_number(&self, num: BlockNumber) -> BoxFuture<Option<RpcU256>, Error> {
let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone());
self.header(num.into()).and_then(move |hdr| {
let hdr = match hdr {
None => return future::ok(None).boxed(),
Some(hdr) => hdr,
};
if hdr.uncles_hash() == SHA3_EMPTY_LIST_RLP {
future::ok(Some(U256::from(0).into())).boxed()
} else {
sync.with_context(|ctx| on_demand.block(ctx, request::Body::new(hdr)))
.map(|x| x.map(|b| Some(U256::from(b.uncles_count()).into())))
.map(|x| x.map_err(err_premature_cancel).boxed())
.unwrap_or_else(|| future::err(err_no_context()).boxed())
}
}).boxed()
}
fn code_at(&self, address: RpcH160, num: Trailing<BlockNumber>) -> BoxFuture<Bytes, Error> {
future::err(errors::unimplemented(None)).boxed()
}
fn send_raw_transaction(&self, raw: Bytes) -> Result<RpcH256, Error> {
Err(errors::unimplemented(None))
}
fn submit_transaction(&self, raw: Bytes) -> Result<RpcH256, Error> {
Err(errors::unimplemented(None))
}
fn call(&self, req: CallRequest, num: Trailing<BlockNumber>) -> Result<Bytes, Error> {
Err(errors::unimplemented(None))
}
fn estimate_gas(&self, req: CallRequest, num: Trailing<BlockNumber>) -> Result<RpcU256, Error> {
Err(errors::unimplemented(None))
}
fn transaction_by_hash(&self, hash: RpcH256) -> Result<Option<Transaction>, Error> {
Err(errors::unimplemented(None))
}
fn transaction_by_block_hash_and_index(&self, hash: RpcH256, idx: Index) -> Result<Option<Transaction>, Error> {
Err(errors::unimplemented(None))
}
fn transaction_by_block_number_and_index(&self, num: BlockNumber, idx: Index) -> Result<Option<Transaction>, Error> {
Err(errors::unimplemented(None))
}
fn transaction_receipt(&self, hash: RpcH256) -> Result<Option<Receipt>, Error> {
Err(errors::unimplemented(None))
}
fn uncle_by_block_hash_and_index(&self, hash: RpcH256, idx: Index) -> Result<Option<RichBlock>, Error> {
Err(errors::unimplemented(None))
}
fn uncle_by_block_number_and_index(&self, num: BlockNumber, idx: Index) -> Result<Option<RichBlock>, Error> {
Err(errors::unimplemented(None))
}
fn compilers(&self) -> Result<Vec<String>, Error> {
Err(errors::unimplemented(None))
}
fn compile_lll(&self, _code: String) -> Result<Bytes, Error> {
Err(errors::unimplemented(None))
}
fn compile_solidity(&self, _code: String) -> Result<Bytes, Error> {
Err(errors::unimplemented(None))
}
fn compile_serpent(&self, _code: String) -> Result<Bytes, Error> {
Err(errors::unimplemented(None))
}
fn logs(&self, _filter: Filter) -> Result<Vec<Log>, Error> {
Err(errors::unimplemented(None))
}
fn work(&self, _timeout: Trailing<u64>) -> Result<Work, Error> {
Err(errors::unimplemented(None))
}
fn submit_work(&self, _nonce: RpcH64, _pow_hash: RpcH256, _mix_hash: RpcH256) -> Result<bool, Error> {
Err(errors::unimplemented(None))
}
fn submit_hashrate(&self, _rate: RpcU256, _id: RpcH256) -> Result<bool, Error> {
Err(errors::unimplemented(None))
}
}

View File

@ -0,0 +1,21 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! RPC implementations for the light client.
pub mod eth;
pub use self::eth::EthClient;

View File

@ -16,15 +16,6 @@
//! Ethereum rpc interface implementation. //! Ethereum rpc interface implementation.
macro_rules! take_weak {
($weak: expr) => {
match $weak.upgrade() {
Some(arc) => arc,
None => return Err(Error::internal_error())
}
}
}
mod eth; mod eth;
mod eth_filter; mod eth_filter;
mod net; mod net;
@ -39,6 +30,8 @@ mod rpc;
mod traces; mod traces;
mod web3; mod web3;
pub mod light;
pub use self::web3::Web3Client; pub use self::web3::Web3Client;
pub use self::eth::{EthClient, EthClientOptions}; pub use self::eth::{EthClient, EthClientOptions};
pub use self::eth_filter::EthFilterClient; pub use self::eth_filter::EthFilterClient;

View File

@ -18,48 +18,37 @@
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use ethcore::account_provider::AccountProvider; use ethcore::account_provider::AccountProvider;
use ethcore::client::MiningBlockChainClient; use ethcore::transaction::PendingTransaction;
use ethcore::miner::MinerService;
use util::{Address, U128, Uint};
use futures::{self, Future, BoxFuture}; use util::{Address, U128, Uint, ToPretty};
use futures::{future, Future, BoxFuture};
use jsonrpc_core::Error; use jsonrpc_core::Error;
use v1::helpers::errors; use v1::helpers::errors;
use v1::helpers::dispatch::{self, sign_and_dispatch}; use v1::helpers::dispatch::{Dispatcher, SignWith};
use v1::traits::Personal; use v1::traits::Personal;
use v1::types::{H160 as RpcH160, H256 as RpcH256, U128 as RpcU128, TransactionRequest}; use v1::types::{H160 as RpcH160, H256 as RpcH256, U128 as RpcU128, TransactionRequest};
use v1::metadata::Metadata; use v1::metadata::Metadata;
/// Account management (personal) rpc implementation. /// Account management (personal) rpc implementation.
pub struct PersonalClient<C, M> where pub struct PersonalClient<D: Dispatcher> {
C: MiningBlockChainClient,
M: MinerService,
{
accounts: Weak<AccountProvider>, accounts: Weak<AccountProvider>,
client: Weak<C>, dispatcher: D,
miner: Weak<M>,
allow_perm_unlock: bool, allow_perm_unlock: bool,
} }
impl<C, M> PersonalClient<C, M> where impl<D: Dispatcher> PersonalClient<D> {
C: MiningBlockChainClient,
M: MinerService,
{
/// Creates new PersonalClient /// Creates new PersonalClient
pub fn new(store: &Arc<AccountProvider>, client: &Arc<C>, miner: &Arc<M>, allow_perm_unlock: bool) -> Self { pub fn new(store: &Arc<AccountProvider>, dispatcher: D, allow_perm_unlock: bool) -> Self {
PersonalClient { PersonalClient {
accounts: Arc::downgrade(store), accounts: Arc::downgrade(store),
client: Arc::downgrade(client), dispatcher: dispatcher,
miner: Arc::downgrade(miner),
allow_perm_unlock: allow_perm_unlock, allow_perm_unlock: allow_perm_unlock,
} }
} }
} }
impl<C, M> Personal for PersonalClient<C, M> where impl<D: Dispatcher + 'static> Personal for PersonalClient<D> {
C: MiningBlockChainClient + 'static,
M: MinerService + 'static,
{
type Metadata = Metadata; type Metadata = Metadata;
fn accounts(&self) -> Result<Vec<RpcH160>, Error> { fn accounts(&self) -> Result<Vec<RpcH160>, Error> {
@ -106,28 +95,36 @@ impl<C, M> Personal for PersonalClient<C, M> where
} }
fn send_transaction(&self, meta: Metadata, request: TransactionRequest, password: String) -> BoxFuture<RpcH256, Error> { fn send_transaction(&self, meta: Metadata, request: TransactionRequest, password: String) -> BoxFuture<RpcH256, Error> {
let sign_and_send = move || { let dispatcher = self.dispatcher.clone();
let client = take_weak!(self.client); let accounts = take_weakf!(self.accounts);
let miner = take_weak!(self.miner);
let accounts = take_weak!(self.accounts);
let default_account = match request.from { let default = match request.from.as_ref() {
Some(ref account) => account.clone().into(), Some(account) => Ok(account.clone().into()),
None => accounts None => accounts
.default_address(meta.dapp_id.unwrap_or_default().into()) .default_address(meta.dapp_id.unwrap_or_default().into())
.map_err(|e| errors::account("Cannot find default account.", e))?, .map_err(|e| errors::account("Cannot find default account.", e)),
}; };
let request = dispatch::fill_optional_fields(request.into(), default_account, &*client, &*miner); let default = match default {
sign_and_dispatch( Ok(default) => default,
&*client, Err(e) => return future::err(e).boxed(),
&*miner,
&*accounts,
request,
dispatch::SignWith::Password(password)
).map(|v| v.into_value().into())
}; };
futures::done(sign_and_send()).boxed() dispatcher.fill_optional_fields(request.into(), default)
.and_then(move |filled| {
let condition = filled.condition.clone().map(Into::into);
dispatcher.sign(&accounts, filled, SignWith::Password(password))
.map(|tx| tx.into_value())
.map(move |tx| PendingTransaction::new(tx, condition))
.map(move |tx| (tx, dispatcher))
})
.and_then(|(pending_tx, dispatcher)| {
let network_id = pending_tx.network_id();
trace!(target: "miner", "send_transaction: dispatching tx: {} for network ID {:?}",
::rlp::encode(&*pending_tx).to_vec().pretty(), network_id);
dispatcher.dispatch_transaction(pending_tx).map(Into::into)
})
.boxed()
} }
} }

View File

@ -20,49 +20,53 @@ use std::sync::{Arc, Weak};
use rlp::{UntrustedRlp, View}; use rlp::{UntrustedRlp, View};
use ethcore::account_provider::AccountProvider; use ethcore::account_provider::AccountProvider;
use ethcore::client::MiningBlockChainClient;
use ethcore::transaction::{SignedTransaction, PendingTransaction}; use ethcore::transaction::{SignedTransaction, PendingTransaction};
use ethcore::miner::MinerService; use futures::{future, BoxFuture, Future, IntoFuture};
use jsonrpc_core::Error; use jsonrpc_core::Error;
use v1::helpers::{errors, SignerService, SigningQueue, ConfirmationPayload}; use v1::helpers::{errors, SignerService, SigningQueue, ConfirmationPayload};
use v1::helpers::dispatch::{self, dispatch_transaction, WithToken}; use v1::helpers::dispatch::{self, Dispatcher, WithToken};
use v1::traits::Signer; use v1::traits::Signer;
use v1::types::{TransactionModification, ConfirmationRequest, ConfirmationResponse, ConfirmationResponseWithToken, U256, Bytes}; use v1::types::{TransactionModification, ConfirmationRequest, ConfirmationResponse, ConfirmationResponseWithToken, U256, Bytes};
/// Transactions confirmation (personal) rpc implementation. /// Transactions confirmation (personal) rpc implementation.
pub struct SignerClient<C, M> where C: MiningBlockChainClient, M: MinerService { pub struct SignerClient<D: Dispatcher> {
signer: Weak<SignerService>, signer: Weak<SignerService>,
accounts: Weak<AccountProvider>, accounts: Weak<AccountProvider>,
client: Weak<C>, dispatcher: D
miner: Weak<M>,
} }
impl<C: 'static, M: 'static> SignerClient<C, M> where C: MiningBlockChainClient, M: MinerService { impl<D: Dispatcher + 'static> SignerClient<D> {
/// Create new instance of signer client. /// Create new instance of signer client.
pub fn new( pub fn new(
store: &Arc<AccountProvider>, store: &Arc<AccountProvider>,
client: &Arc<C>, dispatcher: D,
miner: &Arc<M>,
signer: &Arc<SignerService>, signer: &Arc<SignerService>,
) -> Self { ) -> Self {
SignerClient { SignerClient {
signer: Arc::downgrade(signer), signer: Arc::downgrade(signer),
accounts: Arc::downgrade(store), accounts: Arc::downgrade(store),
client: Arc::downgrade(client), dispatcher: dispatcher,
miner: Arc::downgrade(miner),
} }
} }
fn confirm_internal<F>(&self, id: U256, modification: TransactionModification, f: F) -> Result<WithToken<ConfirmationResponse>, Error> where fn confirm_internal<F, T>(&self, id: U256, modification: TransactionModification, f: F) -> BoxFuture<WithToken<ConfirmationResponse>, Error> where
F: FnOnce(&C, &M, &AccountProvider, ConfirmationPayload) -> Result<WithToken<ConfirmationResponse>, Error>, F: FnOnce(D, &AccountProvider, ConfirmationPayload) -> T,
T: IntoFuture<Item=WithToken<ConfirmationResponse>, Error=Error>,
T::Future: Send + 'static
{ {
let id = id.into(); let id = id.into();
let accounts = take_weak!(self.accounts); let dispatcher = self.dispatcher.clone();
let signer = take_weak!(self.signer);
let client = take_weak!(self.client); let setup = || {
let miner = take_weak!(self.miner); Ok((take_weak!(self.accounts), take_weak!(self.signer)))
};
let (accounts, signer) = match setup() {
Ok(x) => x,
Err(e) => return future::err(e).boxed(),
};
signer.peek(&id).map(|confirmation| { signer.peek(&id).map(|confirmation| {
let mut payload = confirmation.payload.clone(); let mut payload = confirmation.payload.clone();
@ -83,17 +87,21 @@ impl<C: 'static, M: 'static> SignerClient<C, M> where C: MiningBlockChainClient,
request.condition = condition.clone().map(Into::into); request.condition = condition.clone().map(Into::into);
} }
} }
let result = f(&*client, &*miner, &*accounts, payload); let fut = f(dispatcher, &*accounts, payload);
fut.into_future().then(move |result| {
// Execute // Execute
if let Ok(ref response) = result { if let Ok(ref response) = result {
signer.request_confirmed(id, Ok((*response).clone())); signer.request_confirmed(id, Ok((*response).clone()));
} }
result result
}).unwrap_or_else(|| Err(errors::invalid_params("Unknown RequestID", id))) }).boxed()
})
.unwrap_or_else(|| future::err(errors::invalid_params("Unknown RequestID", id)).boxed())
} }
} }
impl<C: 'static, M: 'static> Signer for SignerClient<C, M> where C: MiningBlockChainClient, M: MinerService { impl<D: Dispatcher + 'static> Signer for SignerClient<D> {
fn requests_to_confirm(&self) -> Result<Vec<ConfirmationRequest>, Error> { fn requests_to_confirm(&self) -> Result<Vec<ConfirmationRequest>, Error> {
let signer = take_weak!(self.signer); let signer = take_weak!(self.signer);
@ -107,29 +115,31 @@ impl<C: 'static, M: 'static> Signer for SignerClient<C, M> where C: MiningBlockC
// TODO [ToDr] TransactionModification is redundant for some calls // TODO [ToDr] TransactionModification is redundant for some calls
// might be better to replace it in future // might be better to replace it in future
fn confirm_request(&self, id: U256, modification: TransactionModification, pass: String) -> Result<ConfirmationResponse, Error> { fn confirm_request(&self, id: U256, modification: TransactionModification, pass: String)
self.confirm_internal(id, modification, move |client, miner, accounts, payload| { -> BoxFuture<ConfirmationResponse, Error>
dispatch::execute(client, miner, accounts, payload, dispatch::SignWith::Password(pass)) {
}).map(|v| v.into_value()) self.confirm_internal(id, modification, move |dis, accounts, payload| {
dispatch::execute(dis, accounts, payload, dispatch::SignWith::Password(pass))
}).map(|v| v.into_value()).boxed()
} }
fn confirm_request_with_token(&self, id: U256, modification: TransactionModification, token: String) -> Result<ConfirmationResponseWithToken, Error> { fn confirm_request_with_token(&self, id: U256, modification: TransactionModification, token: String)
self.confirm_internal(id, modification, move |client, miner, accounts, payload| { -> BoxFuture<ConfirmationResponseWithToken, Error>
dispatch::execute(client, miner, accounts, payload, dispatch::SignWith::Token(token)) {
self.confirm_internal(id, modification, move |dis, accounts, payload| {
dispatch::execute(dis, accounts, payload, dispatch::SignWith::Token(token))
}).and_then(|v| match v { }).and_then(|v| match v {
WithToken::No(_) => Err(errors::internal("Unexpected response without token.", "")), WithToken::No(_) => Err(errors::internal("Unexpected response without token.", "")),
WithToken::Yes(response, token) => Ok(ConfirmationResponseWithToken { WithToken::Yes(response, token) => Ok(ConfirmationResponseWithToken {
result: response, result: response,
token: token, token: token,
}), }),
}) }).boxed()
} }
fn confirm_request_raw(&self, id: U256, bytes: Bytes) -> Result<ConfirmationResponse, Error> { fn confirm_request_raw(&self, id: U256, bytes: Bytes) -> Result<ConfirmationResponse, Error> {
let id = id.into(); let id = id.into();
let signer = take_weak!(self.signer); let signer = take_weak!(self.signer);
let client = take_weak!(self.client);
let miner = take_weak!(self.miner);
signer.peek(&id).map(|confirmation| { signer.peek(&id).map(|confirmation| {
let result = match confirmation.payload { let result = match confirmation.payload {
@ -150,7 +160,7 @@ impl<C: 'static, M: 'static> Signer for SignerClient<C, M> where C: MiningBlockC
// Dispatch if everything is ok // Dispatch if everything is ok
if sender_matches && data_matches && value_matches && nonce_matches { if sender_matches && data_matches && value_matches && nonce_matches {
let pending_transaction = PendingTransaction::new(signed_transaction, request.condition.map(Into::into)); let pending_transaction = PendingTransaction::new(signed_transaction, request.condition.map(Into::into));
dispatch_transaction(&*client, &*miner, pending_transaction) self.dispatcher.dispatch_transaction(pending_transaction)
.map(Into::into) .map(Into::into)
.map(ConfirmationResponse::SendTransaction) .map(ConfirmationResponse::SendTransaction)
} else { } else {

View File

@ -21,16 +21,15 @@ use transient_hashmap::TransientHashMap;
use util::{U256, Mutex}; use util::{U256, Mutex};
use ethcore::account_provider::AccountProvider; use ethcore::account_provider::AccountProvider;
use ethcore::miner::MinerService;
use ethcore::client::MiningBlockChainClient;
use futures::{self, BoxFuture, Future}; use futures::{self, future, BoxFuture, Future};
use jsonrpc_core::Error; use jsonrpc_core::Error;
use v1::helpers::{ use v1::helpers::{
errors, dispatch, errors,
DefaultAccount, DefaultAccount,
SigningQueue, ConfirmationPromise, ConfirmationResult, ConfirmationPayload, SignerService SigningQueue, ConfirmationPromise, ConfirmationResult, SignerService
}; };
use v1::helpers::dispatch::{self, Dispatcher};
use v1::metadata::Metadata; use v1::metadata::Metadata;
use v1::traits::{EthSigning, ParitySigning}; use v1::traits::{EthSigning, ParitySigning};
use v1::types::{ use v1::types::{
@ -50,33 +49,16 @@ enum DispatchResult {
} }
/// Implementation of functions that require signing when no trusted signer is used. /// Implementation of functions that require signing when no trusted signer is used.
pub struct SigningQueueClient<C, M> where C: MiningBlockChainClient, M: MinerService { pub struct SigningQueueClient<D> {
signer: Weak<SignerService>, signer: Weak<SignerService>,
accounts: Weak<AccountProvider>, accounts: Weak<AccountProvider>,
client: Weak<C>, dispatcher: D,
miner: Weak<M>, pending: Arc<Mutex<TransientHashMap<U256, ConfirmationPromise>>>,
pending: Mutex<TransientHashMap<U256, ConfirmationPromise>>,
} }
impl<C, M> SigningQueueClient<C, M> where fn handle_dispatch<OnResponse>(res: Result<DispatchResult, Error>, on_response: OnResponse)
C: MiningBlockChainClient,
M: MinerService,
{
/// Creates a new signing queue client given shared signing queue.
pub fn new(signer: &Arc<SignerService>, client: &Arc<C>, miner: &Arc<M>, accounts: &Arc<AccountProvider>) -> Self {
SigningQueueClient {
signer: Arc::downgrade(signer),
accounts: Arc::downgrade(accounts),
client: Arc::downgrade(client),
miner: Arc::downgrade(miner),
pending: Mutex::new(TransientHashMap::new(MAX_PENDING_DURATION)),
}
}
fn handle_dispatch<OnResponse>(&self, res: Result<DispatchResult, Error>, on_response: OnResponse)
where OnResponse: FnOnce(Result<RpcConfirmationResponse, Error>) + Send + 'static where OnResponse: FnOnce(Result<RpcConfirmationResponse, Error>) + Send + 'static
{ {
match res { match res {
Ok(DispatchResult::Value(result)) => on_response(Ok(result)), Ok(DispatchResult::Value(result)) => on_response(Ok(result)),
Ok(DispatchResult::Promise(promise)) => { Ok(DispatchResult::Promise(promise)) => {
@ -86,69 +68,77 @@ impl<C, M> SigningQueueClient<C, M> where
}, },
Err(e) => on_response(Err(e)), Err(e) => on_response(Err(e)),
} }
}
impl<D: Dispatcher + 'static> SigningQueueClient<D> {
/// Creates a new signing queue client given shared signing queue.
pub fn new(signer: &Arc<SignerService>, dispatcher: D, accounts: &Arc<AccountProvider>) -> Self {
SigningQueueClient {
signer: Arc::downgrade(signer),
accounts: Arc::downgrade(accounts),
dispatcher: dispatcher,
pending: Arc::new(Mutex::new(TransientHashMap::new(MAX_PENDING_DURATION))),
}
} }
fn add_to_queue(&self, payload: ConfirmationPayload) -> Result<DispatchResult, Error> { fn dispatch(&self, payload: RpcConfirmationPayload, default_account: DefaultAccount) -> BoxFuture<DispatchResult, Error> {
let client = take_weak!(self.client); let accounts = take_weakf!(self.accounts);
let miner = take_weak!(self.miner);
let accounts = take_weak!(self.accounts);
let sender = payload.sender();
if accounts.is_unlocked(sender) {
return dispatch::execute(&*client, &*miner, &*accounts, payload, dispatch::SignWith::Nothing)
.map(|v| v.into_value())
.map(DispatchResult::Value);
}
take_weak!(self.signer).add_request(payload)
.map(DispatchResult::Promise)
.map_err(|_| errors::request_rejected_limit())
}
fn dispatch(&self, payload: RpcConfirmationPayload, default_account: DefaultAccount) -> Result<DispatchResult, Error> {
let client = take_weak!(self.client);
let miner = take_weak!(self.miner);
let default_account = match default_account { let default_account = match default_account {
DefaultAccount::Provided(acc) => acc, DefaultAccount::Provided(acc) => acc,
DefaultAccount::ForDapp(dapp) => take_weak!(self.accounts).default_address(dapp).ok().unwrap_or_default(), DefaultAccount::ForDapp(dapp) => accounts.default_address(dapp).ok().unwrap_or_default(),
}; };
let payload = dispatch::from_rpc(payload, default_account, &*client, &*miner);
self.add_to_queue(payload) let dispatcher = self.dispatcher.clone();
let signer = take_weakf!(self.signer);
dispatch::from_rpc(payload, default_account, &dispatcher)
.and_then(move |payload| {
let sender = payload.sender();
if accounts.is_unlocked(sender) {
dispatch::execute(dispatcher, &accounts, payload, dispatch::SignWith::Nothing)
.map(|v| v.into_value())
.map(DispatchResult::Value)
.boxed()
} else {
future::done(
signer.add_request(payload)
.map(DispatchResult::Promise)
.map_err(|_| errors::request_rejected_limit())
).boxed()
}
})
.boxed()
} }
} }
impl<C: 'static, M: 'static> ParitySigning for SigningQueueClient<C, M> where impl<D: Dispatcher + 'static> ParitySigning for SigningQueueClient<D> {
C: MiningBlockChainClient,
M: MinerService,
{
type Metadata = Metadata; type Metadata = Metadata;
fn post_sign(&self, address: RpcH160, data: RpcBytes) -> Result<RpcEither<RpcU256, RpcConfirmationResponse>, Error> { fn post_sign(&self, address: RpcH160, data: RpcBytes) -> BoxFuture<RpcEither<RpcU256, RpcConfirmationResponse>, Error> {
let pending = self.pending.clone();
self.dispatch(RpcConfirmationPayload::Signature((address.clone(), data).into()), DefaultAccount::Provided(address.into())) self.dispatch(RpcConfirmationPayload::Signature((address.clone(), data).into()), DefaultAccount::Provided(address.into()))
.map(|result| match result { .map(move |result| match result {
DispatchResult::Value(v) => RpcEither::Or(v), DispatchResult::Value(v) => RpcEither::Or(v),
DispatchResult::Promise(promise) => { DispatchResult::Promise(promise) => {
let id = promise.id(); let id = promise.id();
self.pending.lock().insert(id, promise); pending.lock().insert(id, promise);
RpcEither::Either(id.into()) RpcEither::Either(id.into())
}, },
}) })
.boxed()
} }
fn post_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcEither<RpcU256, RpcConfirmationResponse>, Error> { fn post_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcEither<RpcU256, RpcConfirmationResponse>, Error> {
let post_transaction = move || { let pending = self.pending.clone();
self.dispatch(RpcConfirmationPayload::SendTransaction(request), meta.into()) self.dispatch(RpcConfirmationPayload::SendTransaction(request), meta.into())
.map(|result| match result { .map(move |result| match result {
DispatchResult::Value(v) => RpcEither::Or(v), DispatchResult::Value(v) => RpcEither::Or(v),
DispatchResult::Promise(promise) => { DispatchResult::Promise(promise) => {
let id = promise.id(); let id = promise.id();
self.pending.lock().insert(id, promise); pending.lock().insert(id, promise);
RpcEither::Either(id.into()) RpcEither::Either(id.into())
}, },
}) })
}; .boxed()
futures::done(post_transaction()).boxed()
} }
fn check_request(&self, id: RpcU256) -> Result<Option<RpcConfirmationResponse>, Error> { fn check_request(&self, id: RpcU256) -> Result<Option<RpcConfirmationResponse>, Error> {
@ -170,8 +160,11 @@ impl<C: 'static, M: 'static> ParitySigning for SigningQueueClient<C, M> where
let res = self.dispatch(RpcConfirmationPayload::Decrypt((address.clone(), data).into()), address.into()); let res = self.dispatch(RpcConfirmationPayload::Decrypt((address.clone(), data).into()), address.into());
let (ready, p) = futures::oneshot(); let (ready, p) = futures::oneshot();
// TODO [todr] typed handle_dispatch
self.handle_dispatch(res, |response| { // when dispatch is complete
res.then(move |res| {
// register callback via the oneshot sender.
handle_dispatch(res, move |response| {
match response { match response {
Ok(RpcConfirmationResponse::Decrypt(data)) => ready.complete(Ok(data)), Ok(RpcConfirmationResponse::Decrypt(data)) => ready.complete(Ok(data)),
Err(e) => ready.complete(Err(e)), Err(e) => ready.complete(Err(e)),
@ -179,36 +172,40 @@ impl<C: 'static, M: 'static> ParitySigning for SigningQueueClient<C, M> where
} }
}); });
p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))).boxed() // and wait for that to resolve.
p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled.")))
}).boxed()
} }
} }
impl<C: 'static, M: 'static> EthSigning for SigningQueueClient<C, M> where impl<D: Dispatcher + 'static> EthSigning for SigningQueueClient<D> {
C: MiningBlockChainClient,
M: MinerService,
{
type Metadata = Metadata; type Metadata = Metadata;
fn sign(&self, address: RpcH160, data: RpcBytes) -> BoxFuture<RpcH520, Error> { fn sign(&self, address: RpcH160, data: RpcBytes) -> BoxFuture<RpcH520, Error> {
let res = self.dispatch(RpcConfirmationPayload::Signature((address.clone(), data).into()), address.into()); let res = self.dispatch(RpcConfirmationPayload::Signature((address.clone(), data).into()), address.into());
let (ready, p) = futures::oneshot(); let (ready, p) = futures::oneshot();
self.handle_dispatch(res, |response| {
res.then(move |res| {
handle_dispatch(res, move |response| {
match response { match response {
Ok(RpcConfirmationResponse::Signature(signature)) => ready.complete(Ok(signature)), Ok(RpcConfirmationResponse::Signature(sig)) => ready.complete(Ok(sig)),
Err(e) => ready.complete(Err(e)), Err(e) => ready.complete(Err(e)),
e => ready.complete(Err(errors::internal("Unexpected result.", e))), e => ready.complete(Err(errors::internal("Unexpected result.", e))),
} }
}); });
p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))).boxed() p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled.")))
}).boxed()
} }
fn send_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcH256, Error> { fn send_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcH256, Error> {
let res = self.dispatch(RpcConfirmationPayload::SendTransaction(request), meta.into()); let res = self.dispatch(RpcConfirmationPayload::SendTransaction(request), meta.into());
let (ready, p) = futures::oneshot(); let (ready, p) = futures::oneshot();
self.handle_dispatch(res, |response| {
res.then(move |res| {
handle_dispatch(res, move |response| {
match response { match response {
Ok(RpcConfirmationResponse::SendTransaction(hash)) => ready.complete(Ok(hash)), Ok(RpcConfirmationResponse::SendTransaction(hash)) => ready.complete(Ok(hash)),
Err(e) => ready.complete(Err(e)), Err(e) => ready.complete(Err(e)),
@ -216,14 +213,17 @@ impl<C: 'static, M: 'static> EthSigning for SigningQueueClient<C, M> where
} }
}); });
p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))).boxed() p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled.")))
}).boxed()
} }
fn sign_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcRichRawTransaction, Error> { fn sign_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcRichRawTransaction, Error> {
let res = self.dispatch(RpcConfirmationPayload::SignTransaction(request), meta.into()); let res = self.dispatch(RpcConfirmationPayload::SignTransaction(request), meta.into());
let (ready, p) = futures::oneshot(); let (ready, p) = futures::oneshot();
self.handle_dispatch(res, |response| {
res.then(move |res| {
handle_dispatch(res, move |response| {
match response { match response {
Ok(RpcConfirmationResponse::SignTransaction(tx)) => ready.complete(Ok(tx)), Ok(RpcConfirmationResponse::SignTransaction(tx)) => ready.complete(Ok(tx)),
Err(e) => ready.complete(Err(e)), Err(e) => ready.complete(Err(e)),
@ -231,6 +231,7 @@ impl<C: 'static, M: 'static> EthSigning for SigningQueueClient<C, M> where
} }
}); });
p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))).boxed() p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled.")))
}).boxed()
} }
} }

View File

@ -19,12 +19,11 @@
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use ethcore::account_provider::AccountProvider; use ethcore::account_provider::AccountProvider;
use ethcore::miner::MinerService;
use ethcore::client::MiningBlockChainClient;
use futures::{self, BoxFuture, Future}; use futures::{future, BoxFuture, Future};
use jsonrpc_core::Error; use jsonrpc_core::Error;
use v1::helpers::{errors, dispatch, DefaultAccount}; use v1::helpers::{errors, DefaultAccount};
use v1::helpers::dispatch::{self, Dispatcher};
use v1::metadata::Metadata; use v1::metadata::Metadata;
use v1::traits::{EthSigning, ParitySigning}; use v1::traits::{EthSigning, ParitySigning};
use v1::types::{ use v1::types::{
@ -38,106 +37,93 @@ use v1::types::{
}; };
/// Implementation of functions that require signing when no trusted signer is used. /// Implementation of functions that require signing when no trusted signer is used.
pub struct SigningUnsafeClient<C, M> where pub struct SigningUnsafeClient<D> {
C: MiningBlockChainClient,
M: MinerService,
{
accounts: Weak<AccountProvider>, accounts: Weak<AccountProvider>,
client: Weak<C>, dispatcher: D,
miner: Weak<M>,
} }
impl<C, M> SigningUnsafeClient<C, M> where impl<D: Dispatcher + 'static> SigningUnsafeClient<D> {
C: MiningBlockChainClient,
M: MinerService,
{
/// Creates new SigningUnsafeClient. /// Creates new SigningUnsafeClient.
pub fn new(client: &Arc<C>, accounts: &Arc<AccountProvider>, miner: &Arc<M>) pub fn new(accounts: &Arc<AccountProvider>, dispatcher: D) -> Self {
-> Self {
SigningUnsafeClient { SigningUnsafeClient {
client: Arc::downgrade(client),
miner: Arc::downgrade(miner),
accounts: Arc::downgrade(accounts), accounts: Arc::downgrade(accounts),
dispatcher: dispatcher,
} }
} }
fn handle(&self, payload: RpcConfirmationPayload, account: DefaultAccount) -> Result<RpcConfirmationResponse, Error> { fn handle(&self, payload: RpcConfirmationPayload, account: DefaultAccount) -> BoxFuture<RpcConfirmationResponse, Error> {
let client = take_weak!(self.client); let accounts = take_weakf!(self.accounts);
let miner = take_weak!(self.miner); let default = match account {
let accounts = take_weak!(self.accounts);
let default_account = match account {
DefaultAccount::Provided(acc) => acc, DefaultAccount::Provided(acc) => acc,
DefaultAccount::ForDapp(dapp) => accounts.default_address(dapp).ok().unwrap_or_default(), DefaultAccount::ForDapp(dapp) => accounts.default_address(dapp).ok().unwrap_or_default(),
}; };
let payload = dispatch::from_rpc(payload, default_account, &*client, &*miner);
dispatch::execute(&*client, &*miner, &*accounts, payload, dispatch::SignWith::Nothing) let dis = self.dispatcher.clone();
dispatch::from_rpc(payload, default, &dis)
.and_then(move |payload| {
dispatch::execute(dis, &accounts, payload, dispatch::SignWith::Nothing)
})
.map(|v| v.into_value()) .map(|v| v.into_value())
.boxed()
} }
} }
impl<C: 'static, M: 'static> EthSigning for SigningUnsafeClient<C, M> where impl<D: Dispatcher + 'static> EthSigning for SigningUnsafeClient<D>
C: MiningBlockChainClient,
M: MinerService,
{ {
type Metadata = Metadata; type Metadata = Metadata;
fn sign(&self, address: RpcH160, data: RpcBytes) -> BoxFuture<RpcH520, Error> { fn sign(&self, address: RpcH160, data: RpcBytes) -> BoxFuture<RpcH520, Error> {
let result = match self.handle(RpcConfirmationPayload::Signature((address.clone(), data).into()), address.into()) { self.handle(RpcConfirmationPayload::Signature((address.clone(), data).into()), address.into())
.then(|res| match res {
Ok(RpcConfirmationResponse::Signature(signature)) => Ok(signature), Ok(RpcConfirmationResponse::Signature(signature)) => Ok(signature),
Err(e) => Err(e), Err(e) => Err(e),
e => Err(errors::internal("Unexpected result", e)), e => Err(errors::internal("Unexpected result", e)),
}; })
.boxed()
futures::done(result).boxed()
} }
fn send_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcH256, Error> { fn send_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcH256, Error> {
let result = match self.handle(RpcConfirmationPayload::SendTransaction(request), meta.into()) { self.handle(RpcConfirmationPayload::SendTransaction(request), meta.into())
.then(|res| match res {
Ok(RpcConfirmationResponse::SendTransaction(hash)) => Ok(hash), Ok(RpcConfirmationResponse::SendTransaction(hash)) => Ok(hash),
Err(e) => Err(e), Err(e) => Err(e),
e => Err(errors::internal("Unexpected result", e)), e => Err(errors::internal("Unexpected result", e)),
}; })
.boxed()
futures::done(result).boxed()
} }
fn sign_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcRichRawTransaction, Error> { fn sign_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture<RpcRichRawTransaction, Error> {
let result = match self.handle(RpcConfirmationPayload::SignTransaction(request), meta.into()) { self.handle(RpcConfirmationPayload::SignTransaction(request), meta.into())
.then(|res| match res {
Ok(RpcConfirmationResponse::SignTransaction(tx)) => Ok(tx), Ok(RpcConfirmationResponse::SignTransaction(tx)) => Ok(tx),
Err(e) => Err(e), Err(e) => Err(e),
e => Err(errors::internal("Unexpected result", e)), e => Err(errors::internal("Unexpected result", e)),
}; })
.boxed()
futures::done(result).boxed()
} }
} }
impl<C: 'static, M: 'static> ParitySigning for SigningUnsafeClient<C, M> where impl<D: Dispatcher + 'static> ParitySigning for SigningUnsafeClient<D> {
C: MiningBlockChainClient,
M: MinerService,
{
type Metadata = Metadata; type Metadata = Metadata;
fn decrypt_message(&self, address: RpcH160, data: RpcBytes) -> BoxFuture<RpcBytes, Error> { fn decrypt_message(&self, address: RpcH160, data: RpcBytes) -> BoxFuture<RpcBytes, Error> {
let result = match self.handle(RpcConfirmationPayload::Decrypt((address.clone(), data).into()), address.into()) { self.handle(RpcConfirmationPayload::Decrypt((address.clone(), data).into()), address.into())
.then(|res| match res {
Ok(RpcConfirmationResponse::Decrypt(data)) => Ok(data), Ok(RpcConfirmationResponse::Decrypt(data)) => Ok(data),
Err(e) => Err(e), Err(e) => Err(e),
e => Err(errors::internal("Unexpected result", e)), e => Err(errors::internal("Unexpected result", e)),
}; })
.boxed()
futures::done(result).boxed()
} }
fn post_sign(&self, _: RpcH160, _: RpcBytes) -> Result<RpcEither<RpcU256, RpcConfirmationResponse>, Error> { fn post_sign(&self, _: RpcH160, _: RpcBytes) -> BoxFuture<RpcEither<RpcU256, RpcConfirmationResponse>, Error> {
// We don't support this in non-signer mode. // We don't support this in non-signer mode.
Err(errors::signer_disabled()) future::err(errors::signer_disabled()).boxed()
} }
fn post_transaction(&self, _: Metadata, _: RpcTransactionRequest) -> BoxFuture<RpcEither<RpcU256, RpcConfirmationResponse>, Error> { fn post_transaction(&self, _: Metadata, _: RpcTransactionRequest) -> BoxFuture<RpcEither<RpcU256, RpcConfirmationResponse>, Error> {
// We don't support this in non-signer mode. // We don't support this in non-signer mode.
futures::done(Err(errors::signer_disabled())).boxed() future::err((errors::signer_disabled())).boxed()
} }
fn check_request(&self, _: RpcU256) -> Result<Option<RpcConfirmationResponse>, Error> { fn check_request(&self, _: RpcU256) -> Result<Option<RpcConfirmationResponse>, Error> {

View File

@ -18,6 +18,37 @@
//! //!
//! Compliant with ethereum rpc. //! Compliant with ethereum rpc.
// Upgrade a weak pointer, returning an error on failure.
macro_rules! take_weak {
($weak: expr) => {
match $weak.upgrade() {
Some(arc) => arc,
None => return Err(Error::internal_error()),
}
}
}
// Upgrade a weak pointer, returning an error leaf-future on failure.
macro_rules! take_weakf {
($weak: expr) => {
match $weak.upgrade() {
Some(arc) => arc,
None => return ::futures::future::err(Error::internal_error()).boxed(),
}
}
}
// short for "try_boxfuture"
// unwrap a result, returning a BoxFuture<_, Err> on failure.
macro_rules! try_bf {
($res: expr) => {
match $res {
Ok(val) => val,
Err(e) => return ::futures::future::err(e.into()).boxed(),
}
}
}
#[macro_use] #[macro_use]
mod helpers; mod helpers;
mod impls; mod impls;
@ -29,5 +60,5 @@ pub mod types;
pub use self::traits::{Web3, Eth, EthFilter, EthSigning, Net, Parity, ParityAccounts, ParitySet, ParitySigning, Signer, Personal, Traces, Rpc}; pub use self::traits::{Web3, Eth, EthFilter, EthSigning, Net, Parity, ParityAccounts, ParitySet, ParitySigning, Signer, Personal, Traces, Rpc};
pub use self::impls::*; pub use self::impls::*;
pub use self::helpers::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings, block_import, informant}; pub use self::helpers::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings, block_import, informant, dispatch};
pub use self::metadata::{Metadata, Origin}; pub use self::metadata::{Metadata, Origin};

View File

@ -33,6 +33,7 @@ use util::{U256, H256, Uint, Address, Hashable};
use jsonrpc_core::IoHandler; use jsonrpc_core::IoHandler;
use v1::impls::{EthClient, SigningUnsafeClient}; use v1::impls::{EthClient, SigningUnsafeClient};
use v1::helpers::dispatch::FullDispatcher;
use v1::metadata::Metadata; use v1::metadata::Metadata;
use v1::tests::helpers::{TestSnapshotService, TestSyncProvider, Config}; use v1::tests::helpers::{TestSnapshotService, TestSyncProvider, Config};
use v1::traits::eth::Eth; use v1::traits::eth::Eth;
@ -141,10 +142,11 @@ impl EthTester {
&external_miner, &external_miner,
Default::default(), Default::default(),
); );
let dispatcher = FullDispatcher::new(Arc::downgrade(&client), Arc::downgrade(&miner_service));
let eth_sign = SigningUnsafeClient::new( let eth_sign = SigningUnsafeClient::new(
&client,
&account_provider, &account_provider,
&miner_service dispatcher,
); );
let mut handler = IoHandler::default(); let mut handler = IoHandler::default();

View File

@ -34,6 +34,7 @@ use ethsync::SyncState;
use jsonrpc_core::IoHandler; use jsonrpc_core::IoHandler;
use v1::{Eth, EthClient, EthClientOptions, EthFilter, EthFilterClient, EthSigning, SigningUnsafeClient}; use v1::{Eth, EthClient, EthClientOptions, EthFilter, EthFilterClient, EthSigning, SigningUnsafeClient};
use v1::helpers::dispatch::FullDispatcher;
use v1::tests::helpers::{TestSyncProvider, Config, TestMinerService, TestSnapshotService}; use v1::tests::helpers::{TestSyncProvider, Config, TestMinerService, TestSnapshotService};
use v1::metadata::Metadata; use v1::metadata::Metadata;
@ -88,7 +89,9 @@ impl EthTester {
let external_miner = Arc::new(ExternalMiner::new(hashrates.clone())); let external_miner = Arc::new(ExternalMiner::new(hashrates.clone()));
let eth = EthClient::new(&client, &snapshot, &sync, &ap, &miner, &external_miner, options).to_delegate(); let eth = EthClient::new(&client, &snapshot, &sync, &ap, &miner, &external_miner, options).to_delegate();
let filter = EthFilterClient::new(&client, &miner).to_delegate(); let filter = EthFilterClient::new(&client, &miner).to_delegate();
let sign = SigningUnsafeClient::new(&client, &ap, &miner).to_delegate();
let dispatcher = FullDispatcher::new(Arc::downgrade(&client), Arc::downgrade(&miner));
let sign = SigningUnsafeClient::new(&ap, dispatcher).to_delegate();
let mut io: IoHandler<Metadata> = IoHandler::default(); let mut io: IoHandler<Metadata> = IoHandler::default();
io.extend_with(eth); io.extend_with(eth);
io.extend_with(sign); io.extend_with(sign);

View File

@ -16,13 +16,16 @@
use std::sync::Arc; use std::sync::Arc;
use std::str::FromStr; use std::str::FromStr;
use jsonrpc_core::IoHandler;
use util::{U256, Uint, Address};
use ethcore::account_provider::AccountProvider; use ethcore::account_provider::AccountProvider;
use v1::{PersonalClient, Personal, Metadata};
use v1::tests::helpers::TestMinerService;
use ethcore::client::TestBlockChainClient; use ethcore::client::TestBlockChainClient;
use ethcore::transaction::{Action, Transaction}; use ethcore::transaction::{Action, Transaction};
use jsonrpc_core::IoHandler;
use util::{U256, Uint, Address};
use v1::{PersonalClient, Personal, Metadata};
use v1::helpers::dispatch::FullDispatcher;
use v1::tests::helpers::TestMinerService;
struct PersonalTester { struct PersonalTester {
accounts: Arc<AccountProvider>, accounts: Arc<AccountProvider>,
@ -50,7 +53,9 @@ fn setup() -> PersonalTester {
let accounts = accounts_provider(); let accounts = accounts_provider();
let client = blockchain_client(); let client = blockchain_client();
let miner = miner_service(); let miner = miner_service();
let personal = PersonalClient::new(&accounts, &client, &miner, false);
let dispatcher = FullDispatcher::new(Arc::downgrade(&client), Arc::downgrade(&miner));
let personal = PersonalClient::new(&accounts, dispatcher, false);
let mut io = IoHandler::default(); let mut io = IoHandler::default();
io.extend_with(personal.to_delegate()); io.extend_with(personal.to_delegate());

View File

@ -29,6 +29,7 @@ use v1::{SignerClient, Signer};
use v1::metadata::Metadata; use v1::metadata::Metadata;
use v1::tests::helpers::TestMinerService; use v1::tests::helpers::TestMinerService;
use v1::helpers::{SigningQueue, SignerService, FilledTransactionRequest, ConfirmationPayload}; use v1::helpers::{SigningQueue, SignerService, FilledTransactionRequest, ConfirmationPayload};
use v1::helpers::dispatch::FullDispatcher;
struct SignerTester { struct SignerTester {
signer: Arc<SignerService>, signer: Arc<SignerService>,
@ -59,8 +60,9 @@ fn signer_tester() -> SignerTester {
let client = blockchain_client(); let client = blockchain_client();
let miner = miner_service(); let miner = miner_service();
let dispatcher = FullDispatcher::new(Arc::downgrade(&client), Arc::downgrade(&miner));
let mut io = IoHandler::default(); let mut io = IoHandler::default();
io.extend_with(SignerClient::new(&accounts, &client, &miner, &signer).to_delegate()); io.extend_with(SignerClient::new(&accounts, dispatcher, &signer).to_delegate());
SignerTester { SignerTester {
signer: signer, signer: signer,

View File

@ -16,13 +16,14 @@
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use rlp; use rlp;
use jsonrpc_core::{IoHandler, Success}; use jsonrpc_core::{IoHandler, Success};
use v1::impls::SigningQueueClient; use v1::impls::SigningQueueClient;
use v1::metadata::Metadata; use v1::metadata::Metadata;
use v1::traits::{EthSigning, ParitySigning, Parity}; use v1::traits::{EthSigning, ParitySigning, Parity};
use v1::helpers::{SignerService, SigningQueue}; use v1::helpers::{SignerService, SigningQueue, FullDispatcher};
use v1::types::ConfirmationResponse; use v1::types::ConfirmationResponse;
use v1::tests::helpers::TestMinerService; use v1::tests::helpers::TestMinerService;
use v1::tests::mocked::parity; use v1::tests::mocked::parity;
@ -51,9 +52,12 @@ impl Default for SigningTester {
let miner = Arc::new(TestMinerService::default()); let miner = Arc::new(TestMinerService::default());
let accounts = Arc::new(AccountProvider::transient_provider()); let accounts = Arc::new(AccountProvider::transient_provider());
let mut io = IoHandler::default(); let mut io = IoHandler::default();
let rpc = SigningQueueClient::new(&signer, &client, &miner, &accounts);
let dispatcher = FullDispatcher::new(Arc::downgrade(&client), Arc::downgrade(&miner));
let rpc = SigningQueueClient::new(&signer, dispatcher.clone(), &accounts);
io.extend_with(EthSigning::to_delegate(rpc)); io.extend_with(EthSigning::to_delegate(rpc));
let rpc = SigningQueueClient::new(&signer, &client, &miner, &accounts); let rpc = SigningQueueClient::new(&signer, dispatcher, &accounts);
io.extend_with(ParitySigning::to_delegate(rpc)); io.extend_with(ParitySigning::to_delegate(rpc));
SigningTester { SigningTester {
@ -91,9 +95,17 @@ fn should_add_sign_to_queue() {
// then // then
let promise = tester.io.handle_request(&request); let promise = tester.io.handle_request(&request);
assert_eq!(tester.signer.requests().len(), 1);
// the future must be polled at least once before request is queued.
let signer = tester.signer.clone();
::std::thread::spawn(move || loop {
if signer.requests().len() == 1 {
// respond // respond
tester.signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Signature(0.into()))); signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Signature(0.into())));
break
}
::std::thread::sleep(Duration::from_millis(100))
});
let res = promise.wait().unwrap(); let res = promise.wait().unwrap();
assert_eq!(res, Some(response.to_owned())); assert_eq!(res, Some(response.to_owned()));
@ -229,9 +241,17 @@ fn should_add_transaction_to_queue() {
// then // then
let promise = tester.io.handle_request(&request); let promise = tester.io.handle_request(&request);
assert_eq!(tester.signer.requests().len(), 1);
// the future must be polled at least once before request is queued.
let signer = tester.signer.clone();
::std::thread::spawn(move || loop {
if signer.requests().len() == 1 {
// respond // respond
tester.signer.request_confirmed(1.into(), Ok(ConfirmationResponse::SendTransaction(0.into()))); signer.request_confirmed(1.into(), Ok(ConfirmationResponse::SendTransaction(0.into())));
break
}
::std::thread::sleep(Duration::from_millis(100))
});
let res = promise.wait().unwrap(); let res = promise.wait().unwrap();
assert_eq!(res, Some(response.to_owned())); assert_eq!(res, Some(response.to_owned()));
@ -296,9 +316,17 @@ fn should_add_sign_transaction_to_the_queue() {
// then // then
tester.miner.last_nonces.write().insert(address.clone(), U256::zero()); tester.miner.last_nonces.write().insert(address.clone(), U256::zero());
let promise = tester.io.handle_request(&request); let promise = tester.io.handle_request(&request);
assert_eq!(tester.signer.requests().len(), 1);
// the future must be polled at least once before request is queued.
let signer = tester.signer.clone();
::std::thread::spawn(move || loop {
if signer.requests().len() == 1 {
// respond // respond
tester.signer.request_confirmed(1.into(), Ok(ConfirmationResponse::SignTransaction(t.into()))); signer.request_confirmed(1.into(), Ok(ConfirmationResponse::SignTransaction(t.into())));
break
}
::std::thread::sleep(Duration::from_millis(100))
});
let res = promise.wait().unwrap(); let res = promise.wait().unwrap();
assert_eq!(res, Some(response.to_owned())); assert_eq!(res, Some(response.to_owned()));
@ -391,12 +419,22 @@ fn should_add_decryption_to_the_queue() {
}"#; }"#;
let response = r#"{"jsonrpc":"2.0","result":"0x0102","id":1}"#; let response = r#"{"jsonrpc":"2.0","result":"0x0102","id":1}"#;
// then // then
let promise = tester.io.handle_request(&request); let promise = tester.io.handle_request(&request);
assert_eq!(tester.signer.requests().len(), 1);
// respond
tester.signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Decrypt(vec![0x1, 0x2].into())));
// the future must be polled at least once before request is queued.
let signer = tester.signer.clone();
::std::thread::spawn(move || loop {
if signer.requests().len() == 1 {
// respond
signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Decrypt(vec![0x1, 0x2].into())));
break
}
::std::thread::sleep(Duration::from_millis(100))
});
// check response: will deadlock if unsuccessful.
let res = promise.wait().unwrap(); let res = promise.wait().unwrap();
assert_eq!(res, Some(response.to_owned())); assert_eq!(res, Some(response.to_owned()));
} }

View File

@ -62,44 +62,44 @@ build_rpc_trait! {
fn block_number(&self) -> Result<U256, Error>; fn block_number(&self) -> Result<U256, Error>;
/// Returns balance of the given account. /// Returns balance of the given account.
#[rpc(name = "eth_getBalance")] #[rpc(async, name = "eth_getBalance")]
fn balance(&self, H160, Trailing<BlockNumber>) -> Result<U256, Error>; fn balance(&self, H160, Trailing<BlockNumber>) -> BoxFuture<U256, Error>;
/// Returns content of the storage at given address. /// Returns content of the storage at given address.
#[rpc(name = "eth_getStorageAt")] #[rpc(async, name = "eth_getStorageAt")]
fn storage_at(&self, H160, U256, Trailing<BlockNumber>) -> Result<H256, Error>; fn storage_at(&self, H160, U256, Trailing<BlockNumber>) -> BoxFuture<H256, Error>;
/// Returns block with given hash. /// Returns block with given hash.
#[rpc(name = "eth_getBlockByHash")] #[rpc(async, name = "eth_getBlockByHash")]
fn block_by_hash(&self, H256, bool) -> Result<Option<RichBlock>, Error>; fn block_by_hash(&self, H256, bool) -> BoxFuture<Option<RichBlock>, Error>;
/// Returns block with given number. /// Returns block with given number.
#[rpc(name = "eth_getBlockByNumber")] #[rpc(async, name = "eth_getBlockByNumber")]
fn block_by_number(&self, BlockNumber, bool) -> Result<Option<RichBlock>, Error>; fn block_by_number(&self, BlockNumber, bool) -> BoxFuture<Option<RichBlock>, Error>;
/// Returns the number of transactions sent from given address at given time (block number). /// Returns the number of transactions sent from given address at given time (block number).
#[rpc(name = "eth_getTransactionCount")] #[rpc(async, name = "eth_getTransactionCount")]
fn transaction_count(&self, H160, Trailing<BlockNumber>) -> Result<U256, Error>; fn transaction_count(&self, H160, Trailing<BlockNumber>) -> BoxFuture<U256, Error>;
/// Returns the number of transactions in a block with given hash. /// Returns the number of transactions in a block with given hash.
#[rpc(name = "eth_getBlockTransactionCountByHash")] #[rpc(async, name = "eth_getBlockTransactionCountByHash")]
fn block_transaction_count_by_hash(&self, H256) -> Result<Option<U256>, Error>; fn block_transaction_count_by_hash(&self, H256) -> BoxFuture<Option<U256>, Error>;
/// Returns the number of transactions in a block with given block number. /// Returns the number of transactions in a block with given block number.
#[rpc(name = "eth_getBlockTransactionCountByNumber")] #[rpc(async, name = "eth_getBlockTransactionCountByNumber")]
fn block_transaction_count_by_number(&self, BlockNumber) -> Result<Option<U256>, Error>; fn block_transaction_count_by_number(&self, BlockNumber) -> BoxFuture<Option<U256>, Error>;
/// Returns the number of uncles in a block with given hash. /// Returns the number of uncles in a block with given hash.
#[rpc(name = "eth_getUncleCountByBlockHash")] #[rpc(async, name = "eth_getUncleCountByBlockHash")]
fn block_uncles_count_by_hash(&self, H256) -> Result<Option<U256>, Error>; fn block_uncles_count_by_hash(&self, H256) -> BoxFuture<Option<U256>, Error>;
/// Returns the number of uncles in a block with given block number. /// Returns the number of uncles in a block with given block number.
#[rpc(name = "eth_getUncleCountByBlockNumber")] #[rpc(async, name = "eth_getUncleCountByBlockNumber")]
fn block_uncles_count_by_number(&self, BlockNumber) -> Result<Option<U256>, Error>; fn block_uncles_count_by_number(&self, BlockNumber) -> BoxFuture<Option<U256>, Error>;
/// Returns the code at given address at given time (block number). /// Returns the code at given address at given time (block number).
#[rpc(name = "eth_getCode")] #[rpc(async, name = "eth_getCode")]
fn code_at(&self, H160, Trailing<BlockNumber>) -> Result<Bytes, Error>; fn code_at(&self, H160, Trailing<BlockNumber>) -> BoxFuture<Bytes, Error>;
/// Sends signed transaction, returning its hash. /// Sends signed transaction, returning its hash.
#[rpc(name = "eth_sendRawTransaction")] #[rpc(name = "eth_sendRawTransaction")]

View File

@ -27,8 +27,8 @@ build_rpc_trait! {
/// Posts sign request asynchronously. /// Posts sign request asynchronously.
/// Will return a confirmation ID for later use with check_transaction. /// Will return a confirmation ID for later use with check_transaction.
#[rpc(name = "parity_postSign")] #[rpc(async, name = "parity_postSign")]
fn post_sign(&self, H160, Bytes) -> Result<Either<U256, ConfirmationResponse>, Error>; fn post_sign(&self, H160, Bytes) -> BoxFuture<Either<U256, ConfirmationResponse>, Error>;
/// Posts transaction asynchronously. /// Posts transaction asynchronously.
/// Will return a transaction ID for later use with check_transaction. /// Will return a transaction ID for later use with check_transaction.

View File

@ -16,6 +16,7 @@
//! Parity Signer-related rpc interface. //! Parity Signer-related rpc interface.
use jsonrpc_core::Error; use jsonrpc_core::Error;
use futures::BoxFuture;
use v1::types::{U256, Bytes, TransactionModification, ConfirmationRequest, ConfirmationResponse, ConfirmationResponseWithToken}; use v1::types::{U256, Bytes, TransactionModification, ConfirmationRequest, ConfirmationResponse, ConfirmationResponseWithToken};
@ -28,12 +29,12 @@ build_rpc_trait! {
fn requests_to_confirm(&self) -> Result<Vec<ConfirmationRequest>, Error>; fn requests_to_confirm(&self) -> Result<Vec<ConfirmationRequest>, Error>;
/// Confirm specific request. /// Confirm specific request.
#[rpc(name = "signer_confirmRequest")] #[rpc(async, name = "signer_confirmRequest")]
fn confirm_request(&self, U256, TransactionModification, String) -> Result<ConfirmationResponse, Error>; fn confirm_request(&self, U256, TransactionModification, String) -> BoxFuture<ConfirmationResponse, Error>;
/// Confirm specific request with token. /// Confirm specific request with token.
#[rpc(name = "signer_confirmRequestWithToken")] #[rpc(async, name = "signer_confirmRequestWithToken")]
fn confirm_request_with_token(&self, U256, TransactionModification, String) -> Result<ConfirmationResponseWithToken, Error>; fn confirm_request_with_token(&self, U256, TransactionModification, String) -> BoxFuture<ConfirmationResponseWithToken, Error>;
/// Confirm specific request with already signed data. /// Confirm specific request with already signed data.
#[rpc(name = "signer_confirmRequestRaw")] #[rpc(name = "signer_confirmRequestRaw")]

View File

@ -83,7 +83,7 @@ pub struct Block {
pub difficulty: U256, pub difficulty: U256,
/// Total difficulty /// Total difficulty
#[serde(rename="totalDifficulty")] #[serde(rename="totalDifficulty")]
pub total_difficulty: U256, pub total_difficulty: Option<U256>,
/// Seal fields /// Seal fields
#[serde(rename="sealFields")] #[serde(rename="sealFields")]
pub seal_fields: Vec<Bytes>, pub seal_fields: Vec<Bytes>,
@ -164,7 +164,7 @@ mod tests {
logs_bloom: H2048::default(), logs_bloom: H2048::default(),
timestamp: U256::default(), timestamp: U256::default(),
difficulty: U256::default(), difficulty: U256::default(),
total_difficulty: U256::default(), total_difficulty: Some(U256::default()),
seal_fields: vec![Bytes::default(), Bytes::default()], seal_fields: vec![Bytes::default(), Bytes::default()],
uncles: vec![], uncles: vec![],
transactions: BlockTransactions::Hashes(vec![].into()), transactions: BlockTransactions::Hashes(vec![].into()),

View File

@ -43,7 +43,7 @@ pub const WARP_SYNC_PROTOCOL_ID: ProtocolId = *b"par";
/// Ethereum sync protocol /// Ethereum sync protocol
pub const ETH_PROTOCOL: ProtocolId = *b"eth"; pub const ETH_PROTOCOL: ProtocolId = *b"eth";
/// Ethereum light protocol /// Ethereum light protocol
pub const LES_PROTOCOL: ProtocolId = *b"les"; pub const LIGHT_PROTOCOL: ProtocolId = *b"plp";
/// Sync configuration /// Sync configuration
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
@ -56,7 +56,7 @@ pub struct SyncConfig {
pub network_id: u64, pub network_id: u64,
/// Main "eth" subprotocol name. /// Main "eth" subprotocol name.
pub subprotocol_name: [u8; 3], pub subprotocol_name: [u8; 3],
/// Light "les" subprotocol name. /// Light subprotocol name.
pub light_subprotocol_name: [u8; 3], pub light_subprotocol_name: [u8; 3],
/// Fork block to check /// Fork block to check
pub fork_block: Option<(BlockNumber, H256)>, pub fork_block: Option<(BlockNumber, H256)>,
@ -73,7 +73,7 @@ impl Default for SyncConfig {
download_old_blocks: true, download_old_blocks: true,
network_id: 1, network_id: 1,
subprotocol_name: ETH_PROTOCOL, subprotocol_name: ETH_PROTOCOL,
light_subprotocol_name: LES_PROTOCOL, light_subprotocol_name: LIGHT_PROTOCOL,
fork_block: None, fork_block: None,
warp_sync: false, warp_sync: false,
serve_light: false, serve_light: false,
@ -674,6 +674,16 @@ impl LightSync {
subprotocol_name: params.subprotocol_name, subprotocol_name: params.subprotocol_name,
}) })
} }
/// Execute a closure with a protocol context.
pub fn with_context<F, T>(&self, f: F) -> Option<T>
where F: FnOnce(&::light::net::BasicContext) -> T
{
self.network.with_context_eval(
self.subprotocol_name,
move |ctx| self.proto.with_context(ctx, f),
)
}
} }
impl ManageNetwork for LightSync { impl ManageNetwork for LightSync {

View File

@ -28,7 +28,7 @@ fn basic_sync() {
net.sync(); net.sync();
assert!(net.peer(0).light_chain().get_header(BlockId::Number(6000)).is_some()); assert!(net.peer(0).light_chain().block_header(BlockId::Number(6000)).is_some());
} }
#[test] #[test]
@ -49,15 +49,15 @@ fn fork_post_cht() {
let _ = light_chain.import_header(header); let _ = light_chain.import_header(header);
light_chain.flush_queue(); light_chain.flush_queue();
light_chain.import_verified(); light_chain.import_verified();
assert!(light_chain.get_header(id).is_some()); assert!(light_chain.block_header(id).is_some());
} }
net.sync(); net.sync();
for id in (0..CHAIN_LENGTH).map(|x| x + 1).map(BlockId::Number) { for id in (0..CHAIN_LENGTH).map(|x| x + 1).map(BlockId::Number) {
assert_eq!( assert_eq!(
net.peer(0).light_chain().get_header(id), net.peer(0).light_chain().block_header(id).unwrap(),
net.peer(2).chain().block_header(id).map(|h| h.into_inner()) net.peer(2).chain().block_header(id).unwrap()
); );
} }
} }

View File

@ -983,14 +983,14 @@ impl Host {
self.nodes.write().update(node_changes, &*self.reserved_nodes.read()); self.nodes.write().update(node_changes, &*self.reserved_nodes.read());
} }
pub fn with_context<F>(&self, protocol: ProtocolId, io: &IoContext<NetworkIoMessage>, action: F) where F: Fn(&NetworkContext) { pub fn with_context<F>(&self, protocol: ProtocolId, io: &IoContext<NetworkIoMessage>, action: F) where F: FnOnce(&NetworkContext) {
let reserved = { self.reserved_nodes.read() }; let reserved = { self.reserved_nodes.read() };
let context = NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved); let context = NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved);
action(&context); action(&context);
} }
pub fn with_context_eval<F, T>(&self, protocol: ProtocolId, io: &IoContext<NetworkIoMessage>, action: F) -> T where F: Fn(&NetworkContext) -> T { pub fn with_context_eval<F, T>(&self, protocol: ProtocolId, io: &IoContext<NetworkIoMessage>, action: F) -> T where F: FnOnce(&NetworkContext) -> T {
let reserved = { self.reserved_nodes.read() }; let reserved = { self.reserved_nodes.read() };
let context = NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved); let context = NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved);

View File

@ -177,7 +177,7 @@ impl NetworkService {
} }
/// Executes action in the network context /// Executes action in the network context
pub fn with_context<F>(&self, protocol: ProtocolId, action: F) where F: Fn(&NetworkContext) { pub fn with_context<F>(&self, protocol: ProtocolId, action: F) where F: FnOnce(&NetworkContext) {
let io = IoContext::new(self.io_service.channel(), 0); let io = IoContext::new(self.io_service.channel(), 0);
let host = self.host.read(); let host = self.host.read();
if let Some(ref host) = host.as_ref() { if let Some(ref host) = host.as_ref() {
@ -186,7 +186,7 @@ impl NetworkService {
} }
/// Evaluates function in the network context /// Evaluates function in the network context
pub fn with_context_eval<F, T>(&self, protocol: ProtocolId, action: F) -> Option<T> where F: Fn(&NetworkContext) -> T { pub fn with_context_eval<F, T>(&self, protocol: ProtocolId, action: F) -> Option<T> where F: FnOnce(&NetworkContext) -> T {
let io = IoContext::new(self.io_service.channel(), 0); let io = IoContext::new(self.io_service.channel(), 0);
let host = self.host.read(); let host = self.host.read();
host.as_ref().map(|ref host| host.with_context_eval(protocol, &io, action)) host.as_ref().map(|ref host| host.with_context_eval(protocol, &io, action))