diff --git a/miner/src/lib.rs b/miner/src/lib.rs index f92e0de52..211c61b1f 100644 --- a/miner/src/lib.rs +++ b/miner/src/lib.rs @@ -60,9 +60,11 @@ pub use transaction_queue::{TransactionQueue, AccountDetails, TransactionImportR pub use miner::{Miner}; pub use external::{ExternalMiner, ExternalMinerService}; +use std::collections::BTreeMap; use util::{H256, U256, Address, Bytes}; use ethcore::client::{BlockChainClient, Executed}; use ethcore::block::{ClosedBlock}; +use ethcore::receipt::{Receipt}; use ethcore::error::{Error, ExecutionError}; use ethcore::transaction::SignedTransaction; @@ -134,9 +136,15 @@ pub trait MinerService : Send + Sync { /// Query pending transactions for hash. fn transaction(&self, hash: &H256) -> Option; + /// Get a list of all transactions. + fn all_transactions(&self) -> Vec; + /// Get a list of all pending transactions. fn pending_transactions(&self) -> Vec; + /// Get a list of all pending receipts. + fn pending_receipts(&self) -> BTreeMap; + /// Returns highest transaction nonce for given address. fn last_nonce(&self, address: &Address) -> Option; diff --git a/miner/src/miner.rs b/miner/src/miner.rs index 86f3880c0..2cd2727a5 100644 --- a/miner/src/miner.rs +++ b/miner/src/miner.rs @@ -25,6 +25,7 @@ use ethcore::block::{ClosedBlock, IsBlock}; use ethcore::error::*; use ethcore::client::{Executive, Executed, EnvInfo, TransactOptions}; use ethcore::transaction::SignedTransaction; +use ethcore::receipt::{Receipt}; use ethcore::spec::Spec; use ethcore::engine::Engine; use super::{MinerService, MinerStatus, TransactionQueue, AccountDetails, TransactionImportResult, TransactionOrigin}; @@ -407,18 +408,54 @@ impl MinerService for Miner { } fn pending_transactions_hashes(&self) -> Vec { - let transaction_queue = self.transaction_queue.lock().unwrap(); - transaction_queue.pending_hashes() + match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.lock().unwrap().peek_last_ref()) { + (true, Some(pending)) => pending.transactions().iter().map(|t| t.hash()).collect(), + _ => { + let queue = self.transaction_queue.lock().unwrap(); + queue.pending_hashes() + } + } } fn transaction(&self, hash: &H256) -> Option { + match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.lock().unwrap().peek_last_ref()) { + (true, Some(pending)) => pending.transactions().iter().find(|t| &t.hash() == hash).map(|t| t.clone()), + _ => { + let queue = self.transaction_queue.lock().unwrap(); + queue.find(hash) + } + } + } + + fn all_transactions(&self) -> Vec { let queue = self.transaction_queue.lock().unwrap(); - queue.find(hash) + queue.top_transactions() } fn pending_transactions(&self) -> Vec { - let queue = self.transaction_queue.lock().unwrap(); - queue.top_transactions() + // TODO: should only use the sealing_work when it's current (it could be an old block) + match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.lock().unwrap().peek_last_ref()) { + (true, Some(pending)) => pending.transactions().clone(), + _ => { + let queue = self.transaction_queue.lock().unwrap(); + queue.top_transactions() + } + } + } + + fn pending_receipts(&self) -> BTreeMap { + match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.lock().unwrap().peek_last_ref()) { + (true, Some(pending)) => { + let hashes = pending.transactions() + .iter() + .map(|t| t.hash()); + + let receipts = pending.receipts().clone().into_iter(); + + hashes.zip(receipts).collect() + }, + _ => BTreeMap::new() + } } fn last_nonce(&self, address: &Address) -> Option { diff --git a/rpc/src/v1/helpers/poll_filter.rs b/rpc/src/v1/helpers/poll_filter.rs index f9ed6230c..31bbf47fe 100644 --- a/rpc/src/v1/helpers/poll_filter.rs +++ b/rpc/src/v1/helpers/poll_filter.rs @@ -1,13 +1,18 @@ -//! Helper type with all filter possibilities. +//! Helper type with all filter state data. +use std::collections::HashSet; use util::hash::H256; -use ethcore::filter::Filter; +use v1::types::{Filter, Log}; pub type BlockNumber = u64; +/// Filter state. #[derive(Clone)] pub enum PollFilter { + /// Number of last block which client was notified about. Block(BlockNumber), + /// Hashes of all transactions which client was notified about. PendingTransaction(Vec), - Logs(BlockNumber, Filter) + /// Number of From block number, pending logs and log filter iself. + Logs(BlockNumber, HashSet, Filter) } diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index f6543ec04..4a4ee1643 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -33,6 +33,8 @@ use ethcore::block::IsBlock; use ethcore::views::*; use ethcore::ethereum::Ethash; use ethcore::transaction::{Transaction as EthTransaction, SignedTransaction, Action}; +use ethcore::log_entry::LogEntry; +use ethcore::filter::Filter as EthcoreFilter; use self::ethash::SeedHashCompute; use v1::traits::{Eth, EthFilter}; use v1::types::{Block, BlockTransactions, BlockNumber, Bytes, SyncStatus, SyncInfo, Transaction, TransactionRequest, CallRequest, OptionalValue, Index, Filter, Log, Receipt}; @@ -236,6 +238,25 @@ fn from_params_default_third(params: Params) -> Result<(F1, F2, BlockNum } } +fn pending_logs(miner: &M, filter: &EthcoreFilter) -> Vec where M: MinerService { + let receipts = miner.pending_receipts(); + + let pending_logs = receipts.into_iter() + .flat_map(|(hash, r)| r.logs.into_iter().map(|l| (hash.clone(), l)).collect::>()) + .collect::>(); + + let result = pending_logs.into_iter() + .filter(|pair| filter.matches(&pair.1)) + .map(|pair| { + let mut log = Log::from(pair.1); + log.transaction_hash = Some(pair.0); + log + }) + .collect(); + + result +} + impl Eth for EthClient where C: BlockChainClient + 'static, S: SyncProvider + 'static, @@ -447,10 +468,18 @@ impl Eth for EthClient where fn logs(&self, params: Params) -> Result { from_params::<(Filter,)>(params) .and_then(|(filter,)| { - let logs = take_weak!(self.client).logs(filter.into()) + let include_pending = filter.to_block == Some(BlockNumber::Pending); + let filter: EthcoreFilter = filter.into(); + let mut logs = take_weak!(self.client).logs(filter.clone()) .into_iter() .map(From::from) .collect::>(); + + if include_pending { + let pending = pending_logs(take_weak!(self.miner).deref(), &filter); + logs.extend(pending); + } + to_value(&logs) }) } @@ -593,7 +622,7 @@ impl EthFilter for EthFilterClient where .and_then(|(filter,)| { let mut polls = self.polls.lock().unwrap(); let block_number = take_weak!(self.client).chain_info().best_block_number; - let id = polls.create_poll(PollFilter::Logs(block_number, filter.into())); + let id = polls.create_poll(PollFilter::Logs(block_number, Default::default(), filter)); to_value(&U256::from(id)) }) } @@ -656,18 +685,44 @@ impl EthFilter for EthFilterClient where to_value(&diff) }, - PollFilter::Logs(ref mut block_number, ref filter) => { - let mut filter = filter.clone(); + PollFilter::Logs(ref mut block_number, ref mut previous_logs, ref filter) => { + // retrive the current block number + let current_number = client.chain_info().best_block_number; + + // check if we need to check pending hashes + let include_pending = filter.to_block == Some(BlockNumber::Pending); + + // build appropriate filter + let mut filter: EthcoreFilter = filter.clone().into(); filter.from_block = BlockID::Number(*block_number); filter.to_block = BlockID::Latest; - let logs = client.logs(filter) + + // retrieve logs in range from_block..min(BlockID::Latest..to_block) + let mut logs = client.logs(filter.clone()) .into_iter() .map(From::from) .collect::>(); - let current_number = client.chain_info().best_block_number; + // additionally retrieve pending logs + if include_pending { + let pending_logs = pending_logs(take_weak!(self.miner).deref(), &filter); + // remove logs about which client was already notified about + let new_pending_logs: Vec<_> = pending_logs.iter() + .filter(|p| !previous_logs.contains(p)) + .cloned() + .collect(); + + // save all logs retrieved by client + *previous_logs = pending_logs.into_iter().collect(); + + // append logs array with new pending logs + logs.extend(new_pending_logs); + } + + // save current block number as next from block number *block_number = current_number; + to_value(&logs) } } @@ -680,11 +735,18 @@ impl EthFilter for EthFilterClient where .and_then(|(index,)| { let mut polls = self.polls.lock().unwrap(); match polls.poll(&index.value()) { - Some(&PollFilter::Logs(ref _block_number, ref filter)) => { - let logs = take_weak!(self.client).logs(filter.clone()) + Some(&PollFilter::Logs(ref _block_number, ref _previous_log, ref filter)) => { + let include_pending = filter.to_block == Some(BlockNumber::Pending); + let filter: EthcoreFilter = filter.clone().into(); + let mut logs = take_weak!(self.client).logs(filter.clone()) .into_iter() .map(From::from) .collect::>(); + + if include_pending { + logs.extend(pending_logs(take_weak!(self.miner).deref(), &filter)); + } + to_value(&logs) }, // just empty array diff --git a/rpc/src/v1/tests/eth.rs b/rpc/src/v1/tests/eth.rs index cb2911b21..d52fc9f4c 100644 --- a/rpc/src/v1/tests/eth.rs +++ b/rpc/src/v1/tests/eth.rs @@ -645,7 +645,7 @@ fn rpc_eth_transaction_receipt() { "params": ["0xb903239f8543d04b5dc1ba6579132b143087c68db1b2168786408fcbce568238"], "id": 1 }"#; - let response = r#"{"jsonrpc":"2.0","result":{"blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","contractAddress":null,"cumulativeGasUsed":"0x20","gasUsed":"0x10","logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","data":"0x","logIndex":"0x01","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00"}],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00"},"id":1}"#; + let response = r#"{"jsonrpc":"2.0","result":{"blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","contractAddress":null,"cumulativeGasUsed":"0x20","gasUsed":"0x10","logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","data":"0x","logIndex":"0x01","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00","type":"mined"}],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00"},"id":1}"#; assert_eq!(tester.io.handle_request(request), Some(response.to_owned())); } diff --git a/rpc/src/v1/tests/helpers/miner_service.rs b/rpc/src/v1/tests/helpers/miner_service.rs index 1fbe15ca4..5d57d5d61 100644 --- a/rpc/src/v1/tests/helpers/miner_service.rs +++ b/rpc/src/v1/tests/helpers/miner_service.rs @@ -22,6 +22,7 @@ use ethcore::error::{Error, ExecutionError}; use ethcore::client::{BlockChainClient, Executed}; use ethcore::block::{ClosedBlock, IsBlock}; use ethcore::transaction::SignedTransaction; +use ethcore::receipt::Receipt; use ethminer::{MinerService, MinerStatus, AccountDetails, TransactionImportResult}; /// Test miner service. @@ -32,6 +33,8 @@ pub struct TestMinerService { pub latest_closed_block: Mutex>, /// Pre-existed pending transactions pub pending_transactions: Mutex>, + /// Pre-existed pending receipts + pub pending_receipts: Mutex>, /// Last nonces. pub last_nonces: RwLock>, @@ -48,6 +51,7 @@ impl Default for TestMinerService { imported_transactions: Mutex::new(Vec::new()), latest_closed_block: Mutex::new(None), pending_transactions: Mutex::new(HashMap::new()), + pending_receipts: Mutex::new(BTreeMap::new()), last_nonces: RwLock::new(HashMap::new()), min_gas_price: RwLock::new(U256::from(20_000_000)), gas_floor_target: RwLock::new(U256::from(12345)), @@ -161,10 +165,18 @@ impl MinerService for TestMinerService { self.pending_transactions.lock().unwrap().get(hash).cloned() } + fn all_transactions(&self) -> Vec { + self.pending_transactions.lock().unwrap().values().cloned().collect() + } + fn pending_transactions(&self) -> Vec { self.pending_transactions.lock().unwrap().values().cloned().collect() } + fn pending_receipts(&self) -> BTreeMap { + self.pending_receipts.lock().unwrap().clone() + } + fn last_nonce(&self, address: &Address) -> Option { self.last_nonces.read().unwrap().get(address).cloned() } diff --git a/rpc/src/v1/types/block_number.rs b/rpc/src/v1/types/block_number.rs index 6b622b26d..071486afd 100644 --- a/rpc/src/v1/types/block_number.rs +++ b/rpc/src/v1/types/block_number.rs @@ -19,7 +19,7 @@ use serde::de::Visitor; use ethcore::client::BlockID; /// Represents rpc api block number param. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub enum BlockNumber { Num(u64), Latest, diff --git a/rpc/src/v1/types/bytes.rs b/rpc/src/v1/types/bytes.rs index efae15ef6..4febacec9 100644 --- a/rpc/src/v1/types/bytes.rs +++ b/rpc/src/v1/types/bytes.rs @@ -20,7 +20,7 @@ use serde::de::Visitor; use util::common::FromHex; /// Wrapper structure around vector of bytes. -#[derive(Debug, PartialEq, Default)] +#[derive(Debug, PartialEq, Eq, Default, Hash, Clone)] pub struct Bytes(pub Vec); impl Bytes { diff --git a/rpc/src/v1/types/filter.rs b/rpc/src/v1/types/filter.rs index 90967219c..e50ec9c32 100644 --- a/rpc/src/v1/types/filter.rs +++ b/rpc/src/v1/types/filter.rs @@ -22,7 +22,7 @@ use v1::types::BlockNumber; use ethcore::filter::Filter as EthFilter; use ethcore::client::BlockID; -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub enum VariadicValue where T: Deserialize { Single(T), Multiple(Vec), @@ -47,7 +47,7 @@ impl Deserialize for VariadicValue where T: Deserialize { pub type FilterAddress = VariadicValue
; pub type Topic = VariadicValue; -#[derive(Debug, PartialEq, Deserialize)] +#[derive(Debug, PartialEq, Clone, Deserialize)] #[serde(deny_unknown_fields)] pub struct Filter { #[serde(rename="fromBlock")] diff --git a/rpc/src/v1/types/log.rs b/rpc/src/v1/types/log.rs index c8dfb1aec..426ca68f1 100644 --- a/rpc/src/v1/types/log.rs +++ b/rpc/src/v1/types/log.rs @@ -15,24 +15,26 @@ // along with Parity. If not, see . use util::numbers::*; -use ethcore::log_entry::LocalizedLogEntry; +use ethcore::log_entry::{LocalizedLogEntry, LogEntry}; use v1::types::Bytes; -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, PartialEq, Eq, Hash, Clone)] pub struct Log { pub address: Address, pub topics: Vec, pub data: Bytes, #[serde(rename="blockHash")] - pub block_hash: H256, + pub block_hash: Option, #[serde(rename="blockNumber")] - pub block_number: U256, + pub block_number: Option, #[serde(rename="transactionHash")] - pub transaction_hash: H256, + pub transaction_hash: Option, #[serde(rename="transactionIndex")] - pub transaction_index: U256, + pub transaction_index: Option, #[serde(rename="logIndex")] - pub log_index: U256, + pub log_index: Option, + #[serde(rename="type")] + pub log_type: String, } impl From for Log { @@ -41,11 +43,28 @@ impl From for Log { address: e.entry.address, topics: e.entry.topics, data: Bytes::new(e.entry.data), - block_hash: e.block_hash, - block_number: From::from(e.block_number), - transaction_hash: e.transaction_hash, - transaction_index: From::from(e.transaction_index), - log_index: From::from(e.log_index) + block_hash: Some(e.block_hash), + block_number: Some(From::from(e.block_number)), + transaction_hash: Some(e.transaction_hash), + transaction_index: Some(From::from(e.transaction_index)), + log_index: Some(From::from(e.log_index)), + log_type: "mined".to_owned(), + } + } +} + +impl From for Log { + fn from(e: LogEntry) -> Log { + Log { + address: e.address, + topics: e.topics, + data: Bytes::new(e.data), + block_hash: None, + block_number: None, + transaction_hash: None, + transaction_index: None, + log_index: None, + log_type: "pending".to_owned(), } } } @@ -59,7 +78,7 @@ mod tests { #[test] fn log_serialization() { - let s = r#"{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"data":"0x","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00","logIndex":"0x01"}"#; + let s = r#"{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"data":"0x","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00","logIndex":"0x01","type":"mined"}"#; let log = Log { address: Address::from_str("33990122638b9132ca29c723bdf037f1a891a70c").unwrap(), @@ -68,11 +87,12 @@ mod tests { H256::from_str("4861736852656700000000000000000000000000000000000000000000000000").unwrap() ], data: Bytes::new(vec![]), - block_hash: H256::from_str("ed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5").unwrap(), - block_number: U256::from(0x4510c), - transaction_hash: H256::new(), - transaction_index: U256::zero(), - log_index: U256::one() + block_hash: Some(H256::from_str("ed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5").unwrap()), + block_number: Some(U256::from(0x4510c)), + transaction_hash: Some(H256::new()), + transaction_index: Some(U256::zero()), + log_index: Some(U256::one()), + log_type: "mined".to_owned(), }; let serialized = serde_json::to_string(&log).unwrap(); diff --git a/rpc/src/v1/types/receipt.rs b/rpc/src/v1/types/receipt.rs index 4bcfa3eb5..51d914e1a 100644 --- a/rpc/src/v1/types/receipt.rs +++ b/rpc/src/v1/types/receipt.rs @@ -62,7 +62,7 @@ mod tests { #[test] fn receipt_serialization() { - let s = r#"{"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","cumulativeGasUsed":"0x20","gasUsed":"0x10","contractAddress":null,"logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"data":"0x","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00","logIndex":"0x01"}]}"#; + let s = r#"{"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","cumulativeGasUsed":"0x20","gasUsed":"0x10","contractAddress":null,"logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"data":"0x","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00","logIndex":"0x01","type":"mined"}]}"#; let receipt = Receipt { transaction_hash: H256::zero(), @@ -79,11 +79,12 @@ mod tests { H256::from_str("4861736852656700000000000000000000000000000000000000000000000000").unwrap() ], data: Bytes::new(vec![]), - block_hash: H256::from_str("ed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5").unwrap(), - block_number: U256::from(0x4510c), - transaction_hash: H256::new(), - transaction_index: U256::zero(), - log_index: U256::one() + block_hash: Some(H256::from_str("ed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5").unwrap()), + block_number: Some(U256::from(0x4510c)), + transaction_hash: Some(H256::new()), + transaction_index: Some(U256::zero()), + log_index: Some(U256::one()), + log_type: "mined".to_owned(), }] }; diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 68700d5d2..426c4d4a5 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1225,7 +1225,7 @@ impl ChainSync { return 0; } - let mut transactions = self.miner.pending_transactions(); + let mut transactions = self.miner.all_transactions(); if transactions.is_empty() { return 0; }