diff --git a/Cargo.lock b/Cargo.lock index 97ebf381c..604a69a44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -601,6 +601,7 @@ dependencies = [ "ethcore-devtools 1.6.0", "ethcore-io 1.6.0", "ethcore-ipc 1.6.0", + "ethcore-light 1.6.0", "ethcore-util 1.6.0", "ethcrypto 0.1.0", "ethjson 0.1.0", diff --git a/ethcore/light/src/client/header_chain.rs b/ethcore/light/src/client/header_chain.rs index 53c726b69..403d3555d 100644 --- a/ethcore/light/src/client/header_chain.rs +++ b/ethcore/light/src/client/header_chain.rs @@ -32,9 +32,10 @@ use cht; use ethcore::block_status::BlockStatus; use ethcore::error::BlockError; +use ethcore::encoded; +use ethcore::header::Header; use ethcore::ids::BlockId; -use ethcore::views::HeaderView; -use util::{Bytes, H256, U256, HeapSizeOf, Mutex, RwLock}; +use util::{H256, U256, HeapSizeOf, Mutex, RwLock}; use smallvec::SmallVec; @@ -77,9 +78,9 @@ impl HeapSizeOf for Entry { /// Header chain. See module docs for more details. pub struct HeaderChain { - genesis_header: Bytes, // special-case the genesis. + genesis_header: encoded::Header, // special-case the genesis. candidates: RwLock>, - headers: RwLock>, + headers: RwLock>, best_block: RwLock, cht_roots: Mutex>, } @@ -87,10 +88,12 @@ pub struct HeaderChain { impl HeaderChain { /// Create a new header chain given this genesis block. pub fn new(genesis: &[u8]) -> Self { + use ethcore::views::HeaderView; + let g_view = HeaderView::new(genesis); HeaderChain { - genesis_header: genesis.to_owned(), + genesis_header: encoded::Header::new(genesis.to_owned()), best_block: RwLock::new(BlockDescriptor { hash: g_view.hash(), number: 0, @@ -104,14 +107,11 @@ impl HeaderChain { /// Insert a pre-verified header. /// - /// This blindly trusts that the data given to it is - /// a) valid RLP encoding of a header and - /// b) has sensible data contained within it. - pub fn insert(&self, header: Bytes) -> Result<(), BlockError> { - let view = HeaderView::new(&header); - let hash = view.hash(); - let number = view.number(); - let parent_hash = view.parent_hash(); + /// This blindly trusts that the data given to it is sensible. + pub fn insert(&self, header: Header) -> Result<(), BlockError> { + let hash = header.hash(); + let number = header.number(); + let parent_hash = *header.parent_hash(); // hold candidates the whole time to guard import order. let mut candidates = self.candidates.write(); @@ -119,8 +119,7 @@ impl HeaderChain { // find parent details. let parent_td = if number == 1 { - let g_view = HeaderView::new(&self.genesis_header); - g_view.difficulty() + self.genesis_header.difficulty() } else { candidates.get(&(number - 1)) .and_then(|entry| entry.candidates.iter().find(|c| c.hash == parent_hash)) @@ -128,7 +127,7 @@ impl HeaderChain { .ok_or_else(|| BlockError::UnknownParent(parent_hash))? }; - let total_difficulty = parent_td + view.difficulty(); + let total_difficulty = parent_td + *header.difficulty(); // insert headers and candidates entries. candidates.entry(number).or_insert_with(|| Entry { candidates: SmallVec::new(), canonical_hash: hash }) @@ -138,7 +137,8 @@ impl HeaderChain { total_difficulty: total_difficulty, }); - self.headers.write().insert(hash, header.clone()); + let raw = ::rlp::encode(&header).to_vec(); + self.headers.write().insert(hash, encoded::Header::new(raw)); // reorganize ancestors so canonical entries are first in their // respective candidates vectors. @@ -211,24 +211,36 @@ impl HeaderChain { /// Get a block header. In the case of query by number, only canonical blocks /// will be returned. - pub fn get_header(&self, id: BlockId) -> Option { + pub fn block_header(&self, id: BlockId) -> Option { match id { BlockId::Earliest | BlockId::Number(0) => Some(self.genesis_header.clone()), - BlockId::Latest if self.headers.read().is_empty() => Some(self.genesis_header.clone()), - BlockId::Hash(hash) => self.headers.read().get(&hash).map(|x| x.to_vec()), + BlockId::Hash(hash) => self.headers.read().get(&hash).cloned(), BlockId::Number(num) => { if self.best_block.read().number < num { return None } self.candidates.read().get(&num).map(|entry| entry.canonical_hash) - .and_then(|hash| self.headers.read().get(&hash).map(|x| x.to_vec())) + .and_then(|hash| self.headers.read().get(&hash).cloned()) } BlockId::Latest | BlockId::Pending => { - let hash = self.best_block.read().hash; - self.headers.read().get(&hash).map(|x| x.to_vec()) + let hash = { + let best = self.best_block.read(); + if best.number == 0 { + return Some(self.genesis_header.clone()) + } + + best.hash + }; + + self.headers.read().get(&hash).cloned() } } } + /// Get the best block's header. + pub fn best_header(&self) -> encoded::Header { + self.block_header(BlockId::Latest).expect("Header for best block always stored; qed") + } + /// Get the nth CHT root, if it's been computed. /// /// CHT root 0 is from block `1..2048`. @@ -305,15 +317,15 @@ mod tests { header.set_number(i); header.set_timestamp(rolling_timestamp); header.set_difficulty(*genesis_header.difficulty() * i.into()); - - chain.insert(::rlp::encode(&header).to_vec()).unwrap(); - parent_hash = header.hash(); + + chain.insert(header).unwrap(); + rolling_timestamp += 10; } - assert!(chain.get_header(BlockId::Number(10)).is_none()); - assert!(chain.get_header(BlockId::Number(9000)).is_some()); + assert!(chain.block_header(BlockId::Number(10)).is_none()); + assert!(chain.block_header(BlockId::Number(9000)).is_some()); assert!(chain.cht_root(2).is_some()); assert!(chain.cht_root(3).is_none()); } @@ -333,10 +345,10 @@ mod tests { header.set_number(i); header.set_timestamp(rolling_timestamp); header.set_difficulty(*genesis_header.difficulty() * i.into()); - - chain.insert(::rlp::encode(&header).to_vec()).unwrap(); - parent_hash = header.hash(); + + chain.insert(header).unwrap(); + rolling_timestamp += 10; } @@ -349,10 +361,10 @@ mod tests { header.set_number(i); header.set_timestamp(rolling_timestamp); header.set_difficulty(*genesis_header.difficulty() * i.into()); - - chain.insert(::rlp::encode(&header).to_vec()).unwrap(); - parent_hash = header.hash(); + + chain.insert(header).unwrap(); + rolling_timestamp += 10; } } @@ -370,10 +382,10 @@ mod tests { header.set_number(i); header.set_timestamp(rolling_timestamp); header.set_difficulty(*genesis_header.difficulty() * (i * i).into()); - - chain.insert(::rlp::encode(&header).to_vec()).unwrap(); - parent_hash = header.hash(); + + chain.insert(header).unwrap(); + rolling_timestamp += 11; } } @@ -382,11 +394,23 @@ mod tests { assert_eq!(num, 12); while num > 0 { - let header: Header = ::rlp::decode(&chain.get_header(BlockId::Number(num)).unwrap()); + let header = chain.block_header(BlockId::Number(num)).unwrap(); assert_eq!(header.hash(), canon_hash); - canon_hash = *header.parent_hash(); + canon_hash = header.parent_hash(); num -= 1; } } + + #[test] + fn earliest_is_latest() { + let spec = Spec::new_test(); + let genesis_header = spec.genesis_header(); + + let chain = HeaderChain::new(&::rlp::encode(&genesis_header)); + + assert!(chain.block_header(BlockId::Earliest).is_some()); + assert!(chain.block_header(BlockId::Latest).is_some()); + assert!(chain.block_header(BlockId::Pending).is_some()); + } } diff --git a/ethcore/light/src/client/mod.rs b/ethcore/light/src/client/mod.rs index c2b57be24..ea4660abc 100644 --- a/ethcore/light/src/client/mod.rs +++ b/ethcore/light/src/client/mod.rs @@ -21,7 +21,6 @@ use ethcore::block_status::BlockStatus; use ethcore::client::ClientReport; use ethcore::ids::BlockId; use ethcore::header::Header; -use ethcore::views::HeaderView; use ethcore::verification::queue::{self, HeaderQueue}; use ethcore::transaction::{PendingTransaction, Condition as TransactionCondition}; use ethcore::blockchain_info::BlockChainInfo; @@ -35,7 +34,6 @@ use util::{Bytes, Mutex, RwLock}; use provider::Provider; use request; -use time; use self::header_chain::HeaderChain; @@ -109,12 +107,12 @@ impl Client { /// Fetch a vector of all pending transactions. pub fn ready_transactions(&self) -> Vec { - let best_num = self.chain.best_block().number; + let best = self.chain.best_header(); self.tx_pool.lock() .values() .filter(|t| match t.condition { - Some(TransactionCondition::Number(ref x)) => x <= &best_num, - Some(TransactionCondition::Timestamp(ref x)) => *x <= time::get_time().sec as u64, + Some(TransactionCondition::Number(x)) => x <= best.number(), + Some(TransactionCondition::Timestamp(x)) => x <= best.timestamp(), None => true, }) .cloned() @@ -131,17 +129,19 @@ impl Client { /// Get the chain info. pub fn chain_info(&self) -> BlockChainInfo { - let best_block = self.chain.best_block(); + let best_hdr = self.chain.best_header(); + let best_td = self.chain.best_block().total_difficulty; + let first_block = self.chain.first_block(); let genesis_hash = self.chain.genesis_hash(); BlockChainInfo { - total_difficulty: best_block.total_difficulty, - pending_total_difficulty: best_block.total_difficulty + self.queue.total_difficulty(), + total_difficulty: best_td, + pending_total_difficulty: best_td + self.queue.total_difficulty(), genesis_hash: genesis_hash, - best_block_hash: best_block.hash, - best_block_number: best_block.number, - best_block_timestamp: HeaderView::new(&self.chain.get_header(BlockId::Latest).expect("Latest hash is always in the chain")).timestamp(), + best_block_hash: best_hdr.hash(), + best_block_number: best_hdr.number(), + best_block_timestamp: best_hdr.timestamp(), ancient_block_hash: if first_block.is_some() { Some(genesis_hash) } else { None }, ancient_block_number: if first_block.is_some() { Some(0) } else { None }, first_block_hash: first_block.as_ref().map(|first| first.hash), @@ -155,8 +155,8 @@ impl Client { } /// Get a block header by Id. - pub fn get_header(&self, id: BlockId) -> Option { - self.chain.get_header(id) + pub fn block_header(&self, id: BlockId) -> Option { + self.chain.block_header(id) } /// Flush the header queue. @@ -180,7 +180,7 @@ impl Client { for verified_header in self.queue.drain(MAX) { let (num, hash) = (verified_header.number(), verified_header.hash()); - match self.chain.insert(::rlp::encode(&verified_header).to_vec()) { + match self.chain.insert(verified_header) { Ok(()) => { good.push(hash); self.report.write().blocks_imported += 1; @@ -252,7 +252,7 @@ impl Provider for Client { } fn block_header(&self, id: BlockId) -> Option { - self.chain.get_header(id).map(encoded::Header::new) + Client::block_header(self, id) } fn block_body(&self, _id: BlockId) -> Option { diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index 10be00dd7..c34e2d922 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -25,52 +25,17 @@ use ethcore::encoded; use ethcore::receipt::Receipt; use futures::{Async, Poll, Future}; -use futures::sync::oneshot; +use futures::sync::oneshot::{self, Sender, Receiver}; use network::PeerId; +use rlp::{RlpStream, Stream}; +use util::{Bytes, RwLock, U256}; +use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP}; use net::{Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId}; -use util::{Bytes, RwLock, U256}; use types::les_request::{self as les_request, Request as LesRequest}; pub mod request; -/// Errors which can occur while trying to fulfill a request. -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum Error { - /// Request was canceled. - Canceled, - /// No suitable peers available to serve the request. - NoPeersAvailable, - /// Invalid request. - InvalidRequest, - /// Request timed out. - TimedOut, -} - -impl From for Error { - fn from(_: oneshot::Canceled) -> Self { - Error::Canceled - } -} - -/// Future which awaits a response to an on-demand request. -pub struct Response(oneshot::Receiver>); - -impl Future for Response { - type Item = T; - type Error = Error; - - fn poll(&mut self) -> Poll { - match self.0.poll().map_err(Into::into) { - Ok(Async::Ready(val)) => val.map(Async::Ready), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(e) => Err(e), - } - } -} - -type Sender = oneshot::Sender>; - // relevant peer info. struct Peer { status: Status, @@ -84,6 +49,7 @@ enum Pending { Block(request::Body, Sender), BlockReceipts(request::BlockReceipts, Sender>), Account(request::Account, Sender), + Code(request::Code, Sender), } /// On demand request service. See module docs for more details. @@ -92,6 +58,7 @@ enum Pending { pub struct OnDemand { peers: RwLock>, pending_requests: RwLock>, + orphaned_requests: RwLock>, } impl Default for OnDemand { @@ -99,6 +66,7 @@ impl Default for OnDemand { OnDemand { peers: RwLock::new(HashMap::new()), pending_requests: RwLock::new(HashMap::new()), + orphaned_requests: RwLock::new(Vec::new()), } } } @@ -106,32 +74,27 @@ impl Default for OnDemand { impl OnDemand { /// Request a header by block number and CHT root hash. /// Returns the header and the total difficulty. - pub fn header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Response<(encoded::Header, U256)> { + pub fn header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber) -> Receiver<(encoded::Header, U256)> { let (sender, receiver) = oneshot::channel(); self.dispatch_header_by_number(ctx, req, sender); - Response(receiver) + receiver } // dispatch the request, completing the request if no peers available. fn dispatch_header_by_number(&self, ctx: &BasicContext, req: request::HeaderByNumber, sender: Sender<(encoded::Header, U256)>) { - let num = req.num; - let cht_num = match ::cht::block_to_cht_number(req.num) { - Some(cht_num) => cht_num, - None => { - warn!(target: "on_demand", "Attempted to dispatch invalid header proof: req.num == 0"); - sender.complete(Err(Error::InvalidRequest)); - return; - } - }; + let num = req.num(); + let cht_num = req.cht_num(); let les_req = LesRequest::HeaderProofs(les_request::HeaderProofs { requests: vec![les_request::HeaderProof { cht_number: cht_num, - block_number: req.num, + block_number: num, from_level: 0, }], }); + let pending = Pending::HeaderByNumber(req, sender); + // we're looking for a peer with serveHeaders who's far enough along in the // chain. for (id, peer) in self.peers.read().iter() { @@ -141,7 +104,7 @@ impl OnDemand { trace!(target: "on_demand", "Assigning request to peer {}", id); self.pending_requests.write().insert( req_id, - Pending::HeaderByNumber(req, sender) + pending, ); return }, @@ -151,18 +114,17 @@ impl OnDemand { } } - // TODO: retrying trace!(target: "on_demand", "No suitable peer for request"); - sender.complete(Err(Error::NoPeersAvailable)); + self.orphaned_requests.write().push(pending) } /// Request a header by hash. This is less accurate than by-number because we don't know /// where in the chain this header lies, and therefore can't find a peer who is supposed to have /// it as easily. - pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Response { + pub fn header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash) -> Receiver { let (sender, receiver) = oneshot::channel(); self.dispatch_header_by_hash(ctx, req, sender); - Response(receiver) + receiver } fn dispatch_header_by_hash(&self, ctx: &BasicContext, req: request::HeaderByHash, sender: Sender) { @@ -181,16 +143,17 @@ impl OnDemand { .collect::>(); let mut rng = ::rand::thread_rng(); - ::rand::Rng::shuffle(&mut rng, &mut potential_peers); + let pending = Pending::HeaderByHash(req, sender); + for id in potential_peers { match ctx.request_from(id, les_req.clone()) { Ok(req_id) => { trace!(target: "on_demand", "Assigning request to peer {}", id); self.pending_requests.write().insert( req_id, - Pending::HeaderByHash(req, sender), + pending, ); return } @@ -199,18 +162,28 @@ impl OnDemand { } } - // TODO: retrying trace!(target: "on_demand", "No suitable peer for request"); - sender.complete(Err(Error::NoPeersAvailable)); + self.orphaned_requests.write().push(pending) } /// Request a block, given its header. Block bodies are requestable by hash only, /// and the header is required anyway to verify and complete the block body /// -- this just doesn't obscure the network query. - pub fn block(&self, ctx: &BasicContext, req: request::Body) -> Response { + pub fn block(&self, ctx: &BasicContext, req: request::Body) -> Receiver { let (sender, receiver) = oneshot::channel(); - self.dispatch_block(ctx, req, sender); - Response(receiver) + + // fast path for empty body. + if req.header.transactions_root() == SHA3_NULL_RLP && req.header.uncles_hash() == SHA3_EMPTY_LIST_RLP { + let mut stream = RlpStream::new_list(3); + stream.append_raw(&req.header.into_inner(), 1); + stream.begin_list(0); + stream.begin_list(0); + + sender.complete(encoded::Block::new(stream.out())) + } else { + self.dispatch_block(ctx, req, sender); + } + receiver } fn dispatch_block(&self, ctx: &BasicContext, req: request::Body, sender: Sender) { @@ -218,6 +191,7 @@ impl OnDemand { let les_req = LesRequest::Bodies(les_request::Bodies { block_hashes: vec![req.hash], }); + let pending = Pending::Block(req, sender); // we're looking for a peer with serveChainSince(num) for (id, peer) in self.peers.read().iter() { @@ -227,7 +201,7 @@ impl OnDemand { trace!(target: "on_demand", "Assigning request to peer {}", id); self.pending_requests.write().insert( req_id, - Pending::Block(req, sender) + pending, ); return } @@ -237,17 +211,23 @@ impl OnDemand { } } - // TODO: retrying trace!(target: "on_demand", "No suitable peer for request"); - sender.complete(Err(Error::NoPeersAvailable)); + self.orphaned_requests.write().push(pending) } /// Request the receipts for a block. The header serves two purposes: /// provide the block hash to fetch receipts for, and for verification of the receipts root. - pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Response> { + pub fn block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts) -> Receiver> { let (sender, receiver) = oneshot::channel(); - self.dispatch_block_receipts(ctx, req, sender); - Response(receiver) + + // fast path for empty receipts. + if req.0.receipts_root() == SHA3_NULL_RLP { + sender.complete(Vec::new()) + } else { + self.dispatch_block_receipts(ctx, req, sender); + } + + receiver } fn dispatch_block_receipts(&self, ctx: &BasicContext, req: request::BlockReceipts, sender: Sender>) { @@ -255,6 +235,7 @@ impl OnDemand { let les_req = LesRequest::Receipts(les_request::Receipts { block_hashes: vec![req.0.hash()], }); + let pending = Pending::BlockReceipts(req, sender); // we're looking for a peer with serveChainSince(num) for (id, peer) in self.peers.read().iter() { @@ -264,7 +245,7 @@ impl OnDemand { trace!(target: "on_demand", "Assigning request to peer {}", id); self.pending_requests.write().insert( req_id, - Pending::BlockReceipts(req, sender) + pending, ); return } @@ -274,17 +255,16 @@ impl OnDemand { } } - // TODO: retrying trace!(target: "on_demand", "No suitable peer for request"); - sender.complete(Err(Error::NoPeersAvailable)); + self.orphaned_requests.write().push(pending) } /// Request an account by address and block header -- which gives a hash to query and a state root /// to verify against. - pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Response { + pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Receiver { let (sender, receiver) = oneshot::channel(); self.dispatch_account(ctx, req, sender); - Response(receiver) + receiver } fn dispatch_account(&self, ctx: &BasicContext, req: request::Account, sender: Sender) { @@ -297,6 +277,7 @@ impl OnDemand { from_level: 0, }], }); + let pending = Pending::Account(req, sender); // we're looking for a peer with serveStateSince(num) for (id, peer) in self.peers.read().iter() { @@ -306,7 +287,7 @@ impl OnDemand { trace!(target: "on_demand", "Assigning request to peer {}", id); self.pending_requests.write().insert( req_id, - Pending::Account(req, sender) + pending, ); return } @@ -316,38 +297,125 @@ impl OnDemand { } } - // TODO: retrying trace!(target: "on_demand", "No suitable peer for request"); - sender.complete(Err(Error::NoPeersAvailable)); + self.orphaned_requests.write().push(pending) + } + + /// Request code by address, known code hash, and block header. + pub fn code(&self, ctx: &BasicContext, req: request::Code) -> Receiver { + let (sender, receiver) = oneshot::channel(); + + // fast path for no code. + if req.code_hash == ::util::sha3::SHA3_EMPTY { + sender.complete(Vec::new()) + } else { + self.dispatch_code(ctx, req, sender); + } + + receiver + } + + fn dispatch_code(&self, ctx: &BasicContext, req: request::Code, sender: Sender) { + let num = req.block_id.1; + let les_req = LesRequest::Codes(les_request::ContractCodes { + code_requests: vec![les_request::ContractCode { + block_hash: req.block_id.0, + account_key: ::util::Hashable::sha3(&req.address), + }] + }); + let pending = Pending::Code(req, sender); + + // we're looking for a peer with serveStateSince(num) + for (id, peer) in self.peers.read().iter() { + if peer.capabilities.serve_state_since.as_ref().map_or(false, |x| *x >= num) { + match ctx.request_from(*id, les_req.clone()) { + Ok(req_id) => { + trace!(target: "on_demand", "Assigning request to peer {}", id); + self.pending_requests.write().insert( + req_id, + pending + ); + return + } + Err(e) => + trace!(target: "on_demand", "Failed to make request of peer {}: {:?}", id, e), + } + } + } + + trace!(target: "on_demand", "No suitable peer for request"); + self.orphaned_requests.write().push(pending) + } + + // dispatch orphaned requests, and discard those for which the corresponding + // receiver has been dropped. + fn dispatch_orphaned(&self, ctx: &BasicContext) { + // wrapper future for calling `poll_cancel` on our `Senders` to preserve + // the invariant that it's always within a task. + struct CheckHangup<'a, T: 'a>(&'a mut Sender); + + impl<'a, T: 'a> Future for CheckHangup<'a, T> { + type Item = bool; + type Error = (); + + fn poll(&mut self) -> Poll { + Ok(Async::Ready(match self.0.poll_cancel() { + Ok(Async::NotReady) => false, // hasn't hung up. + _ => true, // has hung up. + })) + } + } + + // check whether a sender's hung up (using `wait` to preserve the task invariant) + // returns true if has hung up, false otherwise. + fn check_hangup(send: &mut Sender) -> bool { + CheckHangup(send).wait().expect("CheckHangup always returns ok; qed") + } + + if self.orphaned_requests.read().is_empty() { return } + + let to_dispatch = ::std::mem::replace(&mut *self.orphaned_requests.write(), Vec::new()); + + for orphaned in to_dispatch { + match orphaned { + Pending::HeaderByNumber(req, mut sender) => + if !check_hangup(&mut sender) { self.dispatch_header_by_number(ctx, req, sender) }, + Pending::HeaderByHash(req, mut sender) => + if !check_hangup(&mut sender) { self.dispatch_header_by_hash(ctx, req, sender) }, + Pending::Block(req, mut sender) => + if !check_hangup(&mut sender) { self.dispatch_block(ctx, req, sender) }, + Pending::BlockReceipts(req, mut sender) => + if !check_hangup(&mut sender) { self.dispatch_block_receipts(ctx, req, sender) }, + Pending::Account(req, mut sender) => + if !check_hangup(&mut sender) { self.dispatch_account(ctx, req, sender) }, + Pending::Code(req, mut sender) => + if !check_hangup(&mut sender) { self.dispatch_code(ctx, req, sender) }, + } + } } } impl Handler for OnDemand { fn on_connect(&self, ctx: &EventContext, status: &Status, capabilities: &Capabilities) { self.peers.write().insert(ctx.peer(), Peer { status: status.clone(), capabilities: capabilities.clone() }); + self.dispatch_orphaned(ctx.as_basic()); } fn on_disconnect(&self, ctx: &EventContext, unfulfilled: &[ReqId]) { self.peers.write().remove(&ctx.peer()); let ctx = ctx.as_basic(); - for unfulfilled in unfulfilled { - if let Some(pending) = self.pending_requests.write().remove(unfulfilled) { - trace!(target: "on_demand", "Attempting to reassign dropped request"); - match pending { - Pending::HeaderByNumber(req, sender) - => self.dispatch_header_by_number(ctx, req, sender), - Pending::HeaderByHash(req, sender) - => self.dispatch_header_by_hash(ctx, req, sender), - Pending::Block(req, sender) - => self.dispatch_block(ctx, req, sender), - Pending::BlockReceipts(req, sender) - => self.dispatch_block_receipts(ctx, req, sender), - Pending::Account(req, sender) - => self.dispatch_account(ctx, req, sender), + { + let mut orphaned = self.orphaned_requests.write(); + for unfulfilled in unfulfilled { + if let Some(pending) = self.pending_requests.write().remove(unfulfilled) { + trace!(target: "on_demand", "Attempting to reassign dropped request"); + orphaned.push(pending); } } } + + self.dispatch_orphaned(ctx); } fn on_announcement(&self, ctx: &EventContext, announcement: &Announcement) { @@ -356,6 +424,8 @@ impl Handler for OnDemand { peer.status.update_from(&announcement); peer.capabilities.update_from(&announcement); } + + self.dispatch_orphaned(ctx.as_basic()); } fn on_header_proofs(&self, ctx: &EventContext, req_id: ReqId, proofs: &[(Bytes, Vec)]) { @@ -370,7 +440,7 @@ impl Handler for OnDemand { if let Some(&(ref header, ref proof)) = proofs.get(0) { match req.check_response(header, proof) { Ok(header) => { - sender.complete(Ok(header)); + sender.complete(header); return } Err(e) => { @@ -398,7 +468,7 @@ impl Handler for OnDemand { if let Some(ref header) = headers.get(0) { match req.check_response(header) { Ok(header) => { - sender.complete(Ok(header)); + sender.complete(header); return } Err(e) => { @@ -426,7 +496,7 @@ impl Handler for OnDemand { if let Some(ref block) = bodies.get(0) { match req.check_response(block) { Ok(block) => { - sender.complete(Ok(block)); + sender.complete(block); return } Err(e) => { @@ -454,7 +524,7 @@ impl Handler for OnDemand { if let Some(ref receipts) = receipts.get(0) { match req.check_response(receipts) { Ok(receipts) => { - sender.complete(Ok(receipts)); + sender.complete(receipts); return } Err(e) => { @@ -482,7 +552,7 @@ impl Handler for OnDemand { if let Some(ref proof) = proofs.get(0) { match req.check_response(proof) { Ok(proof) => { - sender.complete(Ok(proof)); + sender.complete(proof); return } Err(e) => { @@ -497,6 +567,38 @@ impl Handler for OnDemand { _ => panic!("Only account request fetches state proof; qed"), } } + + fn on_code(&self, ctx: &EventContext, req_id: ReqId, codes: &[Bytes]) { + let peer = ctx.peer(); + let req = match self.pending_requests.write().remove(&req_id) { + Some(req) => req, + None => return, + }; + + match req { + Pending::Code(req, sender) => { + if let Some(code) = codes.get(0) { + match req.check_response(code.as_slice()) { + Ok(()) => { + sender.complete(code.clone()); + return + } + Err(e) => { + warn!("Error handling response for code request: {:?}", e); + ctx.disable_peer(peer); + } + } + + self.dispatch_code(ctx.as_basic(), req, sender); + } + } + _ => panic!("Only code request fetches code; qed"), + } + } + + fn tick(&self, ctx: &BasicContext) { + self.dispatch_orphaned(ctx) + } } #[cfg(test)] @@ -505,7 +607,6 @@ mod tests { use net::{Announcement, BasicContext, ReqId, Error as LesError}; use request::{Request as LesRequest, Kind as LesRequestKind}; use network::{PeerId, NodeId}; - use futures::Future; use util::H256; struct FakeContext; @@ -522,10 +623,14 @@ mod tests { } #[test] - fn no_peers() { + fn detects_hangup() { let on_demand = OnDemand::default(); let result = on_demand.header_by_hash(&FakeContext, request::HeaderByHash(H256::default())); - assert_eq!(result.wait().unwrap_err(), Error::NoPeersAvailable); + assert!(on_demand.orphaned_requests.read().len() == 1); + drop(result); + + on_demand.dispatch_orphaned(&FakeContext); + assert!(on_demand.orphaned_requests.read().is_empty()); } } diff --git a/ethcore/light/src/on_demand/request.rs b/ethcore/light/src/on_demand/request.rs index 90d6801c8..3964137d9 100644 --- a/ethcore/light/src/on_demand/request.rs +++ b/ethcore/light/src/on_demand/request.rs @@ -37,7 +37,7 @@ pub enum Error { BadProof, /// Wrong header number. WrongNumber(u64, u64), - /// Wrong header hash. + /// Wrong hash. WrongHash(H256, H256), /// Wrong trie root. WrongTrieRoot(H256, H256), @@ -59,12 +59,33 @@ impl From> for Error { #[derive(Debug, Clone, PartialEq, Eq)] pub struct HeaderByNumber { /// The header's number. - pub num: u64, + num: u64, + /// The cht number for the given block number. + cht_num: u64, /// The root of the CHT containing this header. - pub cht_root: H256, + cht_root: H256, } impl HeaderByNumber { + /// Construct a new header-by-number request. Fails if the given number is 0. + /// Provide the expected CHT root to compare against. + pub fn new(num: u64, cht_root: H256) -> Option { + ::cht::block_to_cht_number(num).map(|cht_num| HeaderByNumber { + num: num, + cht_num: cht_num, + cht_root: cht_root, + }) + } + + /// Access the requested block number. + pub fn num(&self) -> u64 { self.num } + + /// Access the CHT number. + pub fn cht_num(&self) -> u64 { self.cht_num } + + /// Access the expected CHT root. + pub fn cht_root(&self) -> H256 { self.cht_root } + /// Check a response with a header and cht proof. pub fn check_response(&self, header: &[u8], proof: &[Bytes]) -> Result<(encoded::Header, U256), Error> { let (expected_hash, td) = match ::cht::check_proof(proof, self.num, self.cht_root) { @@ -106,6 +127,15 @@ pub struct Body { } impl Body { + /// Create a request for a block body from a given header. + pub fn new(header: encoded::Header) -> Self { + let hash = header.hash(); + Body { + header: header, + hash: hash, + } + } + /// Check a response for this block body. pub fn check_response(&self, body: &[u8]) -> Result { let body_view = UntrustedRlp::new(&body); @@ -179,6 +209,28 @@ impl Account { } } +/// Request for account code. +pub struct Code { + /// Block hash, number pair. + pub block_id: (H256, u64), + /// Address requested. + pub address: Address, + /// Account's code hash. + pub code_hash: H256, +} + +impl Code { + /// Check a response with code against the code hash. + pub fn check_response(&self, code: &[u8]) -> Result<(), Error> { + let found_hash = code.sha3(); + if found_hash == self.code_hash { + Ok(()) + } else { + Err(Error::WrongHash(self.code_hash, found_hash)) + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -191,6 +243,11 @@ mod tests { use ethcore::encoded; use ethcore::receipt::Receipt; + #[test] + fn no_invalid_header_by_number() { + assert!(HeaderByNumber::new(0, Default::default()).is_none()) + } + #[test] fn check_header_by_number() { use ::cht; @@ -213,10 +270,7 @@ mod tests { }; let proof = cht.prove(10_000, 0).unwrap().unwrap(); - let req = HeaderByNumber { - num: 10_000, - cht_root: cht.root(), - }; + let req = HeaderByNumber::new(10_000, cht.root()).unwrap(); let raw_header = test_client.block_header(::ethcore::ids::BlockId::Number(10_000)).unwrap(); @@ -319,4 +373,17 @@ mod tests { assert!(req.check_response(&proof[..]).is_ok()); } + + #[test] + fn check_code() { + let code = vec![1u8; 256]; + let req = Code { + block_id: (Default::default(), 2), + address: Default::default(), + code_hash: ::util::Hashable::sha3(&code), + }; + + assert!(req.check_response(&code).is_ok()); + assert!(req.check_response(&[]).is_err()); + } } diff --git a/ethcore/src/types/encoded.rs b/ethcore/src/types/encoded.rs index 0db71ee58..49f83f0b7 100644 --- a/ethcore/src/types/encoded.rs +++ b/ethcore/src/types/encoded.rs @@ -28,7 +28,7 @@ use header::{BlockNumber, Header as FullHeader}; use transaction::UnverifiedTransaction; use views; -use util::{Address, Hashable, H256, H2048, U256}; +use util::{Address, Hashable, H256, H2048, U256, HeapSizeOf}; use rlp::{Rlp, View}; /// Owning header view. @@ -36,6 +36,10 @@ use rlp::{Rlp, View}; #[cfg_attr(feature = "ipc", binary)] pub struct Header(Vec); +impl HeapSizeOf for Header { + fn heap_size_of_children(&self) -> usize { self.0.heap_size_of_children() } +} + impl Header { /// Create a new owning header view. /// Expects the data to be an RLP-encoded header -- any other case will likely lead to @@ -116,6 +120,10 @@ impl Hashable for Header { #[cfg_attr(feature = "ipc", binary)] pub struct Body(Vec); +impl HeapSizeOf for Body { + fn heap_size_of_children(&self) -> usize { self.0.heap_size_of_children() } +} + impl Body { /// Create a new owning block body view. The raw bytes passed in must be an rlp-encoded block /// body. @@ -172,6 +180,10 @@ impl Body { #[cfg_attr(feature = "ipc", binary)] pub struct Block(Vec); +impl HeapSizeOf for Block { + fn heap_size_of_children(&self) -> usize { self.0.heap_size_of_children() } +} + impl Block { /// Create a new owning block view. The raw bytes passed in must be an rlp-encoded block. pub fn new(raw: Vec) -> Self { Block(raw) } diff --git a/ethcore/src/types/transaction.rs b/ethcore/src/types/transaction.rs index 997b97883..506bf001e 100644 --- a/ethcore/src/types/transaction.rs +++ b/ethcore/src/types/transaction.rs @@ -472,6 +472,12 @@ impl PendingTransaction { } } +impl Deref for PendingTransaction { + type Target = SignedTransaction; + + fn deref(&self) -> &SignedTransaction { &self.transaction } +} + impl From for PendingTransaction { fn from(t: SignedTransaction) -> Self { PendingTransaction { diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index 4c5e88406..548e84995 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -28,6 +28,7 @@ use ethcore::miner::{Miner, ExternalMiner}; use ethcore::snapshot::SnapshotService; use ethcore_rpc::{Metadata, NetworkSettings}; use ethcore_rpc::informant::{Middleware, RpcStats, ClientNotifier}; +use ethcore_rpc::dispatch::FullDispatcher; use ethsync::{ManageNetwork, SyncProvider}; use hash_fetch::fetch::Client as FetchClient; use jsonrpc_core::{MetaIoHandler}; @@ -176,10 +177,11 @@ macro_rules! add_signing_methods { { let handler = &mut $handler; let deps = &$deps; + let dispatcher = FullDispatcher::new(Arc::downgrade(&deps.client), Arc::downgrade(&deps.miner)); if deps.signer_service.is_enabled() { - handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, &deps.client, &deps.miner, &deps.secret_store))) + handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, dispatcher, &deps.secret_store))) } else { - handler.extend_with($namespace::to_delegate(SigningUnsafeClient::new(&deps.client, &deps.secret_store, &deps.miner))) + handler.extend_with($namespace::to_delegate(SigningUnsafeClient::new(&deps.secret_store, dispatcher))) } } } @@ -194,6 +196,8 @@ pub fn setup_rpc(stats: Arc, deps: Arc, apis: ApiSet) -> // it's turned into vector, cause ont of the cases requires &[] let apis = apis.list_apis().into_iter().collect::>(); + let dispatcher = FullDispatcher::new(Arc::downgrade(&deps.client), Arc::downgrade(&deps.miner)); + for api in &apis { match *api { Api::Web3 => { @@ -223,10 +227,10 @@ pub fn setup_rpc(stats: Arc, deps: Arc, apis: ApiSet) -> add_signing_methods!(EthSigning, handler, deps); }, Api::Personal => { - handler.extend_with(PersonalClient::new(&deps.secret_store, &deps.client, &deps.miner, deps.geth_compatibility).to_delegate()); + handler.extend_with(PersonalClient::new(&deps.secret_store, dispatcher.clone(), deps.geth_compatibility).to_delegate()); }, Api::Signer => { - handler.extend_with(SignerClient::new(&deps.secret_store, &deps.client, &deps.miner, &deps.signer_service).to_delegate()); + handler.extend_with(SignerClient::new(&deps.secret_store, dispatcher.clone(), &deps.signer_service).to_delegate()); }, Api::Parity => { let signer = match deps.signer_service.is_enabled() { diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 63eabb8a7..13d125f59 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -33,6 +33,7 @@ ethash = { path = "../ethash" } ethsync = { path = "../sync" } ethjson = { path = "../json" } ethcore-devtools = { path = "../devtools" } +ethcore-light = { path = "../ethcore/light" } parity-updater = { path = "../updater" } rlp = { path = "../util/rlp" } fetch = { path = "../util/fetch" } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 4f1b151b6..50cde77c5 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -33,6 +33,7 @@ extern crate ethcrypto as crypto; extern crate ethstore; extern crate ethsync; extern crate ethash; +extern crate ethcore_light as light; extern crate transient_hashmap; extern crate jsonrpc_ipc_server as ipc; extern crate ethcore_ipc; @@ -64,7 +65,7 @@ use jsonrpc_core::reactor::RpcHandler; pub use ipc::{Server as IpcServer, Error as IpcServerError}; pub use jsonrpc_http_server::{ServerBuilder, Server, RpcServerError}; pub mod v1; -pub use v1::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings, Metadata, Origin, informant}; +pub use v1::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings, Metadata, Origin, informant, dispatch}; pub use v1::block_import::is_major_importing; /// Start http server asynchronously and returns result with `Server` handle on success or an error. diff --git a/rpc/src/v1/helpers/dispatch.rs b/rpc/src/v1/helpers/dispatch.rs index e7506370d..8900593ac 100644 --- a/rpc/src/v1/helpers/dispatch.rs +++ b/rpc/src/v1/helpers/dispatch.rs @@ -14,11 +14,15 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +//! Utilities and helpers for transaction dispatch. + use std::fmt::Debug; use std::ops::Deref; +use std::sync::Weak; + +use futures::{future, Future, BoxFuture}; use rlp::{self, Stream}; use util::{Address, H520, H256, U256, Uint, Bytes}; -use util::bytes::ToPretty; use util::sha3::Hashable; use ethkey::Signature; @@ -38,20 +42,154 @@ use v1::types::{ DecryptRequest as RpcDecryptRequest, }; +/// Has the capability to dispatch, sign, and decrypt. +/// +/// Requires a clone implementation, with the implication that it be cheap; +/// usually just bumping a reference count or two. +pub trait Dispatcher: Send + Sync + Clone { + // TODO: when ATC exist, use zero-cost + // type Out: IntoFuture + + /// Fill optional fields of a transaction request, fetching gas price but not nonce. + fn fill_optional_fields(&self, request: TransactionRequest, default_sender: Address) + -> BoxFuture; + + /// Sign the given transaction request without dispatching, fetching appropriate nonce. + fn sign(&self, accounts: &AccountProvider, filled: FilledTransactionRequest, password: SignWith) + -> BoxFuture, Error>; + + /// "Dispatch" a local transaction. + fn dispatch_transaction(&self, signed_transaction: PendingTransaction) -> Result; +} + +/// A dispatcher which uses references to a client and miner in order to sign +/// requests locally. +#[derive(Debug)] +pub struct FullDispatcher { + client: Weak, + miner: Weak, +} + +impl FullDispatcher { + /// Create a `FullDispatcher` from weak references to a client and miner. + pub fn new(client: Weak, miner: Weak) -> Self { + FullDispatcher { + client: client, + miner: miner, + } + } +} + +impl Clone for FullDispatcher { + fn clone(&self) -> Self { + FullDispatcher { + client: self.client.clone(), + miner: self.miner.clone(), + } + } +} + +impl Dispatcher for FullDispatcher { + fn fill_optional_fields(&self, request: TransactionRequest, default_sender: Address) + -> BoxFuture + { + let (client, miner) = (take_weakf!(self.client), take_weakf!(self.miner)); + let request = request; + future::ok(FilledTransactionRequest { + from: request.from.unwrap_or(default_sender), + used_default_from: request.from.is_none(), + to: request.to, + nonce: request.nonce, + gas_price: request.gas_price.unwrap_or_else(|| default_gas_price(&*client, &*miner)), + gas: request.gas.unwrap_or_else(|| miner.sensible_gas_limit()), + value: request.value.unwrap_or_else(|| 0.into()), + data: request.data.unwrap_or_else(Vec::new), + condition: request.condition, + }).boxed() + } + + fn sign(&self, accounts: &AccountProvider, filled: FilledTransactionRequest, password: SignWith) + -> BoxFuture, Error> + { + let (client, miner) = (take_weakf!(self.client), take_weakf!(self.miner)); + let network_id = client.signing_network_id(); + let address = filled.from; + future::ok({ + let t = Transaction { + nonce: filled.nonce + .or_else(|| miner + .last_nonce(&filled.from) + .map(|nonce| nonce + U256::one())) + .unwrap_or_else(|| client.latest_nonce(&filled.from)), + + action: filled.to.map_or(Action::Create, Action::Call), + gas: filled.gas, + gas_price: filled.gas_price, + value: filled.value, + data: filled.data, + }; + + let hash = t.hash(network_id); + if accounts.is_hardware_address(address) { + let mut stream = rlp::RlpStream::new(); + t.rlp_append_unsigned_transaction(&mut stream, network_id); + let signature = try_bf!( + accounts.sign_with_hardware(address, &stream.as_raw()) + .map_err(|e| { + debug!(target: "miner", "Error signing transaction with hardware wallet: {}", e); + errors::account("Error signing transaction with hardware wallet", e) + }) + ); + let signed = try_bf!( + SignedTransaction::new(t.with_signature(signature, network_id)) + .map_err(|e| { + debug!(target: "miner", "Hardware wallet has produced invalid signature: {}", e); + errors::account("Invalid signature generated", e) + }) + ); + WithToken::No(signed) + } else { + let signature = try_bf!(signature(accounts, address, hash, password)); + signature.map(|sig| { + SignedTransaction::new(t.with_signature(sig, network_id)) + .expect("Transaction was signed by AccountsProvider; it never produces invalid signatures; qed") + }) + } + }).boxed() + } + + fn dispatch_transaction(&self, signed_transaction: PendingTransaction) -> Result { + let hash = signed_transaction.transaction.hash(); + + take_weak!(self.miner).import_own_transaction(&*take_weak!(self.client), signed_transaction) + .map_err(errors::from_transaction_error) + .map(|_| hash) + } +} + +/// default MAC to use. pub const DEFAULT_MAC: [u8; 2] = [0, 0]; -type AccountToken = String; +/// Single-use account token. +pub type AccountToken = String; +/// Values used to unlock accounts for signing. #[derive(Debug, Clone, PartialEq)] pub enum SignWith { + /// Nothing -- implies the account is already unlocked. Nothing, + /// Unlock with password. Password(String), + /// Unlock with single-use token. Token(AccountToken), } +/// A value, potentially accompanied by a signing token. #[derive(Debug)] pub enum WithToken { + /// No token. No(T), + /// With token. Yes(T, AccountToken), } @@ -67,6 +205,7 @@ impl Deref for WithToken { } impl WithToken { + /// Map the value with the given closure, preserving the token. pub fn map(self, f: F) -> WithToken where S: Debug, F: FnOnce(T) -> S, @@ -77,12 +216,21 @@ impl WithToken { } } + /// Convert into inner value, ignoring possible token. pub fn into_value(self) -> T { match self { WithToken::No(v) => v, WithToken::Yes(v, _) => v, } } + + /// Convert the `WithToken` into a tuple. + pub fn into_tuple(self) -> (T, Option) { + match self { + WithToken::No(v) => (v, None), + WithToken::Yes(v, token) => (v, Some(token)) + } + } } impl From<(T, AccountToken)> for WithToken { @@ -91,30 +239,49 @@ impl From<(T, AccountToken)> for WithToken { } } -pub fn execute(client: &C, miner: &M, accounts: &AccountProvider, payload: ConfirmationPayload, pass: SignWith) -> Result, Error> - where C: MiningBlockChainClient, M: MinerService -{ +impl From<(T, Option)> for WithToken { + fn from(tuple: (T, Option)) -> Self { + match tuple.1 { + Some(token) => WithToken::Yes(tuple.0, token), + None => WithToken::No(tuple.0), + } + } +} + +/// Execute a confirmation payload. +pub fn execute( + dispatcher: D, + accounts: &AccountProvider, + payload: ConfirmationPayload, + pass: SignWith +) -> BoxFuture, Error> { match payload { ConfirmationPayload::SendTransaction(request) => { - sign_and_dispatch(client, miner, accounts, request, pass) - .map(|result| result - .map(RpcH256::from) - .map(ConfirmationResponse::SendTransaction) - ) + let condition = request.condition.clone().map(Into::into); + dispatcher.sign(accounts, request, pass) + .map(move |v| v.map(move |tx| PendingTransaction::new(tx, condition))) + .map(WithToken::into_tuple) + .map(|(tx, token)| (tx, token, dispatcher)) + .and_then(|(tx, tok, dispatcher)| { + dispatcher.dispatch_transaction(tx) + .map(RpcH256::from) + .map(ConfirmationResponse::SendTransaction) + .map(move |h| WithToken::from((h, tok))) + }).boxed() }, ConfirmationPayload::SignTransaction(request) => { - sign_no_dispatch(client, miner, accounts, request, pass) + dispatcher.sign(accounts, request, pass) .map(|result| result .map(RpcRichRawTransaction::from) .map(ConfirmationResponse::SignTransaction) - ) + ).boxed() }, ConfirmationPayload::Signature(address, mut data) => { let mut message_data = format!("\x19Ethereum Signed Message:\n{}", data.len()) .into_bytes(); message_data.append(&mut data); - signature(accounts, address, message_data.sha3(), pass) + let res = signature(accounts, address, message_data.sha3(), pass) .map(|result| result .map(|rsv| { let mut vrs = [0u8; 65]; @@ -126,14 +293,16 @@ pub fn execute(client: &C, miner: &M, accounts: &AccountProvider, payload: }) .map(RpcH520::from) .map(ConfirmationResponse::Signature) - ) + ); + future::done(res).boxed() }, ConfirmationPayload::Decrypt(address, data) => { - decrypt(accounts, address, data, pass) + let res = decrypt(accounts, address, data, pass) .map(|result| result .map(RpcBytes) .map(ConfirmationResponse::Decrypt) - ) + ); + future::done(res).boxed() }, } } @@ -160,120 +329,34 @@ fn decrypt(accounts: &AccountProvider, address: Address, msg: Bytes, password: S }) } -pub fn dispatch_transaction(client: &C, miner: &M, signed_transaction: PendingTransaction) -> Result - where C: MiningBlockChainClient, M: MinerService { - let hash = signed_transaction.transaction.hash(); - - miner.import_own_transaction(client, signed_transaction) - .map_err(errors::from_transaction_error) - .map(|_| hash) -} - -pub fn sign_no_dispatch(client: &C, miner: &M, accounts: &AccountProvider, filled: FilledTransactionRequest, password: SignWith) -> Result, Error> - where C: MiningBlockChainClient, M: MinerService { - - let network_id = client.signing_network_id(); - let address = filled.from; - let signed_transaction = { - let t = Transaction { - nonce: filled.nonce - .or_else(|| miner - .last_nonce(&filled.from) - .map(|nonce| nonce + U256::one())) - .unwrap_or_else(|| client.latest_nonce(&filled.from)), - - action: filled.to.map_or(Action::Create, Action::Call), - gas: filled.gas, - gas_price: filled.gas_price, - value: filled.value, - data: filled.data, - }; - - let hash = t.hash(network_id); - if accounts.is_hardware_address(address) { - let mut stream = rlp::RlpStream::new(); - t.rlp_append_unsigned_transaction(&mut stream, network_id); - let signature = accounts.sign_with_hardware(address, &stream.as_raw()) - .map_err(|e| { - debug!(target: "miner", "Error signing transaction with hardware wallet: {}", e); - errors::account("Error signing transaction with hardware wallet", e) - })?; - WithToken::No(SignedTransaction::new(t.with_signature(signature, network_id)) - .map_err(|e| { - debug!(target: "miner", "Hardware wallet has produced invalid signature: {}", e); - errors::account("Invalid signature generated", e) - })?) - } else { - let signature = signature(accounts, address, hash, password)?; - signature.map(|sig| { - SignedTransaction::new(t.with_signature(sig, network_id)) - .expect("Transaction was signed by AccountsProvider; it never produces invalid signatures; qed") - }) - } - }; - Ok(signed_transaction) -} - -pub fn sign_and_dispatch(client: &C, miner: &M, accounts: &AccountProvider, filled: FilledTransactionRequest, password: SignWith) -> Result, Error> - where C: MiningBlockChainClient, M: MinerService -{ - - let network_id = client.signing_network_id(); - let condition = filled.condition.clone(); - let signed_transaction = sign_no_dispatch(client, miner, accounts, filled, password)?; - - let (signed_transaction, token) = match signed_transaction { - WithToken::No(signed_transaction) => (signed_transaction, None), - WithToken::Yes(signed_transaction, token) => (signed_transaction, Some(token)), - }; - - trace!(target: "miner", "send_transaction: dispatching tx: {} for network ID {:?}", rlp::encode(&signed_transaction).to_vec().pretty(), network_id); - let pending_transaction = PendingTransaction::new(signed_transaction, condition.map(Into::into)); - dispatch_transaction(&*client, &*miner, pending_transaction).map(|hash| { - match token { - Some(ref token) => WithToken::Yes(hash, token.clone()), - None => WithToken::No(hash), - } - }) -} - -pub fn fill_optional_fields(request: TransactionRequest, default_sender: Address, client: &C, miner: &M) -> FilledTransactionRequest - where C: MiningBlockChainClient, M: MinerService -{ - FilledTransactionRequest { - from: request.from.unwrap_or(default_sender), - used_default_from: request.from.is_none(), - to: request.to, - nonce: request.nonce, - gas_price: request.gas_price.unwrap_or_else(|| default_gas_price(client, miner)), - gas: request.gas.unwrap_or_else(|| miner.sensible_gas_limit()), - value: request.value.unwrap_or_else(|| 0.into()), - data: request.data.unwrap_or_else(Vec::new), - condition: request.condition, - } -} - +/// Extract the default gas price from a client and miner. pub fn default_gas_price(client: &C, miner: &M) -> U256 where C: MiningBlockChainClient, M: MinerService { client.gas_price_median(100).unwrap_or_else(|| miner.sensible_gas_price()) } -pub fn from_rpc(payload: RpcConfirmationPayload, default_account: Address, client: &C, miner: &M) -> ConfirmationPayload - where C: MiningBlockChainClient, M: MinerService { - +/// Convert RPC confirmation payload to signer confirmation payload. +/// May need to resolve in the future to fetch things like gas price. +pub fn from_rpc(payload: RpcConfirmationPayload, default_account: Address, dispatcher: &D) -> BoxFuture + where D: Dispatcher +{ match payload { RpcConfirmationPayload::SendTransaction(request) => { - ConfirmationPayload::SendTransaction(fill_optional_fields(request.into(), default_account, client, miner)) + dispatcher.fill_optional_fields(request.into(), default_account) + .map(ConfirmationPayload::SendTransaction) + .boxed() }, RpcConfirmationPayload::SignTransaction(request) => { - ConfirmationPayload::SignTransaction(fill_optional_fields(request.into(), default_account, client, miner)) + dispatcher.fill_optional_fields(request.into(), default_account) + .map(ConfirmationPayload::SignTransaction) + .boxed() }, RpcConfirmationPayload::Decrypt(RpcDecryptRequest { address, msg }) => { - ConfirmationPayload::Decrypt(address.into(), msg.into()) + future::ok(ConfirmationPayload::Decrypt(address.into(), msg.into())).boxed() }, RpcConfirmationPayload::Signature(RpcSignRequest { address, data }) => { - ConfirmationPayload::Signature(address.into(), data.into()) + future::ok(ConfirmationPayload::Signature(address.into(), data.into())).boxed() }, } } diff --git a/rpc/src/v1/helpers/mod.rs b/rpc/src/v1/helpers/mod.rs index 04953ae31..1d6bd14f3 100644 --- a/rpc/src/v1/helpers/mod.rs +++ b/rpc/src/v1/helpers/mod.rs @@ -28,6 +28,7 @@ mod requests; mod signer; mod signing_queue; +pub use self::dispatch::{Dispatcher, FullDispatcher}; pub use self::network_settings::NetworkSettings; pub use self::poll_manager::PollManager; pub use self::poll_filter::{PollFilter, limit_logs}; diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 1784cda18..7a954ccaf 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -22,7 +22,7 @@ use std::thread; use std::time::{Instant, Duration}; use std::sync::{Arc, Weak}; -use futures::{self, BoxFuture, Future}; +use futures::{self, future, BoxFuture, Future}; use rlp::{self, UntrustedRlp, View}; use time::get_time; use util::{H160, H256, Address, FixedHash, U256, H64, Uint}; @@ -38,7 +38,7 @@ use ethcore::filter::Filter as EthcoreFilter; use ethcore::header::{Header as BlockHeader, BlockNumber as EthBlockNumber}; use ethcore::log_entry::LogEntry; use ethcore::miner::{MinerService, ExternalMinerService}; -use ethcore::transaction::{Transaction as EthTransaction, SignedTransaction, PendingTransaction, Action}; +use ethcore::transaction::{Transaction as EthTransaction, SignedTransaction, Action}; use ethcore::snapshot::SnapshotService; use ethsync::{SyncProvider}; @@ -46,7 +46,7 @@ use jsonrpc_core::Error; use jsonrpc_macros::Trailing; use v1::helpers::{CallRequest as CRequest, errors, limit_logs}; -use v1::helpers::dispatch::{dispatch_transaction, default_gas_price}; +use v1::helpers::dispatch::{Dispatcher, FullDispatcher, default_gas_price}; use v1::helpers::block_import::is_major_importing; use v1::traits::Eth; use v1::types::{ @@ -144,7 +144,7 @@ impl EthClient where logs_bloom: view.log_bloom().into(), timestamp: view.timestamp().into(), difficulty: view.difficulty().into(), - total_difficulty: total_difficulty.into(), + total_difficulty: Some(total_difficulty.into()), seal_fields: view.seal().into_iter().map(Into::into).collect(), uncles: block.uncle_hashes().into_iter().map(Into::into).collect(), transactions: match include_txs { @@ -194,7 +194,7 @@ impl EthClient where logs_bloom: uncle.log_bloom().clone().into(), timestamp: uncle.timestamp().into(), difficulty: uncle.difficulty().clone().into(), - total_difficulty: (uncle.difficulty().clone() + parent_difficulty).into(), + total_difficulty: Some((uncle.difficulty().clone() + parent_difficulty).into()), receipts_root: uncle.receipts_root().clone().into(), extra_data: uncle.extra_data().clone().into(), seal_fields: uncle.seal().into_iter().cloned().map(Into::into).collect(), @@ -356,113 +356,119 @@ impl Eth for EthClient where Ok(RpcU256::from(take_weak!(self.client).chain_info().best_block_number)) } - fn balance(&self, address: RpcH160, num: Trailing) -> Result { + fn balance(&self, address: RpcH160, num: Trailing) -> BoxFuture { let address = address.into(); - match num.0 { - BlockNumber::Pending => Ok(take_weak!(self.miner).balance(&*take_weak!(self.client), &address).into()), - id => { - let client = take_weak!(self.client); - check_known(&*client, id.clone())?; + let res = match num.0.clone() { + BlockNumber::Pending => Ok(take_weakf!(self.miner).balance(&*take_weakf!(self.client), &address).into()), + id => { + let client = take_weakf!(self.client); + + try_bf!(check_known(&*client, id.clone())); match client.balance(&address, id.into()) { Some(balance) => Ok(balance.into()), None => Err(errors::state_pruned()), } } - } + }; + + future::done(res).boxed() } - fn storage_at(&self, address: RpcH160, pos: RpcU256, num: Trailing) -> Result { + fn storage_at(&self, address: RpcH160, pos: RpcU256, num: Trailing) -> BoxFuture { let address: Address = RpcH160::into(address); let position: U256 = RpcU256::into(pos); - match num.0 { - BlockNumber::Pending => Ok(take_weak!(self.miner).storage_at(&*take_weak!(self.client), &address, &H256::from(position)).into()), - id => { - let client = take_weak!(self.client); - check_known(&*client, id.clone())?; + let res = match num.0.clone() { + BlockNumber::Pending => Ok(take_weakf!(self.miner).storage_at(&*take_weakf!(self.client), &address, &H256::from(position)).into()), + id => { + let client = take_weakf!(self.client); + + try_bf!(check_known(&*client, id.clone())); match client.storage_at(&address, &H256::from(position), id.into()) { Some(s) => Ok(s.into()), None => Err(errors::state_pruned()), } } - } + }; + + future::done(res).boxed() } - fn transaction_count(&self, address: RpcH160, num: Trailing) -> Result { + fn transaction_count(&self, address: RpcH160, num: Trailing) -> BoxFuture { let address: Address = RpcH160::into(address); - match num.0 { - BlockNumber::Pending => Ok(take_weak!(self.miner).nonce(&*take_weak!(self.client), &address).into()), + let res = match num.0.clone() { + BlockNumber::Pending => Ok(take_weakf!(self.miner).nonce(&*take_weakf!(self.client), &address).into()), id => { - let client = take_weak!(self.client); + let client = take_weakf!(self.client); - check_known(&*client, id.clone())?; + try_bf!(check_known(&*client, id.clone())); match client.nonce(&address, id.into()) { Some(nonce) => Ok(nonce.into()), None => Err(errors::state_pruned()), } } - } + }; + + future::done(res).boxed() } - fn block_transaction_count_by_hash(&self, hash: RpcH256) -> Result, Error> { - Ok( - take_weak!(self.client).block(BlockId::Hash(hash.into())) - .map(|block| block.transactions_count().into()) - ) + fn block_transaction_count_by_hash(&self, hash: RpcH256) -> BoxFuture, Error> { + future::ok(take_weakf!(self.client).block(BlockId::Hash(hash.into())) + .map(|block| block.transactions_count().into())).boxed() } - fn block_transaction_count_by_number(&self, num: BlockNumber) -> Result, Error> { - match num { - BlockNumber::Pending => Ok(Some( - take_weak!(self.miner).status().transactions_in_pending_block.into() - )), - _ => Ok( - take_weak!(self.client).block(num.into()) - .map(|block| block.transactions_count().into()) - ) - } - } - - fn block_uncles_count_by_hash(&self, hash: RpcH256) -> Result, Error> { - Ok( - take_weak!(self.client).block(BlockId::Hash(hash.into())) - .map(|block| block.uncles_count().into()) - ) - } - - fn block_uncles_count_by_number(&self, num: BlockNumber) -> Result, Error> { - match num { - BlockNumber::Pending => Ok(Some(0.into())), - _ => Ok( - take_weak!(self.client).block(num.into()) - .map(|block| block.uncles_count().into()) + fn block_transaction_count_by_number(&self, num: BlockNumber) -> BoxFuture, Error> { + future::ok(match num { + BlockNumber::Pending => Some( + take_weakf!(self.miner).status().transactions_in_pending_block.into() ), - } + _ => + take_weakf!(self.client).block(num.into()) + .map(|block| block.transactions_count().into()) + }).boxed() } - fn code_at(&self, address: RpcH160, num: Trailing) -> Result { - let address: Address = RpcH160::into(address); - match num.0 { - BlockNumber::Pending => Ok(take_weak!(self.miner).code(&*take_weak!(self.client), &address).map_or_else(Bytes::default, Bytes::new)), - id => { - let client = take_weak!(self.client); + fn block_uncles_count_by_hash(&self, hash: RpcH256) -> BoxFuture, Error> { + future::ok(take_weakf!(self.client).block(BlockId::Hash(hash.into())) + .map(|block| block.uncles_count().into())) + .boxed() + } - check_known(&*client, id.clone())?; + fn block_uncles_count_by_number(&self, num: BlockNumber) -> BoxFuture, Error> { + future::ok(match num { + BlockNumber::Pending => Some(0.into()), + _ => take_weakf!(self.client).block(num.into()) + .map(|block| block.uncles_count().into() + ), + }).boxed() + } + + fn code_at(&self, address: RpcH160, num: Trailing) -> BoxFuture { + let address: Address = RpcH160::into(address); + + let res = match num.0.clone() { + BlockNumber::Pending => Ok(take_weakf!(self.miner).code(&*take_weakf!(self.client), &address).map_or_else(Bytes::default, Bytes::new)), + id => { + let client = take_weakf!(self.client); + + try_bf!(check_known(&*client, id.clone())); match client.code(&address, id.into()) { Some(code) => Ok(code.map_or_else(Bytes::default, Bytes::new)), None => Err(errors::state_pruned()), } } - } + }; + + future::done(res).boxed() } - fn block_by_hash(&self, hash: RpcH256, include_txs: bool) -> Result, Error> { - self.block(BlockId::Hash(hash.into()), include_txs) + fn block_by_hash(&self, hash: RpcH256, include_txs: bool) -> BoxFuture, Error> { + future::done(self.block(BlockId::Hash(hash.into()), include_txs)).boxed() } - fn block_by_number(&self, num: BlockNumber, include_txs: bool) -> Result, Error> { - self.block(num.into(), include_txs) + fn block_by_number(&self, num: BlockNumber, include_txs: bool) -> BoxFuture, Error> { + future::done(self.block(num.into(), include_txs)).boxed() } fn transaction_by_hash(&self, hash: RpcH256) -> Result, Error> { @@ -603,7 +609,8 @@ impl Eth for EthClient where .map_err(errors::from_rlp_error) .and_then(|tx| SignedTransaction::new(tx).map_err(errors::from_transaction_error)) .and_then(|signed_transaction| { - dispatch_transaction(&*take_weak!(self.client), &*take_weak!(self.miner), PendingTransaction::new(signed_transaction, None)) + FullDispatcher::new(self.client.clone(), self.miner.clone()) + .dispatch_transaction(signed_transaction.into()) }) .map(Into::into) } diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs new file mode 100644 index 000000000..47765bd41 --- /dev/null +++ b/rpc/src/v1/impls/light/eth.rs @@ -0,0 +1,373 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Eth RPC interface for the light client. + +// TODO: remove when complete. +#![allow(unused_imports, unused_variables)] + +use std::sync::Arc; + +use jsonrpc_core::Error; +use jsonrpc_macros::Trailing; + +use light::client::Client as LightClient; +use light::cht; +use light::on_demand::{request, OnDemand}; + +use ethcore::account_provider::{AccountProvider, DappId}; +use ethcore::basic_account::BasicAccount; +use ethcore::encoded; +use ethcore::ids::BlockId; +use ethsync::LightSync; +use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP}; +use util::U256; + +use futures::{future, Future, BoxFuture}; +use futures::sync::oneshot; + +use v1::helpers::{CallRequest as CRequest, errors, limit_logs}; +use v1::helpers::block_import::is_major_importing; +use v1::traits::Eth; +use v1::types::{ + RichBlock, Block, BlockTransactions, BlockNumber, Bytes, SyncStatus, SyncInfo, + Transaction, CallRequest, Index, Filter, Log, Receipt, Work, + H64 as RpcH64, H256 as RpcH256, H160 as RpcH160, U256 as RpcU256, +}; +use v1::metadata::Metadata; + +use util::Address; + +/// Light client `ETH` RPC. +pub struct EthClient { + sync: Arc, + client: Arc, + on_demand: Arc, + accounts: Arc, +} + +// helper for internal error: no network context. +fn err_no_context() -> Error { + errors::internal("network service detached", "") +} + +// helper for internal error: on demand sender cancelled. +fn err_premature_cancel(_cancel: oneshot::Canceled) -> Error { + errors::internal("on-demand sender prematurely cancelled", "") +} + +impl EthClient { + /// Create a new `EthClient` with a handle to the light sync instance, client, + /// and on-demand request service, which is assumed to be attached as a handler. + pub fn new( + sync: Arc, + client: Arc, + on_demand: Arc, + accounts: Arc, + ) -> Self { + EthClient { + sync: sync, + client: client, + on_demand: on_demand, + accounts: accounts, + } + } + + /// Get a block header from the on demand service or client, or error. + fn header(&self, id: BlockId) -> BoxFuture, Error> { + if let Some(h) = self.client.block_header(id) { + return future::ok(Some(h)).boxed() + } + + let maybe_future = match id { + BlockId::Number(n) => { + let cht_root = cht::block_to_cht_number(n).and_then(|cn| self.client.cht_root(cn as usize)); + match cht_root { + None => return future::ok(None).boxed(), + Some(root) => { + let req = request::HeaderByNumber::new(n, root) + .expect("only fails for 0; client always stores genesis; client already queried; qed"); + + self.sync.with_context(|ctx| + self.on_demand.header_by_number(ctx, req) + .map(|(h, _)| Some(h)) + .map_err(err_premature_cancel) + .boxed() + ) + } + } + } + BlockId::Hash(h) => { + self.sync.with_context(|ctx| + self.on_demand.header_by_hash(ctx, request::HeaderByHash(h)) + .then(|res| future::done(match res { + Ok(h) => Ok(Some(h)), + Err(e) => Err(err_premature_cancel(e)), + })) + .boxed() + ) + } + _ => None, // latest, earliest, and pending will have all already returned. + }; + + // todo: cache returned values (header, TD) + match maybe_future { + Some(recv) => recv, + None => future::err(err_no_context()).boxed() + } + } + + // helper for getting account info at a given block. + fn account(&self, address: Address, id: BlockId) -> BoxFuture, Error> { + let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); + + self.header(id).and_then(move |header| { + let header = match header { + None => return future::ok(None).boxed(), + Some(hdr) => hdr, + }; + + sync.with_context(|ctx| on_demand.account(ctx, request::Account { + header: header, + address: address, + }).map(Some)) + .map(|x| x.map_err(err_premature_cancel).boxed()) + .unwrap_or_else(|| future::err(err_no_context()).boxed()) + }).boxed() + } +} + +impl Eth for EthClient { + type Metadata = Metadata; + + fn protocol_version(&self) -> Result { + Ok(format!("{}", ::light::net::MAX_PROTOCOL_VERSION)) + } + + fn syncing(&self) -> Result { + rpc_unimplemented!() + } + + fn author(&self, _meta: Self::Metadata) -> BoxFuture { + future::ok(Default::default()).boxed() + } + + fn is_mining(&self) -> Result { + Ok(false) + } + + fn hashrate(&self) -> Result { + Ok(Default::default()) + } + + fn gas_price(&self) -> Result { + Ok(Default::default()) + } + + fn accounts(&self, meta: Metadata) -> BoxFuture, Error> { + let dapp: DappId = meta.dapp_id.unwrap_or_default().into(); + + let accounts = self.accounts + .note_dapp_used(dapp.clone()) + .and_then(|_| self.accounts.dapps_addresses(dapp)) + .map_err(|e| errors::internal("Could not fetch accounts.", e)) + .map(|accs| accs.into_iter().map(Into::::into).collect()); + + future::done(accounts).boxed() + } + + fn block_number(&self) -> Result { + Ok(self.client.chain_info().best_block_number.into()) + } + + fn balance(&self, address: RpcH160, num: Trailing) -> BoxFuture { + self.account(address.into(), num.0.into()) + .map(|acc| acc.map_or(0.into(), |a| a.balance).into()).boxed() + } + + fn storage_at(&self, _address: RpcH160, _key: RpcU256, _num: Trailing) -> BoxFuture { + future::err(errors::unimplemented(None)).boxed() + } + + fn block_by_hash(&self, hash: RpcH256, include_txs: bool) -> BoxFuture, Error> { + future::err(errors::unimplemented(None)).boxed() + } + + fn block_by_number(&self, num: BlockNumber, include_txs: bool) -> BoxFuture, Error> { + future::err(errors::unimplemented(None)).boxed() + } + + fn transaction_count(&self, address: RpcH160, num: Trailing) -> BoxFuture { + self.account(address.into(), num.0.into()) + .map(|acc| acc.map_or(0.into(), |a| a.nonce).into()).boxed() + } + + fn block_transaction_count_by_hash(&self, hash: RpcH256) -> BoxFuture, Error> { + let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); + + self.header(BlockId::Hash(hash.into())).and_then(move |hdr| { + let hdr = match hdr { + None => return future::ok(None).boxed(), + Some(hdr) => hdr, + }; + + if hdr.transactions_root() == SHA3_NULL_RLP { + future::ok(Some(U256::from(0).into())).boxed() + } else { + sync.with_context(|ctx| on_demand.block(ctx, request::Body::new(hdr))) + .map(|x| x.map(|b| Some(U256::from(b.transactions_count()).into()))) + .map(|x| x.map_err(err_premature_cancel).boxed()) + .unwrap_or_else(|| future::err(err_no_context()).boxed()) + } + }).boxed() + } + + fn block_transaction_count_by_number(&self, num: BlockNumber) -> BoxFuture, Error> { + let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); + + self.header(num.into()).and_then(move |hdr| { + let hdr = match hdr { + None => return future::ok(None).boxed(), + Some(hdr) => hdr, + }; + + if hdr.transactions_root() == SHA3_NULL_RLP { + future::ok(Some(U256::from(0).into())).boxed() + } else { + sync.with_context(|ctx| on_demand.block(ctx, request::Body::new(hdr))) + .map(|x| x.map(|b| Some(U256::from(b.transactions_count()).into()))) + .map(|x| x.map_err(err_premature_cancel).boxed()) + .unwrap_or_else(|| future::err(err_no_context()).boxed()) + } + }).boxed() + } + + fn block_uncles_count_by_hash(&self, hash: RpcH256) -> BoxFuture, Error> { + let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); + + self.header(BlockId::Hash(hash.into())).and_then(move |hdr| { + let hdr = match hdr { + None => return future::ok(None).boxed(), + Some(hdr) => hdr, + }; + + if hdr.uncles_hash() == SHA3_EMPTY_LIST_RLP { + future::ok(Some(U256::from(0).into())).boxed() + } else { + sync.with_context(|ctx| on_demand.block(ctx, request::Body::new(hdr))) + .map(|x| x.map(|b| Some(U256::from(b.uncles_count()).into()))) + .map(|x| x.map_err(err_premature_cancel).boxed()) + .unwrap_or_else(|| future::err(err_no_context()).boxed()) + } + }).boxed() + } + + fn block_uncles_count_by_number(&self, num: BlockNumber) -> BoxFuture, Error> { + let (sync, on_demand) = (self.sync.clone(), self.on_demand.clone()); + + self.header(num.into()).and_then(move |hdr| { + let hdr = match hdr { + None => return future::ok(None).boxed(), + Some(hdr) => hdr, + }; + + if hdr.uncles_hash() == SHA3_EMPTY_LIST_RLP { + future::ok(Some(U256::from(0).into())).boxed() + } else { + sync.with_context(|ctx| on_demand.block(ctx, request::Body::new(hdr))) + .map(|x| x.map(|b| Some(U256::from(b.uncles_count()).into()))) + .map(|x| x.map_err(err_premature_cancel).boxed()) + .unwrap_or_else(|| future::err(err_no_context()).boxed()) + } + }).boxed() + } + + fn code_at(&self, address: RpcH160, num: Trailing) -> BoxFuture { + future::err(errors::unimplemented(None)).boxed() + } + + fn send_raw_transaction(&self, raw: Bytes) -> Result { + Err(errors::unimplemented(None)) + } + + fn submit_transaction(&self, raw: Bytes) -> Result { + Err(errors::unimplemented(None)) + } + + fn call(&self, req: CallRequest, num: Trailing) -> Result { + Err(errors::unimplemented(None)) + } + + fn estimate_gas(&self, req: CallRequest, num: Trailing) -> Result { + Err(errors::unimplemented(None)) + } + + fn transaction_by_hash(&self, hash: RpcH256) -> Result, Error> { + Err(errors::unimplemented(None)) + } + + fn transaction_by_block_hash_and_index(&self, hash: RpcH256, idx: Index) -> Result, Error> { + Err(errors::unimplemented(None)) + } + + fn transaction_by_block_number_and_index(&self, num: BlockNumber, idx: Index) -> Result, Error> { + Err(errors::unimplemented(None)) + } + + fn transaction_receipt(&self, hash: RpcH256) -> Result, Error> { + Err(errors::unimplemented(None)) + } + + fn uncle_by_block_hash_and_index(&self, hash: RpcH256, idx: Index) -> Result, Error> { + Err(errors::unimplemented(None)) + } + + fn uncle_by_block_number_and_index(&self, num: BlockNumber, idx: Index) -> Result, Error> { + Err(errors::unimplemented(None)) + } + + fn compilers(&self) -> Result, Error> { + Err(errors::unimplemented(None)) + } + + fn compile_lll(&self, _code: String) -> Result { + Err(errors::unimplemented(None)) + } + + fn compile_solidity(&self, _code: String) -> Result { + Err(errors::unimplemented(None)) + } + + fn compile_serpent(&self, _code: String) -> Result { + Err(errors::unimplemented(None)) + } + + fn logs(&self, _filter: Filter) -> Result, Error> { + Err(errors::unimplemented(None)) + } + + fn work(&self, _timeout: Trailing) -> Result { + Err(errors::unimplemented(None)) + } + + fn submit_work(&self, _nonce: RpcH64, _pow_hash: RpcH256, _mix_hash: RpcH256) -> Result { + Err(errors::unimplemented(None)) + } + + fn submit_hashrate(&self, _rate: RpcU256, _id: RpcH256) -> Result { + Err(errors::unimplemented(None)) + } +} diff --git a/rpc/src/v1/impls/light/mod.rs b/rpc/src/v1/impls/light/mod.rs new file mode 100644 index 000000000..1772d5b58 --- /dev/null +++ b/rpc/src/v1/impls/light/mod.rs @@ -0,0 +1,21 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! RPC implementations for the light client. + +pub mod eth; + +pub use self::eth::EthClient; diff --git a/rpc/src/v1/impls/mod.rs b/rpc/src/v1/impls/mod.rs index 72829c2b5..f5149d721 100644 --- a/rpc/src/v1/impls/mod.rs +++ b/rpc/src/v1/impls/mod.rs @@ -16,15 +16,6 @@ //! Ethereum rpc interface implementation. -macro_rules! take_weak { - ($weak: expr) => { - match $weak.upgrade() { - Some(arc) => arc, - None => return Err(Error::internal_error()) - } - } -} - mod eth; mod eth_filter; mod net; @@ -39,6 +30,8 @@ mod rpc; mod traces; mod web3; +pub mod light; + pub use self::web3::Web3Client; pub use self::eth::{EthClient, EthClientOptions}; pub use self::eth_filter::EthFilterClient; diff --git a/rpc/src/v1/impls/personal.rs b/rpc/src/v1/impls/personal.rs index 44e854412..03ce5ffeb 100644 --- a/rpc/src/v1/impls/personal.rs +++ b/rpc/src/v1/impls/personal.rs @@ -18,48 +18,37 @@ use std::sync::{Arc, Weak}; use ethcore::account_provider::AccountProvider; -use ethcore::client::MiningBlockChainClient; -use ethcore::miner::MinerService; -use util::{Address, U128, Uint}; +use ethcore::transaction::PendingTransaction; -use futures::{self, Future, BoxFuture}; +use util::{Address, U128, Uint, ToPretty}; + +use futures::{future, Future, BoxFuture}; use jsonrpc_core::Error; use v1::helpers::errors; -use v1::helpers::dispatch::{self, sign_and_dispatch}; +use v1::helpers::dispatch::{Dispatcher, SignWith}; use v1::traits::Personal; use v1::types::{H160 as RpcH160, H256 as RpcH256, U128 as RpcU128, TransactionRequest}; use v1::metadata::Metadata; /// Account management (personal) rpc implementation. -pub struct PersonalClient where - C: MiningBlockChainClient, - M: MinerService, -{ +pub struct PersonalClient { accounts: Weak, - client: Weak, - miner: Weak, + dispatcher: D, allow_perm_unlock: bool, } -impl PersonalClient where - C: MiningBlockChainClient, - M: MinerService, -{ +impl PersonalClient { /// Creates new PersonalClient - pub fn new(store: &Arc, client: &Arc, miner: &Arc, allow_perm_unlock: bool) -> Self { + pub fn new(store: &Arc, dispatcher: D, allow_perm_unlock: bool) -> Self { PersonalClient { accounts: Arc::downgrade(store), - client: Arc::downgrade(client), - miner: Arc::downgrade(miner), + dispatcher: dispatcher, allow_perm_unlock: allow_perm_unlock, } } } -impl Personal for PersonalClient where - C: MiningBlockChainClient + 'static, - M: MinerService + 'static, -{ +impl Personal for PersonalClient { type Metadata = Metadata; fn accounts(&self) -> Result, Error> { @@ -106,28 +95,36 @@ impl Personal for PersonalClient where } fn send_transaction(&self, meta: Metadata, request: TransactionRequest, password: String) -> BoxFuture { - let sign_and_send = move || { - let client = take_weak!(self.client); - let miner = take_weak!(self.miner); - let accounts = take_weak!(self.accounts); + let dispatcher = self.dispatcher.clone(); + let accounts = take_weakf!(self.accounts); - let default_account = match request.from { - Some(ref account) => account.clone().into(), - None => accounts - .default_address(meta.dapp_id.unwrap_or_default().into()) - .map_err(|e| errors::account("Cannot find default account.", e))?, - }; - - let request = dispatch::fill_optional_fields(request.into(), default_account, &*client, &*miner); - sign_and_dispatch( - &*client, - &*miner, - &*accounts, - request, - dispatch::SignWith::Password(password) - ).map(|v| v.into_value().into()) + let default = match request.from.as_ref() { + Some(account) => Ok(account.clone().into()), + None => accounts + .default_address(meta.dapp_id.unwrap_or_default().into()) + .map_err(|e| errors::account("Cannot find default account.", e)), }; - futures::done(sign_and_send()).boxed() + let default = match default { + Ok(default) => default, + Err(e) => return future::err(e).boxed(), + }; + + dispatcher.fill_optional_fields(request.into(), default) + .and_then(move |filled| { + let condition = filled.condition.clone().map(Into::into); + dispatcher.sign(&accounts, filled, SignWith::Password(password)) + .map(|tx| tx.into_value()) + .map(move |tx| PendingTransaction::new(tx, condition)) + .map(move |tx| (tx, dispatcher)) + }) + .and_then(|(pending_tx, dispatcher)| { + let network_id = pending_tx.network_id(); + trace!(target: "miner", "send_transaction: dispatching tx: {} for network ID {:?}", + ::rlp::encode(&*pending_tx).to_vec().pretty(), network_id); + + dispatcher.dispatch_transaction(pending_tx).map(Into::into) + }) + .boxed() } } diff --git a/rpc/src/v1/impls/signer.rs b/rpc/src/v1/impls/signer.rs index 90a811e37..a94db94a0 100644 --- a/rpc/src/v1/impls/signer.rs +++ b/rpc/src/v1/impls/signer.rs @@ -20,49 +20,53 @@ use std::sync::{Arc, Weak}; use rlp::{UntrustedRlp, View}; use ethcore::account_provider::AccountProvider; -use ethcore::client::MiningBlockChainClient; use ethcore::transaction::{SignedTransaction, PendingTransaction}; -use ethcore::miner::MinerService; +use futures::{future, BoxFuture, Future, IntoFuture}; use jsonrpc_core::Error; use v1::helpers::{errors, SignerService, SigningQueue, ConfirmationPayload}; -use v1::helpers::dispatch::{self, dispatch_transaction, WithToken}; +use v1::helpers::dispatch::{self, Dispatcher, WithToken}; use v1::traits::Signer; use v1::types::{TransactionModification, ConfirmationRequest, ConfirmationResponse, ConfirmationResponseWithToken, U256, Bytes}; /// Transactions confirmation (personal) rpc implementation. -pub struct SignerClient where C: MiningBlockChainClient, M: MinerService { +pub struct SignerClient { signer: Weak, accounts: Weak, - client: Weak, - miner: Weak, + dispatcher: D } -impl SignerClient where C: MiningBlockChainClient, M: MinerService { +impl SignerClient { /// Create new instance of signer client. pub fn new( store: &Arc, - client: &Arc, - miner: &Arc, + dispatcher: D, signer: &Arc, ) -> Self { SignerClient { signer: Arc::downgrade(signer), accounts: Arc::downgrade(store), - client: Arc::downgrade(client), - miner: Arc::downgrade(miner), + dispatcher: dispatcher, } } - fn confirm_internal(&self, id: U256, modification: TransactionModification, f: F) -> Result, Error> where - F: FnOnce(&C, &M, &AccountProvider, ConfirmationPayload) -> Result, Error>, + fn confirm_internal(&self, id: U256, modification: TransactionModification, f: F) -> BoxFuture, Error> where + F: FnOnce(D, &AccountProvider, ConfirmationPayload) -> T, + T: IntoFuture, Error=Error>, + T::Future: Send + 'static { let id = id.into(); - let accounts = take_weak!(self.accounts); - let signer = take_weak!(self.signer); - let client = take_weak!(self.client); - let miner = take_weak!(self.miner); + let dispatcher = self.dispatcher.clone(); + + let setup = || { + Ok((take_weak!(self.accounts), take_weak!(self.signer))) + }; + + let (accounts, signer) = match setup() { + Ok(x) => x, + Err(e) => return future::err(e).boxed(), + }; signer.peek(&id).map(|confirmation| { let mut payload = confirmation.payload.clone(); @@ -83,17 +87,21 @@ impl SignerClient where C: MiningBlockChainClient, request.condition = condition.clone().map(Into::into); } } - let result = f(&*client, &*miner, &*accounts, payload); - // Execute - if let Ok(ref response) = result { - signer.request_confirmed(id, Ok((*response).clone())); - } - result - }).unwrap_or_else(|| Err(errors::invalid_params("Unknown RequestID", id))) + let fut = f(dispatcher, &*accounts, payload); + fut.into_future().then(move |result| { + // Execute + if let Ok(ref response) = result { + signer.request_confirmed(id, Ok((*response).clone())); + } + + result + }).boxed() + }) + .unwrap_or_else(|| future::err(errors::invalid_params("Unknown RequestID", id)).boxed()) } } -impl Signer for SignerClient where C: MiningBlockChainClient, M: MinerService { +impl Signer for SignerClient { fn requests_to_confirm(&self) -> Result, Error> { let signer = take_weak!(self.signer); @@ -107,29 +115,31 @@ impl Signer for SignerClient where C: MiningBlockC // TODO [ToDr] TransactionModification is redundant for some calls // might be better to replace it in future - fn confirm_request(&self, id: U256, modification: TransactionModification, pass: String) -> Result { - self.confirm_internal(id, modification, move |client, miner, accounts, payload| { - dispatch::execute(client, miner, accounts, payload, dispatch::SignWith::Password(pass)) - }).map(|v| v.into_value()) + fn confirm_request(&self, id: U256, modification: TransactionModification, pass: String) + -> BoxFuture + { + self.confirm_internal(id, modification, move |dis, accounts, payload| { + dispatch::execute(dis, accounts, payload, dispatch::SignWith::Password(pass)) + }).map(|v| v.into_value()).boxed() } - fn confirm_request_with_token(&self, id: U256, modification: TransactionModification, token: String) -> Result { - self.confirm_internal(id, modification, move |client, miner, accounts, payload| { - dispatch::execute(client, miner, accounts, payload, dispatch::SignWith::Token(token)) + fn confirm_request_with_token(&self, id: U256, modification: TransactionModification, token: String) + -> BoxFuture + { + self.confirm_internal(id, modification, move |dis, accounts, payload| { + dispatch::execute(dis, accounts, payload, dispatch::SignWith::Token(token)) }).and_then(|v| match v { WithToken::No(_) => Err(errors::internal("Unexpected response without token.", "")), WithToken::Yes(response, token) => Ok(ConfirmationResponseWithToken { result: response, token: token, }), - }) + }).boxed() } fn confirm_request_raw(&self, id: U256, bytes: Bytes) -> Result { let id = id.into(); let signer = take_weak!(self.signer); - let client = take_weak!(self.client); - let miner = take_weak!(self.miner); signer.peek(&id).map(|confirmation| { let result = match confirmation.payload { @@ -150,7 +160,7 @@ impl Signer for SignerClient where C: MiningBlockC // Dispatch if everything is ok if sender_matches && data_matches && value_matches && nonce_matches { let pending_transaction = PendingTransaction::new(signed_transaction, request.condition.map(Into::into)); - dispatch_transaction(&*client, &*miner, pending_transaction) + self.dispatcher.dispatch_transaction(pending_transaction) .map(Into::into) .map(ConfirmationResponse::SendTransaction) } else { diff --git a/rpc/src/v1/impls/signing.rs b/rpc/src/v1/impls/signing.rs index 0db90436c..6bf2155ed 100644 --- a/rpc/src/v1/impls/signing.rs +++ b/rpc/src/v1/impls/signing.rs @@ -21,16 +21,15 @@ use transient_hashmap::TransientHashMap; use util::{U256, Mutex}; use ethcore::account_provider::AccountProvider; -use ethcore::miner::MinerService; -use ethcore::client::MiningBlockChainClient; -use futures::{self, BoxFuture, Future}; +use futures::{self, future, BoxFuture, Future}; use jsonrpc_core::Error; use v1::helpers::{ - errors, dispatch, + errors, DefaultAccount, - SigningQueue, ConfirmationPromise, ConfirmationResult, ConfirmationPayload, SignerService + SigningQueue, ConfirmationPromise, ConfirmationResult, SignerService }; +use v1::helpers::dispatch::{self, Dispatcher}; use v1::metadata::Metadata; use v1::traits::{EthSigning, ParitySigning}; use v1::types::{ @@ -50,105 +49,96 @@ enum DispatchResult { } /// Implementation of functions that require signing when no trusted signer is used. -pub struct SigningQueueClient where C: MiningBlockChainClient, M: MinerService { +pub struct SigningQueueClient { signer: Weak, accounts: Weak, - client: Weak, - miner: Weak, - - pending: Mutex>, + dispatcher: D, + pending: Arc>>, } -impl SigningQueueClient where - C: MiningBlockChainClient, - M: MinerService, +fn handle_dispatch(res: Result, on_response: OnResponse) + where OnResponse: FnOnce(Result) + Send + 'static { + match res { + Ok(DispatchResult::Value(result)) => on_response(Ok(result)), + Ok(DispatchResult::Promise(promise)) => { + promise.wait_for_result(move |result| { + on_response(result.unwrap_or_else(|| Err(errors::request_rejected()))) + }) + }, + Err(e) => on_response(Err(e)), + } +} + +impl SigningQueueClient { /// Creates a new signing queue client given shared signing queue. - pub fn new(signer: &Arc, client: &Arc, miner: &Arc, accounts: &Arc) -> Self { + pub fn new(signer: &Arc, dispatcher: D, accounts: &Arc) -> Self { SigningQueueClient { signer: Arc::downgrade(signer), accounts: Arc::downgrade(accounts), - client: Arc::downgrade(client), - miner: Arc::downgrade(miner), - pending: Mutex::new(TransientHashMap::new(MAX_PENDING_DURATION)), + dispatcher: dispatcher, + pending: Arc::new(Mutex::new(TransientHashMap::new(MAX_PENDING_DURATION))), } } - fn handle_dispatch(&self, res: Result, on_response: OnResponse) - where OnResponse: FnOnce(Result) + Send + 'static - { - match res { - Ok(DispatchResult::Value(result)) => on_response(Ok(result)), - Ok(DispatchResult::Promise(promise)) => { - promise.wait_for_result(move |result| { - on_response(result.unwrap_or_else(|| Err(errors::request_rejected()))) - }) - }, - Err(e) => on_response(Err(e)), - } - } - - fn add_to_queue(&self, payload: ConfirmationPayload) -> Result { - let client = take_weak!(self.client); - let miner = take_weak!(self.miner); - let accounts = take_weak!(self.accounts); - - let sender = payload.sender(); - if accounts.is_unlocked(sender) { - return dispatch::execute(&*client, &*miner, &*accounts, payload, dispatch::SignWith::Nothing) - .map(|v| v.into_value()) - .map(DispatchResult::Value); - } - - take_weak!(self.signer).add_request(payload) - .map(DispatchResult::Promise) - .map_err(|_| errors::request_rejected_limit()) - } - - fn dispatch(&self, payload: RpcConfirmationPayload, default_account: DefaultAccount) -> Result { - let client = take_weak!(self.client); - let miner = take_weak!(self.miner); - + fn dispatch(&self, payload: RpcConfirmationPayload, default_account: DefaultAccount) -> BoxFuture { + let accounts = take_weakf!(self.accounts); let default_account = match default_account { DefaultAccount::Provided(acc) => acc, - DefaultAccount::ForDapp(dapp) => take_weak!(self.accounts).default_address(dapp).ok().unwrap_or_default(), + DefaultAccount::ForDapp(dapp) => accounts.default_address(dapp).ok().unwrap_or_default(), }; - let payload = dispatch::from_rpc(payload, default_account, &*client, &*miner); - self.add_to_queue(payload) + + let dispatcher = self.dispatcher.clone(); + let signer = take_weakf!(self.signer); + dispatch::from_rpc(payload, default_account, &dispatcher) + .and_then(move |payload| { + let sender = payload.sender(); + if accounts.is_unlocked(sender) { + dispatch::execute(dispatcher, &accounts, payload, dispatch::SignWith::Nothing) + .map(|v| v.into_value()) + .map(DispatchResult::Value) + .boxed() + } else { + future::done( + signer.add_request(payload) + .map(DispatchResult::Promise) + .map_err(|_| errors::request_rejected_limit()) + ).boxed() + } + }) + .boxed() } } -impl ParitySigning for SigningQueueClient where - C: MiningBlockChainClient, - M: MinerService, -{ +impl ParitySigning for SigningQueueClient { type Metadata = Metadata; - fn post_sign(&self, address: RpcH160, data: RpcBytes) -> Result, Error> { + fn post_sign(&self, address: RpcH160, data: RpcBytes) -> BoxFuture, Error> { + let pending = self.pending.clone(); self.dispatch(RpcConfirmationPayload::Signature((address.clone(), data).into()), DefaultAccount::Provided(address.into())) - .map(|result| match result { + .map(move |result| match result { DispatchResult::Value(v) => RpcEither::Or(v), DispatchResult::Promise(promise) => { let id = promise.id(); - self.pending.lock().insert(id, promise); + pending.lock().insert(id, promise); RpcEither::Either(id.into()) }, }) + .boxed() } fn post_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture, Error> { - let post_transaction = move || { - self.dispatch(RpcConfirmationPayload::SendTransaction(request), meta.into()) - .map(|result| match result { - DispatchResult::Value(v) => RpcEither::Or(v), - DispatchResult::Promise(promise) => { - let id = promise.id(); - self.pending.lock().insert(id, promise); - RpcEither::Either(id.into()) - }, - }) - }; - futures::done(post_transaction()).boxed() + let pending = self.pending.clone(); + self.dispatch(RpcConfirmationPayload::SendTransaction(request), meta.into()) + .map(move |result| match result { + DispatchResult::Value(v) => RpcEither::Or(v), + DispatchResult::Promise(promise) => { + let id = promise.id(); + pending.lock().insert(id, promise); + RpcEither::Either(id.into()) + }, + }) + .boxed() } fn check_request(&self, id: RpcU256) -> Result, Error> { @@ -170,67 +160,78 @@ impl ParitySigning for SigningQueueClient where let res = self.dispatch(RpcConfirmationPayload::Decrypt((address.clone(), data).into()), address.into()); let (ready, p) = futures::oneshot(); - // TODO [todr] typed handle_dispatch - self.handle_dispatch(res, |response| { - match response { - Ok(RpcConfirmationResponse::Decrypt(data)) => ready.complete(Ok(data)), - Err(e) => ready.complete(Err(e)), - e => ready.complete(Err(errors::internal("Unexpected result.", e))), - } - }); - p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))).boxed() + // when dispatch is complete + res.then(move |res| { + // register callback via the oneshot sender. + handle_dispatch(res, move |response| { + match response { + Ok(RpcConfirmationResponse::Decrypt(data)) => ready.complete(Ok(data)), + Err(e) => ready.complete(Err(e)), + e => ready.complete(Err(errors::internal("Unexpected result.", e))), + } + }); + + // and wait for that to resolve. + p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))) + }).boxed() } } -impl EthSigning for SigningQueueClient where - C: MiningBlockChainClient, - M: MinerService, -{ +impl EthSigning for SigningQueueClient { type Metadata = Metadata; fn sign(&self, address: RpcH160, data: RpcBytes) -> BoxFuture { let res = self.dispatch(RpcConfirmationPayload::Signature((address.clone(), data).into()), address.into()); let (ready, p) = futures::oneshot(); - self.handle_dispatch(res, |response| { - match response { - Ok(RpcConfirmationResponse::Signature(signature)) => ready.complete(Ok(signature)), - Err(e) => ready.complete(Err(e)), - e => ready.complete(Err(errors::internal("Unexpected result.", e))), - } - }); - p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))).boxed() + res.then(move |res| { + handle_dispatch(res, move |response| { + match response { + Ok(RpcConfirmationResponse::Signature(sig)) => ready.complete(Ok(sig)), + Err(e) => ready.complete(Err(e)), + e => ready.complete(Err(errors::internal("Unexpected result.", e))), + } + }); + + p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))) + }).boxed() } fn send_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture { let res = self.dispatch(RpcConfirmationPayload::SendTransaction(request), meta.into()); let (ready, p) = futures::oneshot(); - self.handle_dispatch(res, |response| { - match response { - Ok(RpcConfirmationResponse::SendTransaction(hash)) => ready.complete(Ok(hash)), - Err(e) => ready.complete(Err(e)), - e => ready.complete(Err(errors::internal("Unexpected result.", e))), - } - }); - p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))).boxed() + res.then(move |res| { + handle_dispatch(res, move |response| { + match response { + Ok(RpcConfirmationResponse::SendTransaction(hash)) => ready.complete(Ok(hash)), + Err(e) => ready.complete(Err(e)), + e => ready.complete(Err(errors::internal("Unexpected result.", e))), + } + }); + + p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))) + }).boxed() } fn sign_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture { let res = self.dispatch(RpcConfirmationPayload::SignTransaction(request), meta.into()); let (ready, p) = futures::oneshot(); - self.handle_dispatch(res, |response| { - match response { - Ok(RpcConfirmationResponse::SignTransaction(tx)) => ready.complete(Ok(tx)), - Err(e) => ready.complete(Err(e)), - e => ready.complete(Err(errors::internal("Unexpected result.", e))), - } - }); - p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))).boxed() + res.then(move |res| { + handle_dispatch(res, move |response| { + match response { + Ok(RpcConfirmationResponse::SignTransaction(tx)) => ready.complete(Ok(tx)), + Err(e) => ready.complete(Err(e)), + e => ready.complete(Err(errors::internal("Unexpected result.", e))), + } + }); + + p.then(|result| futures::done(result.expect("Ready is never dropped nor canceled."))) + }).boxed() } } diff --git a/rpc/src/v1/impls/signing_unsafe.rs b/rpc/src/v1/impls/signing_unsafe.rs index b43362c76..333b823f9 100644 --- a/rpc/src/v1/impls/signing_unsafe.rs +++ b/rpc/src/v1/impls/signing_unsafe.rs @@ -19,12 +19,11 @@ use std::sync::{Arc, Weak}; use ethcore::account_provider::AccountProvider; -use ethcore::miner::MinerService; -use ethcore::client::MiningBlockChainClient; -use futures::{self, BoxFuture, Future}; +use futures::{future, BoxFuture, Future}; use jsonrpc_core::Error; -use v1::helpers::{errors, dispatch, DefaultAccount}; +use v1::helpers::{errors, DefaultAccount}; +use v1::helpers::dispatch::{self, Dispatcher}; use v1::metadata::Metadata; use v1::traits::{EthSigning, ParitySigning}; use v1::types::{ @@ -38,106 +37,93 @@ use v1::types::{ }; /// Implementation of functions that require signing when no trusted signer is used. -pub struct SigningUnsafeClient where - C: MiningBlockChainClient, - M: MinerService, -{ +pub struct SigningUnsafeClient { accounts: Weak, - client: Weak, - miner: Weak, + dispatcher: D, } -impl SigningUnsafeClient where - C: MiningBlockChainClient, - M: MinerService, -{ - +impl SigningUnsafeClient { /// Creates new SigningUnsafeClient. - pub fn new(client: &Arc, accounts: &Arc, miner: &Arc) - -> Self { + pub fn new(accounts: &Arc, dispatcher: D) -> Self { SigningUnsafeClient { - client: Arc::downgrade(client), - miner: Arc::downgrade(miner), accounts: Arc::downgrade(accounts), + dispatcher: dispatcher, } } - fn handle(&self, payload: RpcConfirmationPayload, account: DefaultAccount) -> Result { - let client = take_weak!(self.client); - let miner = take_weak!(self.miner); - let accounts = take_weak!(self.accounts); - - let default_account = match account { + fn handle(&self, payload: RpcConfirmationPayload, account: DefaultAccount) -> BoxFuture { + let accounts = take_weakf!(self.accounts); + let default = match account { DefaultAccount::Provided(acc) => acc, DefaultAccount::ForDapp(dapp) => accounts.default_address(dapp).ok().unwrap_or_default(), }; - let payload = dispatch::from_rpc(payload, default_account, &*client, &*miner); - dispatch::execute(&*client, &*miner, &*accounts, payload, dispatch::SignWith::Nothing) + + let dis = self.dispatcher.clone(); + dispatch::from_rpc(payload, default, &dis) + .and_then(move |payload| { + dispatch::execute(dis, &accounts, payload, dispatch::SignWith::Nothing) + }) .map(|v| v.into_value()) + .boxed() } } -impl EthSigning for SigningUnsafeClient where - C: MiningBlockChainClient, - M: MinerService, +impl EthSigning for SigningUnsafeClient { type Metadata = Metadata; fn sign(&self, address: RpcH160, data: RpcBytes) -> BoxFuture { - let result = match self.handle(RpcConfirmationPayload::Signature((address.clone(), data).into()), address.into()) { - Ok(RpcConfirmationResponse::Signature(signature)) => Ok(signature), - Err(e) => Err(e), - e => Err(errors::internal("Unexpected result", e)), - }; - - futures::done(result).boxed() + self.handle(RpcConfirmationPayload::Signature((address.clone(), data).into()), address.into()) + .then(|res| match res { + Ok(RpcConfirmationResponse::Signature(signature)) => Ok(signature), + Err(e) => Err(e), + e => Err(errors::internal("Unexpected result", e)), + }) + .boxed() } fn send_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture { - let result = match self.handle(RpcConfirmationPayload::SendTransaction(request), meta.into()) { - Ok(RpcConfirmationResponse::SendTransaction(hash)) => Ok(hash), - Err(e) => Err(e), - e => Err(errors::internal("Unexpected result", e)), - }; - - futures::done(result).boxed() + self.handle(RpcConfirmationPayload::SendTransaction(request), meta.into()) + .then(|res| match res { + Ok(RpcConfirmationResponse::SendTransaction(hash)) => Ok(hash), + Err(e) => Err(e), + e => Err(errors::internal("Unexpected result", e)), + }) + .boxed() } fn sign_transaction(&self, meta: Metadata, request: RpcTransactionRequest) -> BoxFuture { - let result = match self.handle(RpcConfirmationPayload::SignTransaction(request), meta.into()) { - Ok(RpcConfirmationResponse::SignTransaction(tx)) => Ok(tx), - Err(e) => Err(e), - e => Err(errors::internal("Unexpected result", e)), - }; - - futures::done(result).boxed() + self.handle(RpcConfirmationPayload::SignTransaction(request), meta.into()) + .then(|res| match res { + Ok(RpcConfirmationResponse::SignTransaction(tx)) => Ok(tx), + Err(e) => Err(e), + e => Err(errors::internal("Unexpected result", e)), + }) + .boxed() } } -impl ParitySigning for SigningUnsafeClient where - C: MiningBlockChainClient, - M: MinerService, -{ +impl ParitySigning for SigningUnsafeClient { type Metadata = Metadata; fn decrypt_message(&self, address: RpcH160, data: RpcBytes) -> BoxFuture { - let result = match self.handle(RpcConfirmationPayload::Decrypt((address.clone(), data).into()), address.into()) { - Ok(RpcConfirmationResponse::Decrypt(data)) => Ok(data), - Err(e) => Err(e), - e => Err(errors::internal("Unexpected result", e)), - }; - - futures::done(result).boxed() + self.handle(RpcConfirmationPayload::Decrypt((address.clone(), data).into()), address.into()) + .then(|res| match res { + Ok(RpcConfirmationResponse::Decrypt(data)) => Ok(data), + Err(e) => Err(e), + e => Err(errors::internal("Unexpected result", e)), + }) + .boxed() } - fn post_sign(&self, _: RpcH160, _: RpcBytes) -> Result, Error> { + fn post_sign(&self, _: RpcH160, _: RpcBytes) -> BoxFuture, Error> { // We don't support this in non-signer mode. - Err(errors::signer_disabled()) + future::err(errors::signer_disabled()).boxed() } fn post_transaction(&self, _: Metadata, _: RpcTransactionRequest) -> BoxFuture, Error> { // We don't support this in non-signer mode. - futures::done(Err(errors::signer_disabled())).boxed() + future::err((errors::signer_disabled())).boxed() } fn check_request(&self, _: RpcU256) -> Result, Error> { diff --git a/rpc/src/v1/mod.rs b/rpc/src/v1/mod.rs index 0b0787717..c69acbea3 100644 --- a/rpc/src/v1/mod.rs +++ b/rpc/src/v1/mod.rs @@ -18,6 +18,37 @@ //! //! Compliant with ethereum rpc. +// Upgrade a weak pointer, returning an error on failure. +macro_rules! take_weak { + ($weak: expr) => { + match $weak.upgrade() { + Some(arc) => arc, + None => return Err(Error::internal_error()), + } + } +} + +// Upgrade a weak pointer, returning an error leaf-future on failure. +macro_rules! take_weakf { + ($weak: expr) => { + match $weak.upgrade() { + Some(arc) => arc, + None => return ::futures::future::err(Error::internal_error()).boxed(), + } + } +} + +// short for "try_boxfuture" +// unwrap a result, returning a BoxFuture<_, Err> on failure. +macro_rules! try_bf { + ($res: expr) => { + match $res { + Ok(val) => val, + Err(e) => return ::futures::future::err(e.into()).boxed(), + } + } +} + #[macro_use] mod helpers; mod impls; @@ -29,5 +60,5 @@ pub mod types; pub use self::traits::{Web3, Eth, EthFilter, EthSigning, Net, Parity, ParityAccounts, ParitySet, ParitySigning, Signer, Personal, Traces, Rpc}; pub use self::impls::*; -pub use self::helpers::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings, block_import, informant}; +pub use self::helpers::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings, block_import, informant, dispatch}; pub use self::metadata::{Metadata, Origin}; diff --git a/rpc/src/v1/tests/eth.rs b/rpc/src/v1/tests/eth.rs index 4dd178629..3f66427e7 100644 --- a/rpc/src/v1/tests/eth.rs +++ b/rpc/src/v1/tests/eth.rs @@ -33,6 +33,7 @@ use util::{U256, H256, Uint, Address, Hashable}; use jsonrpc_core::IoHandler; use v1::impls::{EthClient, SigningUnsafeClient}; +use v1::helpers::dispatch::FullDispatcher; use v1::metadata::Metadata; use v1::tests::helpers::{TestSnapshotService, TestSyncProvider, Config}; use v1::traits::eth::Eth; @@ -141,10 +142,11 @@ impl EthTester { &external_miner, Default::default(), ); + + let dispatcher = FullDispatcher::new(Arc::downgrade(&client), Arc::downgrade(&miner_service)); let eth_sign = SigningUnsafeClient::new( - &client, &account_provider, - &miner_service + dispatcher, ); let mut handler = IoHandler::default(); diff --git a/rpc/src/v1/tests/mocked/eth.rs b/rpc/src/v1/tests/mocked/eth.rs index 5f0e84389..9e23cf474 100644 --- a/rpc/src/v1/tests/mocked/eth.rs +++ b/rpc/src/v1/tests/mocked/eth.rs @@ -34,6 +34,7 @@ use ethsync::SyncState; use jsonrpc_core::IoHandler; use v1::{Eth, EthClient, EthClientOptions, EthFilter, EthFilterClient, EthSigning, SigningUnsafeClient}; +use v1::helpers::dispatch::FullDispatcher; use v1::tests::helpers::{TestSyncProvider, Config, TestMinerService, TestSnapshotService}; use v1::metadata::Metadata; @@ -88,7 +89,9 @@ impl EthTester { let external_miner = Arc::new(ExternalMiner::new(hashrates.clone())); let eth = EthClient::new(&client, &snapshot, &sync, &ap, &miner, &external_miner, options).to_delegate(); let filter = EthFilterClient::new(&client, &miner).to_delegate(); - let sign = SigningUnsafeClient::new(&client, &ap, &miner).to_delegate(); + + let dispatcher = FullDispatcher::new(Arc::downgrade(&client), Arc::downgrade(&miner)); + let sign = SigningUnsafeClient::new(&ap, dispatcher).to_delegate(); let mut io: IoHandler = IoHandler::default(); io.extend_with(eth); io.extend_with(sign); diff --git a/rpc/src/v1/tests/mocked/personal.rs b/rpc/src/v1/tests/mocked/personal.rs index 3e0228714..af152ad0b 100644 --- a/rpc/src/v1/tests/mocked/personal.rs +++ b/rpc/src/v1/tests/mocked/personal.rs @@ -16,13 +16,16 @@ use std::sync::Arc; use std::str::FromStr; -use jsonrpc_core::IoHandler; -use util::{U256, Uint, Address}; + use ethcore::account_provider::AccountProvider; -use v1::{PersonalClient, Personal, Metadata}; -use v1::tests::helpers::TestMinerService; use ethcore::client::TestBlockChainClient; use ethcore::transaction::{Action, Transaction}; +use jsonrpc_core::IoHandler; +use util::{U256, Uint, Address}; + +use v1::{PersonalClient, Personal, Metadata}; +use v1::helpers::dispatch::FullDispatcher; +use v1::tests::helpers::TestMinerService; struct PersonalTester { accounts: Arc, @@ -50,7 +53,9 @@ fn setup() -> PersonalTester { let accounts = accounts_provider(); let client = blockchain_client(); let miner = miner_service(); - let personal = PersonalClient::new(&accounts, &client, &miner, false); + + let dispatcher = FullDispatcher::new(Arc::downgrade(&client), Arc::downgrade(&miner)); + let personal = PersonalClient::new(&accounts, dispatcher, false); let mut io = IoHandler::default(); io.extend_with(personal.to_delegate()); diff --git a/rpc/src/v1/tests/mocked/signer.rs b/rpc/src/v1/tests/mocked/signer.rs index 9762a8531..4bec863de 100644 --- a/rpc/src/v1/tests/mocked/signer.rs +++ b/rpc/src/v1/tests/mocked/signer.rs @@ -29,6 +29,7 @@ use v1::{SignerClient, Signer}; use v1::metadata::Metadata; use v1::tests::helpers::TestMinerService; use v1::helpers::{SigningQueue, SignerService, FilledTransactionRequest, ConfirmationPayload}; +use v1::helpers::dispatch::FullDispatcher; struct SignerTester { signer: Arc, @@ -59,8 +60,9 @@ fn signer_tester() -> SignerTester { let client = blockchain_client(); let miner = miner_service(); + let dispatcher = FullDispatcher::new(Arc::downgrade(&client), Arc::downgrade(&miner)); let mut io = IoHandler::default(); - io.extend_with(SignerClient::new(&accounts, &client, &miner, &signer).to_delegate()); + io.extend_with(SignerClient::new(&accounts, dispatcher, &signer).to_delegate()); SignerTester { signer: signer, diff --git a/rpc/src/v1/tests/mocked/signing.rs b/rpc/src/v1/tests/mocked/signing.rs index c70ef6d3f..e50682675 100644 --- a/rpc/src/v1/tests/mocked/signing.rs +++ b/rpc/src/v1/tests/mocked/signing.rs @@ -16,13 +16,14 @@ use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; use rlp; use jsonrpc_core::{IoHandler, Success}; use v1::impls::SigningQueueClient; use v1::metadata::Metadata; use v1::traits::{EthSigning, ParitySigning, Parity}; -use v1::helpers::{SignerService, SigningQueue}; +use v1::helpers::{SignerService, SigningQueue, FullDispatcher}; use v1::types::ConfirmationResponse; use v1::tests::helpers::TestMinerService; use v1::tests::mocked::parity; @@ -51,9 +52,12 @@ impl Default for SigningTester { let miner = Arc::new(TestMinerService::default()); let accounts = Arc::new(AccountProvider::transient_provider()); let mut io = IoHandler::default(); - let rpc = SigningQueueClient::new(&signer, &client, &miner, &accounts); + + let dispatcher = FullDispatcher::new(Arc::downgrade(&client), Arc::downgrade(&miner)); + + let rpc = SigningQueueClient::new(&signer, dispatcher.clone(), &accounts); io.extend_with(EthSigning::to_delegate(rpc)); - let rpc = SigningQueueClient::new(&signer, &client, &miner, &accounts); + let rpc = SigningQueueClient::new(&signer, dispatcher, &accounts); io.extend_with(ParitySigning::to_delegate(rpc)); SigningTester { @@ -91,9 +95,17 @@ fn should_add_sign_to_queue() { // then let promise = tester.io.handle_request(&request); - assert_eq!(tester.signer.requests().len(), 1); - // respond - tester.signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Signature(0.into()))); + + // the future must be polled at least once before request is queued. + let signer = tester.signer.clone(); + ::std::thread::spawn(move || loop { + if signer.requests().len() == 1 { + // respond + signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Signature(0.into()))); + break + } + ::std::thread::sleep(Duration::from_millis(100)) + }); let res = promise.wait().unwrap(); assert_eq!(res, Some(response.to_owned())); @@ -229,9 +241,17 @@ fn should_add_transaction_to_queue() { // then let promise = tester.io.handle_request(&request); - assert_eq!(tester.signer.requests().len(), 1); - // respond - tester.signer.request_confirmed(1.into(), Ok(ConfirmationResponse::SendTransaction(0.into()))); + + // the future must be polled at least once before request is queued. + let signer = tester.signer.clone(); + ::std::thread::spawn(move || loop { + if signer.requests().len() == 1 { + // respond + signer.request_confirmed(1.into(), Ok(ConfirmationResponse::SendTransaction(0.into()))); + break + } + ::std::thread::sleep(Duration::from_millis(100)) + }); let res = promise.wait().unwrap(); assert_eq!(res, Some(response.to_owned())); @@ -296,9 +316,17 @@ fn should_add_sign_transaction_to_the_queue() { // then tester.miner.last_nonces.write().insert(address.clone(), U256::zero()); let promise = tester.io.handle_request(&request); - assert_eq!(tester.signer.requests().len(), 1); - // respond - tester.signer.request_confirmed(1.into(), Ok(ConfirmationResponse::SignTransaction(t.into()))); + + // the future must be polled at least once before request is queued. + let signer = tester.signer.clone(); + ::std::thread::spawn(move || loop { + if signer.requests().len() == 1 { + // respond + signer.request_confirmed(1.into(), Ok(ConfirmationResponse::SignTransaction(t.into()))); + break + } + ::std::thread::sleep(Duration::from_millis(100)) + }); let res = promise.wait().unwrap(); assert_eq!(res, Some(response.to_owned())); @@ -391,12 +419,22 @@ fn should_add_decryption_to_the_queue() { }"#; let response = r#"{"jsonrpc":"2.0","result":"0x0102","id":1}"#; + // then let promise = tester.io.handle_request(&request); - assert_eq!(tester.signer.requests().len(), 1); - // respond - tester.signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Decrypt(vec![0x1, 0x2].into()))); + // the future must be polled at least once before request is queued. + let signer = tester.signer.clone(); + ::std::thread::spawn(move || loop { + if signer.requests().len() == 1 { + // respond + signer.request_confirmed(1.into(), Ok(ConfirmationResponse::Decrypt(vec![0x1, 0x2].into()))); + break + } + ::std::thread::sleep(Duration::from_millis(100)) + }); + + // check response: will deadlock if unsuccessful. let res = promise.wait().unwrap(); assert_eq!(res, Some(response.to_owned())); } diff --git a/rpc/src/v1/traits/eth.rs b/rpc/src/v1/traits/eth.rs index 877a18a1d..a50188bf0 100644 --- a/rpc/src/v1/traits/eth.rs +++ b/rpc/src/v1/traits/eth.rs @@ -62,44 +62,44 @@ build_rpc_trait! { fn block_number(&self) -> Result; /// Returns balance of the given account. - #[rpc(name = "eth_getBalance")] - fn balance(&self, H160, Trailing) -> Result; + #[rpc(async, name = "eth_getBalance")] + fn balance(&self, H160, Trailing) -> BoxFuture; /// Returns content of the storage at given address. - #[rpc(name = "eth_getStorageAt")] - fn storage_at(&self, H160, U256, Trailing) -> Result; + #[rpc(async, name = "eth_getStorageAt")] + fn storage_at(&self, H160, U256, Trailing) -> BoxFuture; /// Returns block with given hash. - #[rpc(name = "eth_getBlockByHash")] - fn block_by_hash(&self, H256, bool) -> Result, Error>; + #[rpc(async, name = "eth_getBlockByHash")] + fn block_by_hash(&self, H256, bool) -> BoxFuture, Error>; /// Returns block with given number. - #[rpc(name = "eth_getBlockByNumber")] - fn block_by_number(&self, BlockNumber, bool) -> Result, Error>; + #[rpc(async, name = "eth_getBlockByNumber")] + fn block_by_number(&self, BlockNumber, bool) -> BoxFuture, Error>; /// Returns the number of transactions sent from given address at given time (block number). - #[rpc(name = "eth_getTransactionCount")] - fn transaction_count(&self, H160, Trailing) -> Result; + #[rpc(async, name = "eth_getTransactionCount")] + fn transaction_count(&self, H160, Trailing) -> BoxFuture; /// Returns the number of transactions in a block with given hash. - #[rpc(name = "eth_getBlockTransactionCountByHash")] - fn block_transaction_count_by_hash(&self, H256) -> Result, Error>; + #[rpc(async, name = "eth_getBlockTransactionCountByHash")] + fn block_transaction_count_by_hash(&self, H256) -> BoxFuture, Error>; /// Returns the number of transactions in a block with given block number. - #[rpc(name = "eth_getBlockTransactionCountByNumber")] - fn block_transaction_count_by_number(&self, BlockNumber) -> Result, Error>; + #[rpc(async, name = "eth_getBlockTransactionCountByNumber")] + fn block_transaction_count_by_number(&self, BlockNumber) -> BoxFuture, Error>; /// Returns the number of uncles in a block with given hash. - #[rpc(name = "eth_getUncleCountByBlockHash")] - fn block_uncles_count_by_hash(&self, H256) -> Result, Error>; + #[rpc(async, name = "eth_getUncleCountByBlockHash")] + fn block_uncles_count_by_hash(&self, H256) -> BoxFuture, Error>; /// Returns the number of uncles in a block with given block number. - #[rpc(name = "eth_getUncleCountByBlockNumber")] - fn block_uncles_count_by_number(&self, BlockNumber) -> Result, Error>; + #[rpc(async, name = "eth_getUncleCountByBlockNumber")] + fn block_uncles_count_by_number(&self, BlockNumber) -> BoxFuture, Error>; /// Returns the code at given address at given time (block number). - #[rpc(name = "eth_getCode")] - fn code_at(&self, H160, Trailing) -> Result; + #[rpc(async, name = "eth_getCode")] + fn code_at(&self, H160, Trailing) -> BoxFuture; /// Sends signed transaction, returning its hash. #[rpc(name = "eth_sendRawTransaction")] diff --git a/rpc/src/v1/traits/parity_signing.rs b/rpc/src/v1/traits/parity_signing.rs index 9e723e4bf..3370bc259 100644 --- a/rpc/src/v1/traits/parity_signing.rs +++ b/rpc/src/v1/traits/parity_signing.rs @@ -27,8 +27,8 @@ build_rpc_trait! { /// Posts sign request asynchronously. /// Will return a confirmation ID for later use with check_transaction. - #[rpc(name = "parity_postSign")] - fn post_sign(&self, H160, Bytes) -> Result, Error>; + #[rpc(async, name = "parity_postSign")] + fn post_sign(&self, H160, Bytes) -> BoxFuture, Error>; /// Posts transaction asynchronously. /// Will return a transaction ID for later use with check_transaction. diff --git a/rpc/src/v1/traits/signer.rs b/rpc/src/v1/traits/signer.rs index e4f2f0419..1b03f6d8a 100644 --- a/rpc/src/v1/traits/signer.rs +++ b/rpc/src/v1/traits/signer.rs @@ -16,6 +16,7 @@ //! Parity Signer-related rpc interface. use jsonrpc_core::Error; +use futures::BoxFuture; use v1::types::{U256, Bytes, TransactionModification, ConfirmationRequest, ConfirmationResponse, ConfirmationResponseWithToken}; @@ -28,12 +29,12 @@ build_rpc_trait! { fn requests_to_confirm(&self) -> Result, Error>; /// Confirm specific request. - #[rpc(name = "signer_confirmRequest")] - fn confirm_request(&self, U256, TransactionModification, String) -> Result; + #[rpc(async, name = "signer_confirmRequest")] + fn confirm_request(&self, U256, TransactionModification, String) -> BoxFuture; /// Confirm specific request with token. - #[rpc(name = "signer_confirmRequestWithToken")] - fn confirm_request_with_token(&self, U256, TransactionModification, String) -> Result; + #[rpc(async, name = "signer_confirmRequestWithToken")] + fn confirm_request_with_token(&self, U256, TransactionModification, String) -> BoxFuture; /// Confirm specific request with already signed data. #[rpc(name = "signer_confirmRequestRaw")] diff --git a/rpc/src/v1/types/block.rs b/rpc/src/v1/types/block.rs index d9848a0ac..7905dc5e5 100644 --- a/rpc/src/v1/types/block.rs +++ b/rpc/src/v1/types/block.rs @@ -83,7 +83,7 @@ pub struct Block { pub difficulty: U256, /// Total difficulty #[serde(rename="totalDifficulty")] - pub total_difficulty: U256, + pub total_difficulty: Option, /// Seal fields #[serde(rename="sealFields")] pub seal_fields: Vec, @@ -164,7 +164,7 @@ mod tests { logs_bloom: H2048::default(), timestamp: U256::default(), difficulty: U256::default(), - total_difficulty: U256::default(), + total_difficulty: Some(U256::default()), seal_fields: vec![Bytes::default(), Bytes::default()], uncles: vec![], transactions: BlockTransactions::Hashes(vec![].into()), diff --git a/sync/src/api.rs b/sync/src/api.rs index 2b024443c..5b97bc566 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -43,7 +43,7 @@ pub const WARP_SYNC_PROTOCOL_ID: ProtocolId = *b"par"; /// Ethereum sync protocol pub const ETH_PROTOCOL: ProtocolId = *b"eth"; /// Ethereum light protocol -pub const LES_PROTOCOL: ProtocolId = *b"les"; +pub const LIGHT_PROTOCOL: ProtocolId = *b"plp"; /// Sync configuration #[derive(Debug, Clone, Copy)] @@ -56,7 +56,7 @@ pub struct SyncConfig { pub network_id: u64, /// Main "eth" subprotocol name. pub subprotocol_name: [u8; 3], - /// Light "les" subprotocol name. + /// Light subprotocol name. pub light_subprotocol_name: [u8; 3], /// Fork block to check pub fork_block: Option<(BlockNumber, H256)>, @@ -73,7 +73,7 @@ impl Default for SyncConfig { download_old_blocks: true, network_id: 1, subprotocol_name: ETH_PROTOCOL, - light_subprotocol_name: LES_PROTOCOL, + light_subprotocol_name: LIGHT_PROTOCOL, fork_block: None, warp_sync: false, serve_light: false, @@ -674,6 +674,16 @@ impl LightSync { subprotocol_name: params.subprotocol_name, }) } + + /// Execute a closure with a protocol context. + pub fn with_context(&self, f: F) -> Option + where F: FnOnce(&::light::net::BasicContext) -> T + { + self.network.with_context_eval( + self.subprotocol_name, + move |ctx| self.proto.with_context(ctx, f), + ) + } } impl ManageNetwork for LightSync { diff --git a/sync/src/light_sync/tests/mod.rs b/sync/src/light_sync/tests/mod.rs index fbce48627..1653d09d3 100644 --- a/sync/src/light_sync/tests/mod.rs +++ b/sync/src/light_sync/tests/mod.rs @@ -28,7 +28,7 @@ fn basic_sync() { net.sync(); - assert!(net.peer(0).light_chain().get_header(BlockId::Number(6000)).is_some()); + assert!(net.peer(0).light_chain().block_header(BlockId::Number(6000)).is_some()); } #[test] @@ -49,15 +49,15 @@ fn fork_post_cht() { let _ = light_chain.import_header(header); light_chain.flush_queue(); light_chain.import_verified(); - assert!(light_chain.get_header(id).is_some()); + assert!(light_chain.block_header(id).is_some()); } net.sync(); for id in (0..CHAIN_LENGTH).map(|x| x + 1).map(BlockId::Number) { assert_eq!( - net.peer(0).light_chain().get_header(id), - net.peer(2).chain().block_header(id).map(|h| h.into_inner()) + net.peer(0).light_chain().block_header(id).unwrap(), + net.peer(2).chain().block_header(id).unwrap() ); } } diff --git a/util/network/src/host.rs b/util/network/src/host.rs index 98efbd6a0..c3f5ab8cf 100644 --- a/util/network/src/host.rs +++ b/util/network/src/host.rs @@ -983,14 +983,14 @@ impl Host { self.nodes.write().update(node_changes, &*self.reserved_nodes.read()); } - pub fn with_context(&self, protocol: ProtocolId, io: &IoContext, action: F) where F: Fn(&NetworkContext) { + pub fn with_context(&self, protocol: ProtocolId, io: &IoContext, action: F) where F: FnOnce(&NetworkContext) { let reserved = { self.reserved_nodes.read() }; let context = NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved); action(&context); } - pub fn with_context_eval(&self, protocol: ProtocolId, io: &IoContext, action: F) -> T where F: Fn(&NetworkContext) -> T { + pub fn with_context_eval(&self, protocol: ProtocolId, io: &IoContext, action: F) -> T where F: FnOnce(&NetworkContext) -> T { let reserved = { self.reserved_nodes.read() }; let context = NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved); diff --git a/util/network/src/service.rs b/util/network/src/service.rs index 73b5dafb5..76e21c7f8 100644 --- a/util/network/src/service.rs +++ b/util/network/src/service.rs @@ -177,7 +177,7 @@ impl NetworkService { } /// Executes action in the network context - pub fn with_context(&self, protocol: ProtocolId, action: F) where F: Fn(&NetworkContext) { + pub fn with_context(&self, protocol: ProtocolId, action: F) where F: FnOnce(&NetworkContext) { let io = IoContext::new(self.io_service.channel(), 0); let host = self.host.read(); if let Some(ref host) = host.as_ref() { @@ -186,7 +186,7 @@ impl NetworkService { } /// Evaluates function in the network context - pub fn with_context_eval(&self, protocol: ProtocolId, action: F) -> Option where F: Fn(&NetworkContext) -> T { + pub fn with_context_eval(&self, protocol: ProtocolId, action: F) -> Option where F: FnOnce(&NetworkContext) -> T { let io = IoContext::new(self.io_service.channel(), 0); let host = self.host.read(); host.as_ref().map(|ref host| host.with_context_eval(protocol, &io, action))