Fixpending (#1074)

* Fix --geth IPC for MacOS.

* fix pending_* methods in MinerService, add pending_receipts

* pending logs

* include pending logs when polling

* fixed returning pending logs multiple timees

* log type

* transactionHash is supplied to pending logs

* miner returns receipts together with hashes

* bring back miners all_transactions used by sync module
This commit is contained in:
Marek Kotewicz 2016-05-24 21:56:32 +02:00 committed by Gav Wood
parent fba5082b00
commit ebd0cdbc7a
12 changed files with 191 additions and 46 deletions

View File

@ -60,9 +60,11 @@ pub use transaction_queue::{TransactionQueue, AccountDetails, TransactionImportR
pub use miner::{Miner};
pub use external::{ExternalMiner, ExternalMinerService};
use std::collections::BTreeMap;
use util::{H256, U256, Address, Bytes};
use ethcore::client::{BlockChainClient, Executed};
use ethcore::block::{ClosedBlock};
use ethcore::receipt::{Receipt};
use ethcore::error::{Error, ExecutionError};
use ethcore::transaction::SignedTransaction;
@ -134,9 +136,15 @@ pub trait MinerService : Send + Sync {
/// Query pending transactions for hash.
fn transaction(&self, hash: &H256) -> Option<SignedTransaction>;
/// Get a list of all transactions.
fn all_transactions(&self) -> Vec<SignedTransaction>;
/// Get a list of all pending transactions.
fn pending_transactions(&self) -> Vec<SignedTransaction>;
/// Get a list of all pending receipts.
fn pending_receipts(&self) -> BTreeMap<H256, Receipt>;
/// Returns highest transaction nonce for given address.
fn last_nonce(&self, address: &Address) -> Option<U256>;

View File

@ -25,6 +25,7 @@ use ethcore::block::{ClosedBlock, IsBlock};
use ethcore::error::*;
use ethcore::client::{Executive, Executed, EnvInfo, TransactOptions};
use ethcore::transaction::SignedTransaction;
use ethcore::receipt::{Receipt};
use ethcore::spec::Spec;
use ethcore::engine::Engine;
use super::{MinerService, MinerStatus, TransactionQueue, AccountDetails, TransactionImportResult, TransactionOrigin};
@ -407,20 +408,56 @@ impl MinerService for Miner {
}
fn pending_transactions_hashes(&self) -> Vec<H256> {
let transaction_queue = self.transaction_queue.lock().unwrap();
transaction_queue.pending_hashes()
match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.lock().unwrap().peek_last_ref()) {
(true, Some(pending)) => pending.transactions().iter().map(|t| t.hash()).collect(),
_ => {
let queue = self.transaction_queue.lock().unwrap();
queue.pending_hashes()
}
}
}
fn transaction(&self, hash: &H256) -> Option<SignedTransaction> {
match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.lock().unwrap().peek_last_ref()) {
(true, Some(pending)) => pending.transactions().iter().find(|t| &t.hash() == hash).map(|t| t.clone()),
_ => {
let queue = self.transaction_queue.lock().unwrap();
queue.find(hash)
}
}
}
fn pending_transactions(&self) -> Vec<SignedTransaction> {
fn all_transactions(&self) -> Vec<SignedTransaction> {
let queue = self.transaction_queue.lock().unwrap();
queue.top_transactions()
}
fn pending_transactions(&self) -> Vec<SignedTransaction> {
// TODO: should only use the sealing_work when it's current (it could be an old block)
match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.lock().unwrap().peek_last_ref()) {
(true, Some(pending)) => pending.transactions().clone(),
_ => {
let queue = self.transaction_queue.lock().unwrap();
queue.top_transactions()
}
}
}
fn pending_receipts(&self) -> BTreeMap<H256, Receipt> {
match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.lock().unwrap().peek_last_ref()) {
(true, Some(pending)) => {
let hashes = pending.transactions()
.iter()
.map(|t| t.hash());
let receipts = pending.receipts().clone().into_iter();
hashes.zip(receipts).collect()
},
_ => BTreeMap::new()
}
}
fn last_nonce(&self, address: &Address) -> Option<U256> {
self.transaction_queue.lock().unwrap().last_nonce(address)
}

View File

@ -1,13 +1,18 @@
//! Helper type with all filter possibilities.
//! Helper type with all filter state data.
use std::collections::HashSet;
use util::hash::H256;
use ethcore::filter::Filter;
use v1::types::{Filter, Log};
pub type BlockNumber = u64;
/// Filter state.
#[derive(Clone)]
pub enum PollFilter {
/// Number of last block which client was notified about.
Block(BlockNumber),
/// Hashes of all transactions which client was notified about.
PendingTransaction(Vec<H256>),
Logs(BlockNumber, Filter)
/// Number of From block number, pending logs and log filter iself.
Logs(BlockNumber, HashSet<Log>, Filter)
}

