From 3438cda4324106c164eac3686ff1a60efc7a3372 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 6 Apr 2016 23:03:07 +0200 Subject: [PATCH 1/2] Propagate transaction queue --- miner/src/lib.rs | 5 +++- miner/src/miner.rs | 5 ++++ rpc/src/v1/impls/eth.rs | 8 +++---- rpc/src/v1/tests/helpers/miner_service.rs | 4 ++++ rpc/src/v1/tests/helpers/sync_provider.rs | 5 +--- sync/src/chain.rs | 28 +++++++++-------------- sync/src/lib.rs | 10 +------- 7 files changed, 29 insertions(+), 36 deletions(-) diff --git a/miner/src/lib.rs b/miner/src/lib.rs index 9f757fb67..c6e07a953 100644 --- a/miner/src/lib.rs +++ b/miner/src/lib.rs @@ -105,9 +105,12 @@ pub trait MinerService : Send + Sync { /// Get the sealing work package and if `Some`, apply some transform. fn map_sealing_work(&self, chain: &BlockChainClient, f: F) -> Option where F: FnOnce(&ClosedBlock) -> T; - /// Query pending transactions for hash + /// Query pending transactions for hash. fn transaction(&self, hash: &H256) -> Option; + /// Get a list of all pending transactions. + fn pending_transactions(&self) -> Vec; + /// Returns highest transaction nonce for given address. fn last_nonce(&self, address: &Address) -> Option; diff --git a/miner/src/miner.rs b/miner/src/miner.rs index 0e31c504f..3b4d1f32d 100644 --- a/miner/src/miner.rs +++ b/miner/src/miner.rs @@ -228,6 +228,11 @@ impl MinerService for Miner { queue.find(hash) } + fn pending_transactions(&self) -> Vec { + let queue = self.transaction_queue.lock().unwrap(); + queue.top_transactions() + } + fn last_nonce(&self, address: &Address) -> Option { self.transaction_queue.lock().unwrap().last_nonce(address) } diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 30dc79662..ad4b037df 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -186,7 +186,7 @@ impl EthClient }.fake_sign(from)) } - fn dispatch_transaction(&self, signed_transaction: SignedTransaction, raw_transaction: Vec) -> Result { + fn dispatch_transaction(&self, signed_transaction: SignedTransaction) -> Result { let hash = signed_transaction.hash(); let import = { @@ -203,7 +203,6 @@ impl EthClient match import.into_iter().collect::, _>>() { Ok(_) => { - take_weak!(self.sync).new_transaction(raw_transaction); to_value(&hash) } Err(e) => { @@ -504,8 +503,7 @@ impl Eth for EthClient data: request.data.map_or_else(Vec::new, |d| d.to_vec()), }.sign(&secret) }; - let raw_transaction = encode(&signed_transaction).to_vec(); - self.dispatch_transaction(signed_transaction, raw_transaction) + self.dispatch_transaction(signed_transaction) }, Err(_) => { to_value(&H256::zero()) } } @@ -517,7 +515,7 @@ impl Eth for EthClient .and_then(|(raw_transaction, )| { let raw_transaction = raw_transaction.to_vec(); match UntrustedRlp::new(&raw_transaction).as_val() { - Ok(signed_transaction) => self.dispatch_transaction(signed_transaction, raw_transaction), + Ok(signed_transaction) => self.dispatch_transaction(signed_transaction), Err(_) => to_value(&H256::zero()), } }) diff --git a/rpc/src/v1/tests/helpers/miner_service.rs b/rpc/src/v1/tests/helpers/miner_service.rs index 80a5e356d..815085d3b 100644 --- a/rpc/src/v1/tests/helpers/miner_service.rs +++ b/rpc/src/v1/tests/helpers/miner_service.rs @@ -98,6 +98,10 @@ impl MinerService for TestMinerService { self.pending_transactions.lock().unwrap().get(hash).cloned() } + fn pending_transactions(&self) -> Vec { + self.pending_transactions.lock().unwrap().values().cloned().collect() + } + fn last_nonce(&self, address: &Address) -> Option { self.last_nonces.read().unwrap().get(address).cloned() } diff --git a/rpc/src/v1/tests/helpers/sync_provider.rs b/rpc/src/v1/tests/helpers/sync_provider.rs index 59188f0a7..633e0d45b 100644 --- a/rpc/src/v1/tests/helpers/sync_provider.rs +++ b/rpc/src/v1/tests/helpers/sync_provider.rs @@ -16,7 +16,7 @@ //! Test implementation of SyncProvider. -use util::{U256, Bytes}; +use util::{U256}; use ethsync::{SyncProvider, SyncStatus, SyncState}; use std::sync::{RwLock}; @@ -59,8 +59,5 @@ impl SyncProvider for TestSyncProvider { fn status(&self) -> SyncStatus { self.status.read().unwrap().clone() } - - fn new_transaction(&self, _raw_transaction: Bytes) { - } } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 1a7d11f51..74bcb8d38 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -217,10 +217,6 @@ pub struct ChainSync { network_id: U256, /// Miner miner: Arc, - - /// Transactions to propagate - // TODO: reconsider where this is in the codebase - seems a little dodgy to have here. - transactions_to_send: Vec, } type RlpResponseResult = Result, PacketDecodeError>; @@ -247,7 +243,6 @@ impl ChainSync { max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), network_id: config.network_id, miner: miner, - transactions_to_send: vec![], } } @@ -950,11 +945,6 @@ impl ChainSync { } } - /// Place a new transaction on the wire. - pub fn new_transaction(&mut self, raw_transaction: Bytes) { - self.transactions_to_send.push(raw_transaction); - } - /// Called when peer sends us new transactions fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { // accepting transactions once only fully synced @@ -1296,11 +1286,16 @@ impl ChainSync { return 0; } - let mut packet = RlpStream::new_list(self.transactions_to_send.len()); - for tx in &self.transactions_to_send { - packet.append_raw(tx, 1); + let mut transactions = self.miner.pending_transactions(); + if transactions.is_empty() { + return 0; + } + + let mut packet = RlpStream::new_list(transactions.len()); + let tx_count = transactions.len(); + for tx in transactions.drain(..) { + packet.append(&tx); } - self.transactions_to_send.clear(); let rlp = packet.out(); let lucky_peers = { @@ -1319,13 +1314,12 @@ impl ChainSync { for peer_id in lucky_peers { self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp.clone()); } + trace!(target: "sync", "Sent {} transactions to {} peers.", tx_count, sent); sent } fn propagate_latest_blocks(&mut self, io: &mut SyncIo) { - if !self.transactions_to_send.is_empty() { - self.propagate_new_transactions(io); - } + self.propagate_new_transactions(io); let chain_info = io.chain().chain_info(); if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { let blocks = self.propagate_blocks(&chain_info, io); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index ea4a1daea..a4f6eff38 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -66,7 +66,7 @@ use std::ops::*; use std::sync::*; use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; use util::TimerToken; -use util::{U256, Bytes, ONE_U256}; +use util::{U256, ONE_U256}; use ethcore::client::Client; use ethcore::service::SyncMessage; use ethminer::Miner; @@ -101,9 +101,6 @@ impl Default for SyncConfig { pub trait SyncProvider: Send + Sync { /// Get sync status fn status(&self) -> SyncStatus; - - /// Note that a user has submitted a new transaction. - fn new_transaction(&self, raw_transaction: Bytes); } /// Ethereum network protocol handler @@ -143,11 +140,6 @@ impl SyncProvider for EthSync { fn status(&self) -> SyncStatus { self.sync.read().unwrap().status() } - - /// Note that a user has submitted a new transaction. - fn new_transaction(&self, raw_transaction: Bytes) { - self.sync.write().unwrap().new_transaction(raw_transaction); - } } impl NetworkProtocolHandler for EthSync { From 8074fee28c806857f7b604f3562f1a3e8f6bfecb Mon Sep 17 00:00:00 2001 From: arkpar Date: Thu, 7 Apr 2016 14:24:52 +0200 Subject: [PATCH 2/2] Use new json RPC server --- Cargo.lock | 60 ++++++++++++++++++++++++++++++++++++++++++++++---- parity/main.rs | 20 ++++++++++------- rpc/Cargo.toml | 2 +- rpc/src/lib.rs | 12 +++++----- 4 files changed, 74 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7554e3b52..71b6873f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -246,7 +246,7 @@ dependencies = [ "ethminer 1.1.0", "ethsync 1.1.0", "jsonrpc-core 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-http-server 4.0.0 (git+https://github.com/tomusdrw/jsonrpc-http-server.git)", + "jsonrpc-http-server 5.0.0 (git+https://github.com/debris/jsonrpc-http-server.git)", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", "serde 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -415,6 +415,27 @@ dependencies = [ "url 0.5.7 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "hyper" +version = "0.9.0-mio" +source = "git+https://github.com/hyperium/hyper?branch=mio#d55a70dc56dac1f0f03bc4c3a83db0314d48e69a" +dependencies = [ + "cookie 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "httparse 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "mime 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rotor 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", + "traitobject 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "typeable 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "unicase 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "url 0.5.7 (registry+https://github.com/rust-lang/crates.io-index)", + "vecio 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "igd" version = "0.4.2" @@ -453,10 +474,10 @@ dependencies = [ [[package]] name = "jsonrpc-http-server" -version = "4.0.0" -source = "git+https://github.com/tomusdrw/jsonrpc-http-server.git#46bd4e7cf8352e0efc940cf76d3dff99f1a3da15" +version = "5.0.0" +source = "git+https://github.com/debris/jsonrpc-http-server.git#76fa443982b40665721fe6b1ece42fc0a53be996" dependencies = [ - "hyper 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper 0.9.0-mio (git+https://github.com/hyperium/hyper?branch=mio)", "jsonrpc-core 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "unicase 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -696,6 +717,11 @@ dependencies = [ "syntex_syntax 0.30.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "quick-error" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "quine-mc_cluskey" version = "0.2.2" @@ -745,6 +771,18 @@ dependencies = [ "librocksdb-sys 0.2.3 (git+https://github.com/arkpar/rust-rocksdb.git)", ] +[[package]] +name = "rotor" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "quick-error 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", + "void 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rpassword" version = "0.1.3" @@ -1000,6 +1038,15 @@ dependencies = [ "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "vecio" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "winapi 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", + "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "vergen" version = "0.1.0" @@ -1009,6 +1056,11 @@ dependencies = [ "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "void" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "winapi" version = "0.2.6" diff --git a/parity/main.rs b/parity/main.rs index 011d761ef..192f88132 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -43,7 +43,6 @@ extern crate rpassword; #[cfg(feature = "rpc")] extern crate ethcore_rpc as rpc; -use std::any::Any; use std::io::{BufRead, BufReader}; use std::fs::File; use std::net::{SocketAddr, IpAddr}; @@ -64,6 +63,8 @@ use ethminer::{Miner, MinerService}; use docopt::Docopt; use daemonize::Daemonize; use number_prefix::{binary_prefix, Standalone, Prefixed}; +#[cfg(feature = "rpc")] +use rpc::Server as RpcServer; mod price_info; @@ -271,10 +272,10 @@ fn setup_rpc_server( sync: Arc, secret_store: Arc, miner: Arc, - url: &str, + url: &SocketAddr, cors_domain: &str, apis: Vec<&str> -) -> Box { +) -> RpcServer { use rpc::v1::*; let server = rpc::RpcServer::new(); @@ -292,14 +293,17 @@ fn setup_rpc_server( } } } - let start_result = server.start_http(url, cors_domain, ::num_cpus::get()); + let start_result = server.start_http(url, cors_domain); match start_result { Err(rpc::RpcServerError::IoError(err)) => die_with_io_error(err), Err(e) => die!("{:?}", e), - Ok(handle) => Box::new(handle), + Ok(server) => server, } } +#[cfg(not(feature = "rpc"))] +struct RpcServer; + #[cfg(not(feature = "rpc"))] fn setup_rpc_server( _client: Arc, @@ -601,7 +605,7 @@ impl Configuration { }, self.args.flag_rpcport.unwrap_or(self.args.flag_jsonrpc_port) ); - SocketAddr::from_str(&url).unwrap_or_else(|_| die!("{}: Invalid JSONRPC listen host/port given.", url)); + let addr = SocketAddr::from_str(&url).unwrap_or_else(|_| die!("{}: Invalid JSONRPC listen host/port given.", url)); let cors_domain = self.args.flag_rpccorsdomain.as_ref().unwrap_or(&self.args.flag_jsonrpc_cors); Some(setup_rpc_server( @@ -609,7 +613,7 @@ impl Configuration { sync.clone(), account_service.clone(), miner.clone(), - &url, + &addr, &cors_domain, apis.split(',').collect() )) @@ -631,7 +635,7 @@ impl Configuration { } } -fn wait_for_exit(panic_handler: Arc, _rpc_server: Option>) { +fn wait_for_exit(panic_handler: Arc, _rpc_server: Option) { let exit = Arc::new(Condvar::new()); // Handle possible exits diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 4c62a45c1..183b0fa9f 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -13,7 +13,7 @@ log = "0.3" serde = "0.7.0" serde_json = "0.7.0" jsonrpc-core = "2.0" -jsonrpc-http-server = { git = "https://github.com/tomusdrw/jsonrpc-http-server.git" } +jsonrpc-http-server = { git = "https://github.com/debris/jsonrpc-http-server.git" } ethcore-util = { path = "../util" } ethcore = { path = "../ethcore" } ethash = { path = "../ethash" } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 4de405211..f059750d2 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -33,9 +33,10 @@ extern crate ethminer; extern crate transient_hashmap; use std::sync::Arc; +use std::net::SocketAddr; use self::jsonrpc_core::{IoHandler, IoDelegate}; -pub use jsonrpc_http_server::{Listening, RpcServerError}; +pub use jsonrpc_http_server::{Server, RpcServerError}; pub mod v1; /// Http server. @@ -56,12 +57,9 @@ impl RpcServer { self.handler.add_delegate(delegate); } - /// Start server asynchronously and returns result with `Listening` handle on success or an error. - pub fn start_http(&self, addr: &str, cors_domain: &str, threads: usize) -> Result { - let addr = addr.to_owned(); + /// Start server asynchronously and returns result with `Server` handle on success or an error. + pub fn start_http(&self, addr: &SocketAddr, cors_domain: &str) -> Result { let cors_domain = cors_domain.to_owned(); - let server = jsonrpc_http_server::Server::new(self.handler.clone()); - - server.start(addr.as_ref(), jsonrpc_http_server::AccessControlAllowOrigin::Value(cors_domain), threads) + Server::start(addr, self.handler.clone(), jsonrpc_http_server::AccessControlAllowOrigin::Value(cors_domain)) } }