View File

@ -33,6 +33,8 @@ use ethcore::block::IsBlock;
use ethcore::views::*;
use ethcore::ethereum::Ethash;
use ethcore::transaction::{Transaction as EthTransaction, SignedTransaction, Action};
use ethcore::log_entry::LogEntry;
use ethcore::filter::Filter as EthcoreFilter;
use self::ethash::SeedHashCompute;
use v1::traits::{Eth, EthFilter};
use v1::types::{Block, BlockTransactions, BlockNumber, Bytes, SyncStatus, SyncInfo, Transaction, TransactionRequest, CallRequest, OptionalValue, Index, Filter, Log, Receipt};
@ -236,6 +238,25 @@ fn from_params_default_third<F1, F2>(params: Params) -> Result<(F1, F2, BlockNum
}
}
fn pending_logs<M>(miner: &M, filter: &EthcoreFilter) -> Vec<Log> where M: MinerService {
let receipts = miner.pending_receipts();
let pending_logs = receipts.into_iter()
.flat_map(|(hash, r)| r.logs.into_iter().map(|l| (hash.clone(), l)).collect::<Vec<(H256, LogEntry)>>())
.collect::<Vec<(H256, LogEntry)>>();
let result = pending_logs.into_iter()
.filter(|pair| filter.matches(&pair.1))
.map(|pair| {
let mut log = Log::from(pair.1);
log.transaction_hash = Some(pair.0);
log
})
.collect();
result
}
impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM> where
C: BlockChainClient + 'static,
S: SyncProvider + 'static,
@ -447,10 +468,18 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM> where
fn logs(&self, params: Params) -> Result<Value, Error> {
from_params::<(Filter,)>(params)
.and_then(|(filter,)| {
let logs = take_weak!(self.client).logs(filter.into())
let include_pending = filter.to_block == Some(BlockNumber::Pending);
let filter: EthcoreFilter = filter.into();
let mut logs = take_weak!(self.client).logs(filter.clone())
.into_iter()
.map(From::from)
.collect::<Vec<Log>>();
if include_pending {
let pending = pending_logs(take_weak!(self.miner).deref(), &filter);
logs.extend(pending);
}
to_value(&logs)
})
}
@ -593,7 +622,7 @@ impl<C, M> EthFilter for EthFilterClient<C, M> where
.and_then(|(filter,)| {
let mut polls = self.polls.lock().unwrap();
let block_number = take_weak!(self.client).chain_info().best_block_number;
let id = polls.create_poll(PollFilter::Logs(block_number, filter.into()));
let id = polls.create_poll(PollFilter::Logs(block_number, Default::default(), filter));
to_value(&U256::from(id))
})
}
@ -656,18 +685,44 @@ impl<C, M> EthFilter for EthFilterClient<C, M> where
to_value(&diff)
},
PollFilter::Logs(ref mut block_number, ref filter) => {
let mut filter = filter.clone();
PollFilter::Logs(ref mut block_number, ref mut previous_logs, ref filter) => {
// retrive the current block number
let current_number = client.chain_info().best_block_number;
// check if we need to check pending hashes
let include_pending = filter.to_block == Some(BlockNumber::Pending);
// build appropriate filter
let mut filter: EthcoreFilter = filter.clone().into();
filter.from_block = BlockID::Number(*block_number);
filter.to_block = BlockID::Latest;
let logs = client.logs(filter)
// retrieve logs in range from_block..min(BlockID::Latest..to_block)
let mut logs = client.logs(filter.clone())
.into_iter()
.map(From::from)
.collect::<Vec<Log>>();
let current_number = client.chain_info().best_block_number;
// additionally retrieve pending logs
if include_pending {
let pending_logs = pending_logs(take_weak!(self.miner).deref(), &filter);
// remove logs about which client was already notified about
let new_pending_logs: Vec<_> = pending_logs.iter()
.filter(|p| !previous_logs.contains(p))
.cloned()
.collect();
// save all logs retrieved by client
*previous_logs = pending_logs.into_iter().collect();
// append logs array with new pending logs
logs.extend(new_pending_logs);
}
// save current block number as next from block number
*block_number = current_number;
to_value(&logs)
}
}
@ -680,11 +735,18 @@ impl<C, M> EthFilter for EthFilterClient<C, M> where
.and_then(|(index,)| {
let mut polls = self.polls.lock().unwrap();
match polls.poll(&index.value()) {
Some(&PollFilter::Logs(ref _block_number, ref filter)) => {
let logs = take_weak!(self.client).logs(filter.clone())
Some(&PollFilter::Logs(ref _block_number, ref _previous_log, ref filter)) => {
let include_pending = filter.to_block == Some(BlockNumber::Pending);
let filter: EthcoreFilter = filter.clone().into();
let mut logs = take_weak!(self.client).logs(filter.clone())
.into_iter()
.map(From::from)
.collect::<Vec<Log>>();
if include_pending {
logs.extend(pending_logs(take_weak!(self.miner).deref(), &filter));
}
to_value(&logs)
},
// just empty array

View File

@ -645,7 +645,7 @@ fn rpc_eth_transaction_receipt() {
"params": ["0xb903239f8543d04b5dc1ba6579132b143087c68db1b2168786408fcbce568238"],
"id": 1
}"#;
let response = r#"{"jsonrpc":"2.0","result":{"blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","contractAddress":null,"cumulativeGasUsed":"0x20","gasUsed":"0x10","logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","data":"0x","logIndex":"0x01","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00"}],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00"},"id":1}"#;
let response = r#"{"jsonrpc":"2.0","result":{"blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","contractAddress":null,"cumulativeGasUsed":"0x20","gasUsed":"0x10","logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","data":"0x","logIndex":"0x01","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00","type":"mined"}],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00"},"id":1}"#;
assert_eq!(tester.io.handle_request(request), Some(response.to_owned()));
}

View File

@ -22,6 +22,7 @@ use ethcore::error::{Error, ExecutionError};
use ethcore::client::{BlockChainClient, Executed};
use ethcore::block::{ClosedBlock, IsBlock};
use ethcore::transaction::SignedTransaction;
use ethcore::receipt::Receipt;
use ethminer::{MinerService, MinerStatus, AccountDetails, TransactionImportResult};
/// Test miner service.
@ -32,6 +33,8 @@ pub struct TestMinerService {
pub latest_closed_block: Mutex<Option<ClosedBlock>>,
/// Pre-existed pending transactions
pub pending_transactions: Mutex<HashMap<H256, SignedTransaction>>,
/// Pre-existed pending receipts
pub pending_receipts: Mutex<BTreeMap<H256, Receipt>>,
/// Last nonces.
pub last_nonces: RwLock<HashMap<Address, U256>>,
@ -48,6 +51,7 @@ impl Default for TestMinerService {
imported_transactions: Mutex::new(Vec::new()),
latest_closed_block: Mutex::new(None),
pending_transactions: Mutex::new(HashMap::new()),
pending_receipts: Mutex::new(BTreeMap::new()),
last_nonces: RwLock::new(HashMap::new()),
min_gas_price: RwLock::new(U256::from(20_000_000)),
gas_floor_target: RwLock::new(U256::from(12345)),
@ -161,10 +165,18 @@ impl MinerService for TestMinerService {
self.pending_transactions.lock().unwrap().get(hash).cloned()
}
fn all_transactions(&self) -> Vec<SignedTransaction> {
self.pending_transactions.lock().unwrap().values().cloned().collect()
}
fn pending_transactions(&self) -> Vec<SignedTransaction> {
self.pending_transactions.lock().unwrap().values().cloned().collect()
}
fn pending_receipts(&self) -> BTreeMap<H256, Receipt> {
self.pending_receipts.lock().unwrap().clone()
}
fn last_nonce(&self, address: &Address) -> Option<U256> {
self.last_nonces.read().unwrap().get(address).cloned()
}

View File

@ -19,7 +19,7 @@ use serde::de::Visitor;
use ethcore::client::BlockID;
/// Represents rpc api block number param.
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub enum BlockNumber {
Num(u64),
Latest,

View File

@ -20,7 +20,7 @@ use serde::de::Visitor;
use util::common::FromHex;
/// Wrapper structure around vector of bytes.
#[derive(Debug, PartialEq, Default)]
#[derive(Debug, PartialEq, Eq, Default, Hash, Clone)]
pub struct Bytes(pub Vec<u8>);
impl Bytes {

View File

@ -22,7 +22,7 @@ use v1::types::BlockNumber;
use ethcore::filter::Filter as EthFilter;
use ethcore::client::BlockID;
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub enum VariadicValue<T> where T: Deserialize {
Single(T),
Multiple(Vec<T>),
@ -47,7 +47,7 @@ impl<T> Deserialize for VariadicValue<T> where T: Deserialize {
pub type FilterAddress = VariadicValue<Address>;
pub type Topic = VariadicValue<H256>;
#[derive(Debug, PartialEq, Deserialize)]
#[derive(Debug, PartialEq, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Filter {
#[serde(rename="fromBlock")]

View File

@ -15,24 +15,26 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use util::numbers::*;
use ethcore::log_entry::LocalizedLogEntry;
use ethcore::log_entry::{LocalizedLogEntry, LogEntry};
use v1::types::Bytes;
#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, PartialEq, Eq, Hash, Clone)]
pub struct Log {
pub address: Address,
pub topics: Vec<H256>,
pub data: Bytes,
#[serde(rename="blockHash")]
pub block_hash: H256,
pub block_hash: Option<H256>,
#[serde(rename="blockNumber")]
pub block_number: U256,
pub block_number: Option<U256>,
#[serde(rename="transactionHash")]
pub transaction_hash: H256,
pub transaction_hash: Option<H256>,
#[serde(rename="transactionIndex")]
pub transaction_index: U256,
pub transaction_index: Option<U256>,
#[serde(rename="logIndex")]
pub log_index: U256,
pub log_index: Option<U256>,
#[serde(rename="type")]
pub log_type: String,
}
impl From<LocalizedLogEntry> for Log {
@ -41,11 +43,28 @@ impl From<LocalizedLogEntry> for Log {
address: e.entry.address,
topics: e.entry.topics,
data: Bytes::new(e.entry.data),
block_hash: e.block_hash,
block_number: From::from(e.block_number),
transaction_hash: e.transaction_hash,
transaction_index: From::from(e.transaction_index),
log_index: From::from(e.log_index)
block_hash: Some(e.block_hash),
block_number: Some(From::from(e.block_number)),
transaction_hash: Some(e.transaction_hash),
transaction_index: Some(From::from(e.transaction_index)),
log_index: Some(From::from(e.log_index)),
log_type: "mined".to_owned(),
}
}
}
impl From<LogEntry> for Log {
fn from(e: LogEntry) -> Log {
Log {
address: e.address,
topics: e.topics,
data: Bytes::new(e.data),
block_hash: None,
block_number: None,
transaction_hash: None,
transaction_index: None,
log_index: None,
log_type: "pending".to_owned(),
}
}
}
@ -59,7 +78,7 @@ mod tests {
#[test]
fn log_serialization() {
let s = r#"{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"data":"0x","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00","logIndex":"0x01"}"#;
let s = r#"{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"data":"0x","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00","logIndex":"0x01","type":"mined"}"#;
let log = Log {
address: Address::from_str("33990122638b9132ca29c723bdf037f1a891a70c").unwrap(),
@ -68,11 +87,12 @@ mod tests {
H256::from_str("4861736852656700000000000000000000000000000000000000000000000000").unwrap()
],
data: Bytes::new(vec![]),
block_hash: H256::from_str("ed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5").unwrap(),
block_number: U256::from(0x4510c),
transaction_hash: H256::new(),
transaction_index: U256::zero(),
log_index: U256::one()
block_hash: Some(H256::from_str("ed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5").unwrap()),
block_number: Some(U256::from(0x4510c)),
transaction_hash: Some(H256::new()),
transaction_index: Some(U256::zero()),
log_index: Some(U256::one()),
log_type: "mined".to_owned(),
};
let serialized = serde_json::to_string(&log).unwrap();

View File

@ -62,7 +62,7 @@ mod tests {
#[test]
fn receipt_serialization() {
let s = r#"{"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","cumulativeGasUsed":"0x20","gasUsed":"0x10","contractAddress":null,"logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"data":"0x","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00","logIndex":"0x01"}]}"#;
let s = r#"{"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","cumulativeGasUsed":"0x20","gasUsed":"0x10","contractAddress":null,"logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"data":"0x","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00","logIndex":"0x01","type":"mined"}]}"#;
let receipt = Receipt {
transaction_hash: H256::zero(),
@ -79,11 +79,12 @@ mod tests {
H256::from_str("4861736852656700000000000000000000000000000000000000000000000000").unwrap()
],
data: Bytes::new(vec![]),
block_hash: H256::from_str("ed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5").unwrap(),
block_number: U256::from(0x4510c),
transaction_hash: H256::new(),
transaction_index: U256::zero(),
log_index: U256::one()
block_hash: Some(H256::from_str("ed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5").unwrap()),
block_number: Some(U256::from(0x4510c)),
transaction_hash: Some(H256::new()),
transaction_index: Some(U256::zero()),
log_index: Some(U256::one()),
log_type: "mined".to_owned(),
}]
};

View File

@ -1225,7 +1225,7 @@ impl ChainSync {
return 0;
}
let mut transactions = self.miner.pending_transactions();
let mut transactions = self.miner.all_transactions();
if transactions.is_empty() {
return 0;
}