From c26cfc1c5ab0223ec84ee2b6f8f9e231fe51fa2b Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Tue, 5 Jul 2016 17:50:46 +0200 Subject: [PATCH] Silent running operating modes (#1477) * Command=line options. * Keep alive for the eth protocol. * Wire up final pieces. * No network when dark. * Passive and dark mode work. * Ensure all RPCs keep alive. * Fix tests. * Fix minor bug. * Minor whitespace. * Split out some of the sleep-state. * Fix help text. --- ethcore/src/client/client.rs | 94 ++++++++++++++++++++++++-- ethcore/src/client/config.rs | 20 ++++++ ethcore/src/client/mod.rs | 7 +- parity/cli.rs | 17 ++++- parity/configuration.rs | 13 +++- parity/main.rs | 11 ++- parity/rpc_apis.rs | 2 +- rpc/src/v1/impls/eth.rs | 48 +++++++++++++ rpc/src/v1/impls/eth_filter.rs | 12 ++++ rpc/src/v1/impls/eth_signing.rs | 16 +++++ rpc/src/v1/impls/ethcore.rs | 21 ++++++ rpc/src/v1/impls/ethcore_set.rs | 33 +++++++-- rpc/src/v1/impls/personal.rs | 11 +++ rpc/src/v1/impls/personal_signer.rs | 9 +++ rpc/src/v1/impls/traces.rs | 11 +++ rpc/src/v1/tests/mocked/ethcore_set.rs | 25 +++++-- 16 files changed, 326 insertions(+), 24 deletions(-) diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 6f3249f4c..f1c260970 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -17,7 +17,8 @@ //! Blockchain database client. use std::path::PathBuf; -use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering}; +use std::time::Instant; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering}; use util::*; use util::panics::*; use views::BlockView; @@ -38,7 +39,7 @@ use filter::Filter; use log_entry::LocalizedLogEntry; use block_queue::{BlockQueue, BlockQueueInfo}; use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute}; -use client::{BlockID, TransactionID, UncleID, TraceId, ClientConfig, DatabaseCompactionProfile, +use client::{BlockID, TransactionID, UncleID, TraceId, Mode, ClientConfig, DatabaseCompactionProfile, BlockChainClient, MiningBlockChainClient, TraceFilter, CallAnalytics, TransactionImportError, BlockImportError, TransactionImportResult}; use client::Error as ClientError; @@ -54,6 +55,7 @@ use evm::Factory as EvmFactory; use miner::{Miner, MinerService, AccountDetails}; const MAX_TX_QUEUE_SIZE: usize = 4096; +const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2; impl fmt::Display for BlockChainInfo { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -83,9 +85,24 @@ impl ClientReport { } } +struct SleepState { + last_activity: Option, + last_autosleep: Option, +} + +impl SleepState { + fn new(awake: bool) -> Self { + SleepState { + last_activity: match awake { false => None, true => Some(Instant::now()) }, + last_autosleep: match awake { false => Some(Instant::now()), true => None }, + } + } +} + /// Blockchain database client backed by a persistent database. Owns and manages a blockchain and a block queue. /// Call `import_block()` to import a block asynchronously; `flush_queue()` flushes the queue. pub struct Client { + mode: Mode, chain: Arc, tracedb: Arc>, engine: Arc>, @@ -98,6 +115,8 @@ pub struct Client { vm_factory: Arc, trie_factory: TrieFactory, miner: Arc, + sleep_state: Mutex, + liveness: AtomicBool, io_channel: IoChannel, queue_transactions: AtomicUsize, } @@ -134,9 +153,8 @@ impl Client { spec: Spec, path: &Path, miner: Arc, - message_channel: IoChannel) - -> Result, ClientError> - { + message_channel: IoChannel + ) -> Result, ClientError> { let path = get_db_path(path, config.pruning, spec.genesis_header().hash()); let gb = spec.genesis_block(); let chain = Arc::new(BlockChain::new(config.blockchain, &gb, &path)); @@ -167,7 +185,11 @@ impl Client { let panic_handler = PanicHandler::new_in_arc(); panic_handler.forward_from(&block_queue); + let awake = match config.mode { Mode::Dark(..) => false, _ => true }; let client = Client { + sleep_state: Mutex::new(SleepState::new(awake)), + liveness: AtomicBool::new(awake), + mode: config.mode, chain: chain, tracedb: tracedb, engine: engine, @@ -183,7 +205,6 @@ impl Client { io_channel: message_channel, queue_transactions: AtomicUsize::new(0), }; - Ok(Arc::new(client)) } @@ -449,9 +470,41 @@ impl Client { } /// Tick the client. + // TODO: manage by real events. pub fn tick(&self) { self.chain.collect_garbage(); self.block_queue.collect_garbage(); + + match self.mode { + Mode::Dark(timeout) => { + let mut ss = self.sleep_state.lock().unwrap(); + if let Some(t) = ss.last_activity { + if Instant::now() > t + timeout { + self.sleep(); + ss.last_activity = None; + } + } + } + Mode::Passive(timeout, wakeup_after) => { + let mut ss = self.sleep_state.lock().unwrap(); + let now = Instant::now(); + if let Some(t) = ss.last_activity { + if now > t + timeout { + self.sleep(); + ss.last_activity = None; + ss.last_autosleep = Some(now); + } + } + if let Some(t) = ss.last_autosleep { + if now > t + wakeup_after { + self.wake_up(); + ss.last_activity = Some(now); + ss.last_autosleep = None; + } + } + } + _ => {} + } } /// Set up the cache behaviour. @@ -487,6 +540,29 @@ impl Client { }) } } + + fn wake_up(&self) { + if !self.liveness.load(AtomicOrdering::Relaxed) { + self.liveness.store(true, AtomicOrdering::Relaxed); + self.io_channel.send(NetworkIoMessage::User(SyncMessage::StartNetwork)).unwrap(); + trace!(target: "mode", "wake_up: Waking."); + } + } + + fn sleep(&self) { + if self.liveness.load(AtomicOrdering::Relaxed) { + // only sleep if the import queue is mostly empty. + if self.queue_info().total_queue_size() <= MAX_QUEUE_SIZE_TO_SLEEP_ON { + self.liveness.store(false, AtomicOrdering::Relaxed); + self.io_channel.send(NetworkIoMessage::User(SyncMessage::StopNetwork)).unwrap(); + trace!(target: "mode", "sleep: Sleeping."); + } else { + trace!(target: "mode", "sleep: Cannot sleep - syncing ongoing."); + // TODO: Consider uncommenting. + //*self.last_activity.lock().unwrap() = Some(Instant::now()); + } + } + } } impl BlockChainClient for Client { @@ -528,6 +604,12 @@ impl BlockChainClient for Client { ret } + fn keep_alive(&self) { + if self.mode != Mode::Active { + self.wake_up(); + (*self.sleep_state.lock().unwrap()).last_activity = Some(Instant::now()); + } + } fn block_header(&self, id: BlockID) -> Option { Self::block_hash(&self.chain, id).and_then(|hash| self.chain.block(&hash).map(|bytes| BlockView::new(&bytes).rlp().at(0).as_raw().to_vec())) diff --git a/ethcore/src/client/config.rs b/ethcore/src/client/config.rs index 6cb34c151..1010ce656 100644 --- a/ethcore/src/client/config.rs +++ b/ethcore/src/client/config.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +pub use std::time::Duration; pub use block_queue::BlockQueueConfig; pub use blockchain::Config as BlockChainConfig; pub use trace::{Config as TraceConfig, Switch}; @@ -35,6 +36,23 @@ impl Default for DatabaseCompactionProfile { fn default() -> Self { DatabaseCompactionProfile::Default } } +/// Operating mode for the client. +#[derive(Debug, Eq, PartialEq)] +pub enum Mode { + /// Always on. + Active, + /// Goes offline after RLP is inactive for some (given) time, but + /// comes back online after a while of inactivity. + Passive(Duration, Duration), + /// Goes offline after RLP is inactive for some (given) time and + /// stays inactive. + Dark(Duration), +} + +impl Default for Mode { + fn default() -> Self { Mode::Active } +} + /// Client configuration. Includes configs for all sub-systems. #[derive(Debug, Default)] pub struct ClientConfig { @@ -56,6 +74,8 @@ pub struct ClientConfig { pub db_cache_size: Option, /// State db compaction profile pub db_compaction: DatabaseCompactionProfile, + /// Operating mode + pub mode: Mode, /// Type of block verifier used by client. pub verifier_type: VerifierType, } diff --git a/ethcore/src/client/mod.rs b/ethcore/src/client/mod.rs index c09da450d..b81efe43a 100644 --- a/ethcore/src/client/mod.rs +++ b/ethcore/src/client/mod.rs @@ -23,7 +23,7 @@ mod test_client; mod trace; pub use self::client::*; -pub use self::config::{ClientConfig, DatabaseCompactionProfile, BlockQueueConfig, BlockChainConfig, Switch, VMType}; +pub use self::config::{Mode, ClientConfig, DatabaseCompactionProfile, BlockQueueConfig, BlockChainConfig, Switch, VMType}; pub use self::error::Error; pub use types::ids::*; pub use self::test_client::{TestBlockChainClient, EachBlockWith}; @@ -63,6 +63,11 @@ pub struct CallAnalytics { /// Blockchain database client. Owns and manages a blockchain and a block queue. pub trait BlockChainClient : Sync + Send { + + /// Should be called by any external-facing interface when actively using the client. + /// To minimise chatter, there's no need to call more than once every 30s. + fn keep_alive(&self) {} + /// Get raw block header data by block id. fn block_header(&self, id: BlockID) -> Option; diff --git a/parity/cli.rs b/parity/cli.rs index 17954eb2a..4c5d4cbc9 100644 --- a/parity/cli.rs +++ b/parity/cli.rs @@ -32,7 +32,19 @@ Usage: parity [options] parity ui [options] -Protocol Options: +Operating Options: + --mode MODE Set the operating mode. MODE can be one of: + active - Parity continuously syncs the chain. + passive - Parity syncs initially, then sleeps and + wakes regularly to resync. + dark - Parity syncs only when an external interface + is active. [default: active]. + --mode-timeout SECS Specify the number of seconds before inactivity + timeout occurs when mode is dark or passive + [default: 300]. + --mode-alarm SECS Specify the number of seconds before auto sleep + reawake timeout occurs when mode is passive + [default: 3600]. --chain CHAIN Specify the blockchain type. CHAIN may be either a JSON chain specification file or olympic, frontier, homestead, mainnet, morden, or testnet @@ -269,6 +281,9 @@ pub struct Args { pub arg_pid_file: String, pub arg_file: Option, pub arg_path: Vec, + pub flag_mode: String, + pub flag_mode_timeout: u64, + pub flag_mode_alarm: u64, pub flag_chain: String, pub flag_db_path: String, pub flag_identity: String, diff --git a/parity/configuration.rs b/parity/configuration.rs index 14932423c..0a1a9c0a7 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -28,7 +28,7 @@ use util::*; use util::log::Colour::*; use ethcore::account_provider::AccountProvider; use util::network_settings::NetworkSettings; -use ethcore::client::{append_path, get_db_path, ClientConfig, DatabaseCompactionProfile, Switch, VMType}; +use ethcore::client::{append_path, get_db_path, Mode, ClientConfig, DatabaseCompactionProfile, Switch, VMType}; use ethcore::miner::{MinerOptions, PendingSet}; use ethcore::ethereum; use ethcore::spec::Spec; @@ -61,6 +61,15 @@ impl Configuration { } } + pub fn mode(&self) -> Mode { + match &(self.args.flag_mode[..]) { + "active" => Mode::Active, + "passive" => Mode::Passive(Duration::from_secs(self.args.flag_mode_timeout), Duration::from_secs(self.args.flag_mode_alarm)), + "dark" => Mode::Dark(Duration::from_secs(self.args.flag_mode_timeout)), + _ => die!("{}: Invalid address for --mode. Must be one of active, passive or dark.", self.args.flag_mode), + } + } + fn net_port(&self) -> u16 { self.args.flag_port } @@ -302,6 +311,8 @@ impl Configuration { pub fn client_config(&self, spec: &Spec) -> ClientConfig { let mut client_config = ClientConfig::default(); + client_config.mode = self.mode(); + match self.args.flag_cache { Some(mb) => { client_config.blockchain.max_cache_size = mb * 1024 * 1024; diff --git a/parity/main.rs b/parity/main.rs index b7a106181..0b26054d9 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -82,7 +82,7 @@ use rustc_serialize::hex::FromHex; use ctrlc::CtrlC; use util::{H256, ToPretty, NetworkConfiguration, PayloadInfo, Bytes, UtilError, paint, Colour, version}; use util::panics::{MayPanic, ForwardPanic, PanicHandler}; -use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path, BlockImportError}; +use ethcore::client::{Mode, BlockID, BlockChainClient, ClientConfig, get_db_path, BlockImportError}; use ethcore::error::{ImportError}; use ethcore::service::ClientService; use ethcore::spec::Spec; @@ -213,7 +213,12 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) // Build client let mut service = ClientService::start( - client_config, spec, net_settings, Path::new(&conf.path()), miner.clone(), !conf.args.flag_no_network + client_config, + spec, + net_settings, + Path::new(&conf.path()), + miner.clone(), + match conf.mode() { Mode::Dark(..) => false, _ => !conf.args.flag_no_network } ).unwrap_or_else(|e| die_with_error("Client", e)); panic_handler.forward_from(&service); @@ -282,7 +287,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) }); // Register IO handler - let io_handler = Arc::new(ClientIoHandler { + let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Informant::new(conf.have_color()), sync: sync.clone(), diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index c0daaa926..0187f4058 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -166,7 +166,7 @@ pub fn setup_rpc(server: T, deps: Arc, apis: ApiSet server.add_delegate(EthcoreClient::new(&deps.client, &deps.miner, deps.logger.clone(), deps.settings.clone(), queue).to_delegate()) }, Api::EthcoreSet => { - server.add_delegate(EthcoreSetClient::new(&deps.miner, &deps.net_service).to_delegate()) + server.add_delegate(EthcoreSetClient::new(&deps.client, &deps.miner, &deps.net_service).to_delegate()) }, Api::Traces => { server.add_delegate(TracesClient::new(&deps.client, &deps.miner).to_delegate()) diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index f2325de93..4ccefc0bf 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -241,6 +241,19 @@ fn no_author_err() -> Error { } } +impl EthClient where + C: MiningBlockChainClient + 'static, + S: SyncProvider + 'static, + M: MinerService + 'static, + EM: ExternalMinerService + 'static { + + fn active(&self) -> Result<(), Error> { + // TODO: only call every 30s at most. + take_weak!(self.client).keep_alive(); + Ok(()) + } +} + impl Eth for EthClient where C: MiningBlockChainClient + 'static, S: SyncProvider + 'static, @@ -248,6 +261,7 @@ impl Eth for EthClient where EM: ExternalMinerService + 'static { fn protocol_version(&self, params: Params) -> Result { + try!(self.active()); match params { Params::None => Ok(Value::String(format!("{}", take_weak!(self.sync).status().protocol_version).to_owned())), _ => Err(Error::invalid_params()) @@ -255,6 +269,7 @@ impl Eth for EthClient where } fn syncing(&self, params: Params) -> Result { + try!(self.active()); match params { Params::None => { let status = take_weak!(self.sync).status(); @@ -281,6 +296,7 @@ impl Eth for EthClient where } fn author(&self, params: Params) -> Result { + try!(self.active()); match params { Params::None => to_value(&take_weak!(self.miner).author()), _ => Err(Error::invalid_params()), @@ -288,6 +304,7 @@ impl Eth for EthClient where } fn is_mining(&self, params: Params) -> Result { + try!(self.active()); match params { Params::None => to_value(&self.external_miner.is_mining()), _ => Err(Error::invalid_params()) @@ -295,6 +312,7 @@ impl Eth for EthClient where } fn hashrate(&self, params: Params) -> Result { + try!(self.active()); match params { Params::None => to_value(&self.external_miner.hashrate()), _ => Err(Error::invalid_params()) @@ -302,6 +320,7 @@ impl Eth for EthClient where } fn gas_price(&self, params: Params) -> Result { + try!(self.active()); match params { Params::None => { let (client, miner) = (take_weak!(self.client), take_weak!(self.miner)); @@ -312,11 +331,13 @@ impl Eth for EthClient where } fn accounts(&self, _: Params) -> Result { + try!(self.active()); let store = take_weak!(self.accounts); to_value(&store.accounts()) } fn block_number(&self, params: Params) -> Result { + try!(self.active()); match params { Params::None => to_value(&U256::from(take_weak!(self.client).chain_info().best_block_number)), _ => Err(Error::invalid_params()) @@ -324,6 +345,7 @@ impl Eth for EthClient where } fn balance(&self, params: Params) -> Result { + try!(self.active()); from_params_default_second(params) .and_then(|(address, block_number,)| match block_number { BlockNumber::Pending => to_value(&take_weak!(self.miner).balance(take_weak!(self.client).deref(), &address)), @@ -332,6 +354,7 @@ impl Eth for EthClient where } fn storage_at(&self, params: Params) -> Result { + try!(self.active()); from_params_default_third::(params) .and_then(|(address, position, block_number,)| match block_number { BlockNumber::Pending => to_value(&U256::from(take_weak!(self.miner).storage_at(&*take_weak!(self.client), &address, &H256::from(position)))), @@ -343,6 +366,7 @@ impl Eth for EthClient where } fn transaction_count(&self, params: Params) -> Result { + try!(self.active()); from_params_default_second(params) .and_then(|(address, block_number,)| match block_number { BlockNumber::Pending => to_value(&take_weak!(self.miner).nonce(take_weak!(self.client).deref(), &address)), @@ -351,6 +375,7 @@ impl Eth for EthClient where } fn block_transaction_count_by_hash(&self, params: Params) -> Result { + try!(self.active()); from_params::<(H256,)>(params) .and_then(|(hash,)| // match take_weak!(self.client).block(BlockID::Hash(hash)) @@ -358,6 +383,7 @@ impl Eth for EthClient where } fn block_transaction_count_by_number(&self, params: Params) -> Result { + try!(self.active()); from_params::<(BlockNumber,)>(params) .and_then(|(block_number,)| match block_number { BlockNumber::Pending => to_value( @@ -369,6 +395,7 @@ impl Eth for EthClient where } fn block_uncles_count_by_hash(&self, params: Params) -> Result { + try!(self.active()); from_params::<(H256,)>(params) .and_then(|(hash,)| take_weak!(self.client).block(BlockID::Hash(hash)) @@ -376,6 +403,7 @@ impl Eth for EthClient where } fn block_uncles_count_by_number(&self, params: Params) -> Result { + try!(self.active()); from_params::<(BlockNumber,)>(params) .and_then(|(block_number,)| match block_number { BlockNumber::Pending => to_value(&U256::from(0)), @@ -385,6 +413,7 @@ impl Eth for EthClient where } fn code_at(&self, params: Params) -> Result { + try!(self.active()); from_params_default_second(params) .and_then(|(address, block_number,)| match block_number { BlockNumber::Pending => to_value(&take_weak!(self.miner).code(take_weak!(self.client).deref(), &address).map_or_else(Bytes::default, Bytes::new)), @@ -394,16 +423,19 @@ impl Eth for EthClient where } fn block_by_hash(&self, params: Params) -> Result { + try!(self.active()); from_params::<(H256, bool)>(params) .and_then(|(hash, include_txs)| self.block(BlockID::Hash(hash), include_txs)) } fn block_by_number(&self, params: Params) -> Result { + try!(self.active()); from_params::<(BlockNumber, bool)>(params) .and_then(|(number, include_txs)| self.block(number.into(), include_txs)) } fn transaction_by_hash(&self, params: Params) -> Result { + try!(self.active()); from_params::<(H256,)>(params) .and_then(|(hash,)| { let miner = take_weak!(self.miner); @@ -415,16 +447,19 @@ impl Eth for EthClient where } fn transaction_by_block_hash_and_index(&self, params: Params) -> Result { + try!(self.active()); from_params::<(H256, Index)>(params) .and_then(|(hash, index)| self.transaction(TransactionID::Location(BlockID::Hash(hash), index.value()))) } fn transaction_by_block_number_and_index(&self, params: Params) -> Result { + try!(self.active()); from_params::<(BlockNumber, Index)>(params) .and_then(|(number, index)| self.transaction(TransactionID::Location(number.into(), index.value()))) } fn transaction_receipt(&self, params: Params) -> Result { + try!(self.active()); from_params::<(H256,)>(params) .and_then(|(hash,)| { let miner = take_weak!(self.miner); @@ -440,16 +475,19 @@ impl Eth for EthClient where } fn uncle_by_block_hash_and_index(&self, params: Params) -> Result { + try!(self.active()); from_params::<(H256, Index)>(params) .and_then(|(hash, index)| self.uncle(UncleID { block: BlockID::Hash(hash), position: index.value() })) } fn uncle_by_block_number_and_index(&self, params: Params) -> Result { + try!(self.active()); from_params::<(BlockNumber, Index)>(params) .and_then(|(number, index)| self.uncle(UncleID { block: number.into(), position: index.value() })) } fn compilers(&self, params: Params) -> Result { + try!(self.active()); match params { Params::None => to_value(&vec![] as &Vec), _ => Err(Error::invalid_params()) @@ -457,6 +495,7 @@ impl Eth for EthClient where } fn logs(&self, params: Params) -> Result { + try!(self.active()); from_params::<(Filter,)>(params) .and_then(|(filter,)| { let include_pending = filter.to_block == Some(BlockNumber::Pending); @@ -476,6 +515,7 @@ impl Eth for EthClient where } fn work(&self, params: Params) -> Result { + try!(self.active()); match params { Params::None => { let client = take_weak!(self.client); @@ -512,6 +552,7 @@ impl Eth for EthClient where } fn submit_work(&self, params: Params) -> Result { + try!(self.active()); from_params::<(H64, H256, H256)>(params).and_then(|(nonce, pow_hash, mix_hash)| { trace!(target: "miner", "submit_work: Decoded: nonce={}, pow_hash={}, mix_hash={}", nonce, pow_hash, mix_hash); let miner = take_weak!(self.miner); @@ -523,6 +564,7 @@ impl Eth for EthClient where } fn submit_hashrate(&self, params: Params) -> Result { + try!(self.active()); from_params::<(U256, H256)>(params).and_then(|(rate, id)| { self.external_miner.submit_hashrate(rate, id); to_value(&true) @@ -530,6 +572,7 @@ impl Eth for EthClient where } fn send_raw_transaction(&self, params: Params) -> Result { + try!(self.active()); from_params::<(Bytes, )>(params) .and_then(|(raw_transaction, )| { let raw_transaction = raw_transaction.to_vec(); @@ -541,6 +584,7 @@ impl Eth for EthClient where } fn call(&self, params: Params) -> Result { + try!(self.active()); trace!(target: "jsonrpc", "call: {:?}", params); from_params_default_second(params) .and_then(|(request, block_number,)| { @@ -555,6 +599,7 @@ impl Eth for EthClient where } fn estimate_gas(&self, params: Params) -> Result { + try!(self.active()); from_params_default_second(params) .and_then(|(request, block_number,)| { let signed = try!(self.sign_call(request)); @@ -568,14 +613,17 @@ impl Eth for EthClient where } fn compile_lll(&self, _: Params) -> Result { + try!(self.active()); rpc_unimplemented!() } fn compile_serpent(&self, _: Params) -> Result { + try!(self.active()); rpc_unimplemented!() } fn compile_solidity(&self, _: Params) -> Result { + try!(self.active()); rpc_unimplemented!() } } diff --git a/rpc/src/v1/impls/eth_filter.rs b/rpc/src/v1/impls/eth_filter.rs index b34a4f703..40ced7187 100644 --- a/rpc/src/v1/impls/eth_filter.rs +++ b/rpc/src/v1/impls/eth_filter.rs @@ -52,6 +52,12 @@ impl EthFilterClient where polls: Mutex::new(PollManager::new()), } } + + fn active(&self) -> Result<(), Error> { + // TODO: only call every 30s at most. + take_weak!(self.client).keep_alive(); + Ok(()) + } } impl EthFilter for EthFilterClient where @@ -59,6 +65,7 @@ impl EthFilter for EthFilterClient where M: MinerService + 'static { fn new_filter(&self, params: Params) -> Result { + try!(self.active()); from_params::<(Filter,)>(params) .and_then(|(filter,)| { let mut polls = self.polls.lock().unwrap(); @@ -69,6 +76,7 @@ impl EthFilter for EthFilterClient where } fn new_block_filter(&self, params: Params) -> Result { + try!(self.active()); match params { Params::None => { let mut polls = self.polls.lock().unwrap(); @@ -80,6 +88,7 @@ impl EthFilter for EthFilterClient where } fn new_pending_transaction_filter(&self, params: Params) -> Result { + try!(self.active()); match params { Params::None => { let mut polls = self.polls.lock().unwrap(); @@ -93,6 +102,7 @@ impl EthFilter for EthFilterClient where } fn filter_changes(&self, params: Params) -> Result { + try!(self.active()); let client = take_weak!(self.client); from_params::<(Index,)>(params) .and_then(|(index,)| { @@ -181,6 +191,7 @@ impl EthFilter for EthFilterClient where } fn filter_logs(&self, params: Params) -> Result { + try!(self.active()); from_params::<(Index,)>(params) .and_then(|(index,)| { let mut polls = self.polls.lock().unwrap(); @@ -206,6 +217,7 @@ impl EthFilter for EthFilterClient where } fn uninstall_filter(&self, params: Params) -> Result { + try!(self.active()); from_params::<(Index,)>(params) .and_then(|(index,)| { self.polls.lock().unwrap().remove_poll(&index.value()); diff --git a/rpc/src/v1/impls/eth_signing.rs b/rpc/src/v1/impls/eth_signing.rs index fa4330f46..6dd01bdf1 100644 --- a/rpc/src/v1/impls/eth_signing.rs +++ b/rpc/src/v1/impls/eth_signing.rs @@ -61,6 +61,12 @@ impl EthSigningQueueClient where C: MiningBlockChainClient, M: Miner miner: Arc::downgrade(miner), } } + + fn active(&self) -> Result<(), Error> { + // TODO: only call every 30s at most. + take_weak!(self.client).keep_alive(); + Ok(()) + } } impl EthSigning for EthSigningQueueClient @@ -68,12 +74,14 @@ impl EthSigning for EthSigningQueueClient { fn sign(&self, _params: Params) -> Result { + try!(self.active()); warn!("Invoking eth_sign is not yet supported with signer enabled."); // TODO [ToDr] Implement sign when rest of the signing queue is ready. rpc_unimplemented!() } fn send_transaction(&self, params: Params) -> Result { + try!(self.active()); from_params::<(TransactionRequest, )>(params) .and_then(|(mut request, )| { let accounts = take_weak!(self.accounts); @@ -118,6 +126,12 @@ impl EthSigningUnsafeClient where accounts: Arc::downgrade(accounts), } } + + fn active(&self) -> Result<(), Error> { + // TODO: only call every 30s at most. + take_weak!(self.client).keep_alive(); + Ok(()) + } } impl EthSigning for EthSigningUnsafeClient where @@ -125,12 +139,14 @@ impl EthSigning for EthSigningUnsafeClient where M: MinerService + 'static { fn sign(&self, params: Params) -> Result { + try!(self.active()); from_params::<(Address, H256)>(params).and_then(|(addr, msg)| { to_value(&take_weak!(self.accounts).sign(addr, msg).unwrap_or(H520::zero())) }) } fn send_transaction(&self, params: Params) -> Result { + try!(self.active()); from_params::<(TransactionRequest, )>(params) .and_then(|(request, )| { let sender = request.from; diff --git a/rpc/src/v1/impls/ethcore.rs b/rpc/src/v1/impls/ethcore.rs index 3a58d8672..fe2313c30 100644 --- a/rpc/src/v1/impls/ethcore.rs +++ b/rpc/src/v1/impls/ethcore.rs @@ -52,56 +52,74 @@ impl EthcoreClient where C: MiningBlockChainClient, M: MinerService confirmations_queue: queue, } } + + fn active(&self) -> Result<(), Error> { + // TODO: only call every 30s at most. + take_weak!(self.client).keep_alive(); + Ok(()) + } } impl Ethcore for EthcoreClient where M: MinerService + 'static, C: MiningBlockChainClient + 'static { fn transactions_limit(&self, _: Params) -> Result { + try!(self.active()); to_value(&take_weak!(self.miner).transactions_limit()) } fn min_gas_price(&self, _: Params) -> Result { + try!(self.active()); to_value(&take_weak!(self.miner).minimal_gas_price()) } fn extra_data(&self, _: Params) -> Result { + try!(self.active()); to_value(&Bytes::new(take_weak!(self.miner).extra_data())) } fn gas_floor_target(&self, _: Params) -> Result { + try!(self.active()); to_value(&take_weak!(self.miner).gas_floor_target()) } fn gas_ceil_target(&self, _: Params) -> Result { + try!(self.active()); to_value(&take_weak!(self.miner).gas_ceil_target()) } fn dev_logs(&self, _params: Params) -> Result { + try!(self.active()); let logs = self.logger.logs(); to_value(&logs.deref().as_slice()) } fn dev_logs_levels(&self, _params: Params) -> Result { + try!(self.active()); to_value(&self.logger.levels()) } fn net_chain(&self, _params: Params) -> Result { + try!(self.active()); to_value(&self.settings.chain) } fn net_max_peers(&self, _params: Params) -> Result { + try!(self.active()); to_value(&self.settings.max_peers) } fn net_port(&self, _params: Params) -> Result { + try!(self.active()); to_value(&self.settings.network_port) } fn node_name(&self, _params: Params) -> Result { + try!(self.active()); to_value(&self.settings.name) } fn rpc_settings(&self, _params: Params) -> Result { + try!(self.active()); let mut map = BTreeMap::new(); map.insert("enabled".to_owned(), Value::Bool(self.settings.rpc_enabled)); map.insert("interface".to_owned(), Value::String(self.settings.rpc_interface.clone())); @@ -110,6 +128,7 @@ impl Ethcore for EthcoreClient where M: MinerService + 'static, C: M } fn default_extra_data(&self, params: Params) -> Result { + try!(self.active()); match params { Params::None => to_value(&Bytes::new(version_data())), _ => Err(Error::invalid_params()), @@ -117,6 +136,7 @@ impl Ethcore for EthcoreClient where M: MinerService + 'static, C: M } fn gas_price_statistics(&self, params: Params) -> Result { + try!(self.active()); match params { Params::None => match take_weak!(self.client).gas_price_statistics(100, 8) { Ok(stats) => to_value(&stats @@ -130,6 +150,7 @@ impl Ethcore for EthcoreClient where M: MinerService + 'static, C: M } fn unsigned_transactions_count(&self, _params: Params) -> Result { + try!(self.active()); match self.confirmations_queue { None => Err(Error { code: ErrorCode::ServerError(error_codes::SIGNER_DISABLED), diff --git a/rpc/src/v1/impls/ethcore_set.rs b/rpc/src/v1/impls/ethcore_set.rs index baf5bf134..1a41509f7 100644 --- a/rpc/src/v1/impls/ethcore_set.rs +++ b/rpc/src/v1/impls/ethcore_set.rs @@ -20,31 +20,46 @@ use util::network::{NetworkService, NonReservedPeerMode}; use std::sync::{Arc, Weak}; use jsonrpc_core::*; use ethcore::miner::MinerService; +use ethcore::client::MiningBlockChainClient; use ethcore::service::SyncMessage; use v1::traits::EthcoreSet; use v1::types::Bytes; /// Ethcore-specific rpc interface for operations altering the settings. -pub struct EthcoreSetClient where +pub struct EthcoreSetClient where + C: MiningBlockChainClient, M: MinerService { + client: Weak, miner: Weak, net: Weak>, } -impl EthcoreSetClient where M: MinerService { +impl EthcoreSetClient where + C: MiningBlockChainClient, + M: MinerService { /// Creates new `EthcoreSetClient`. - pub fn new(miner: &Arc, net: &Arc>) -> Self { + pub fn new(client: &Arc, miner: &Arc, net: &Arc>) -> Self { EthcoreSetClient { + client: Arc::downgrade(client), miner: Arc::downgrade(miner), net: Arc::downgrade(net), } } + + fn active(&self) -> Result<(), Error> { + // TODO: only call every 30s at most. + take_weak!(self.client).keep_alive(); + Ok(()) + } } -impl EthcoreSet for EthcoreSetClient where M: MinerService + 'static { +impl EthcoreSet for EthcoreSetClient where + C: MiningBlockChainClient + 'static, + M: MinerService + 'static { fn set_min_gas_price(&self, params: Params) -> Result { + try!(self.active()); from_params::<(U256,)>(params).and_then(|(gas_price,)| { take_weak!(self.miner).set_minimal_gas_price(gas_price); to_value(&true) @@ -52,6 +67,7 @@ impl EthcoreSet for EthcoreSetClient where M: MinerService + 'static { } fn set_gas_floor_target(&self, params: Params) -> Result { + try!(self.active()); from_params::<(U256,)>(params).and_then(|(target,)| { take_weak!(self.miner).set_gas_floor_target(target); to_value(&true) @@ -59,6 +75,7 @@ impl EthcoreSet for EthcoreSetClient where M: MinerService + 'static { } fn set_gas_ceil_target(&self, params: Params) -> Result { + try!(self.active()); from_params::<(U256,)>(params).and_then(|(target,)| { take_weak!(self.miner).set_gas_ceil_target(target); to_value(&true) @@ -66,6 +83,7 @@ impl EthcoreSet for EthcoreSetClient where M: MinerService + 'static { } fn set_extra_data(&self, params: Params) -> Result { + try!(self.active()); from_params::<(Bytes,)>(params).and_then(|(extra_data,)| { take_weak!(self.miner).set_extra_data(extra_data.to_vec()); to_value(&true) @@ -73,6 +91,7 @@ impl EthcoreSet for EthcoreSetClient where M: MinerService + 'static { } fn set_author(&self, params: Params) -> Result { + try!(self.active()); from_params::<(Address,)>(params).and_then(|(author,)| { take_weak!(self.miner).set_author(author); to_value(&true) @@ -80,6 +99,7 @@ impl EthcoreSet for EthcoreSetClient where M: MinerService + 'static { } fn set_transactions_limit(&self, params: Params) -> Result { + try!(self.active()); from_params::<(usize,)>(params).and_then(|(limit,)| { take_weak!(self.miner).set_transactions_limit(limit); to_value(&true) @@ -87,6 +107,7 @@ impl EthcoreSet for EthcoreSetClient where M: MinerService + 'static { } fn set_tx_gas_limit(&self, params: Params) -> Result { + try!(self.active()); from_params::<(U256,)>(params).and_then(|(limit,)| { take_weak!(self.miner).set_tx_gas_limit(limit.into()); to_value(&true) @@ -94,6 +115,7 @@ impl EthcoreSet for EthcoreSetClient where M: MinerService + 'static { } fn add_reserved_peer(&self, params: Params) -> Result { + try!(self.active()); from_params::<(String,)>(params).and_then(|(peer,)| { match take_weak!(self.net).add_reserved_peer(&peer) { Ok(()) => to_value(&true), @@ -103,6 +125,7 @@ impl EthcoreSet for EthcoreSetClient where M: MinerService + 'static { } fn remove_reserved_peer(&self, params: Params) -> Result { + try!(self.active()); from_params::<(String,)>(params).and_then(|(peer,)| { match take_weak!(self.net).remove_reserved_peer(&peer) { Ok(()) => to_value(&true), @@ -112,11 +135,13 @@ impl EthcoreSet for EthcoreSetClient where M: MinerService + 'static { } fn drop_non_reserved_peers(&self, _: Params) -> Result { + try!(self.active()); take_weak!(self.net).set_non_reserved_mode(NonReservedPeerMode::Deny); to_value(&true) } fn accept_non_reserved_peers(&self, _: Params) -> Result { + try!(self.active()); take_weak!(self.net).set_non_reserved_mode(NonReservedPeerMode::Accept); to_value(&true) } diff --git a/rpc/src/v1/impls/personal.rs b/rpc/src/v1/impls/personal.rs index 38191ed90..94636ae99 100644 --- a/rpc/src/v1/impls/personal.rs +++ b/rpc/src/v1/impls/personal.rs @@ -43,22 +43,31 @@ impl PersonalClient where C: MiningBlockChainClient, M: MinerService signer_port: signer_port, } } + + fn active(&self) -> Result<(), Error> { + // TODO: only call every 30s at most. + take_weak!(self.client).keep_alive(); + Ok(()) + } } impl Personal for PersonalClient where C: MiningBlockChainClient, M: MinerService { fn signer_enabled(&self, _: Params) -> Result { + try!(self.active()); self.signer_port .map(|v| to_value(&v)) .unwrap_or_else(|| to_value(&false)) } fn accounts(&self, _: Params) -> Result { + try!(self.active()); let store = take_weak!(self.accounts); to_value(&store.accounts()) } fn new_account(&self, params: Params) -> Result { + try!(self.active()); from_params::<(String, )>(params).and_then( |(pass, )| { let store = take_weak!(self.accounts); @@ -71,6 +80,7 @@ impl Personal for PersonalClient where C: MiningBl } fn unlock_account(&self, params: Params) -> Result { + try!(self.active()); from_params::<(Address, String, u64)>(params).and_then( |(account, account_pass, _)|{ let store = take_weak!(self.accounts); @@ -82,6 +92,7 @@ impl Personal for PersonalClient where C: MiningBl } fn sign_and_send_transaction(&self, params: Params) -> Result { + try!(self.active()); from_params::<(TransactionRequest, String)>(params) .and_then(|(request, password)| { let sender = request.from; diff --git a/rpc/src/v1/impls/personal_signer.rs b/rpc/src/v1/impls/personal_signer.rs index 89f15c787..97749657e 100644 --- a/rpc/src/v1/impls/personal_signer.rs +++ b/rpc/src/v1/impls/personal_signer.rs @@ -46,16 +46,24 @@ impl SignerClient where C: MiningBlockChainClient, miner: Arc::downgrade(miner), } } + + fn active(&self) -> Result<(), Error> { + // TODO: only call every 30s at most. + take_weak!(self.client).keep_alive(); + Ok(()) + } } impl PersonalSigner for SignerClient where C: MiningBlockChainClient, M: MinerService { fn transactions_to_confirm(&self, _params: Params) -> Result { + try!(self.active()); let queue = take_weak!(self.queue); to_value(&queue.requests()) } fn confirm_transaction(&self, params: Params) -> Result { + try!(self.active()); from_params::<(U256, TransactionModification, String)>(params).and_then( |(id, modification, pass)| { let accounts = take_weak!(self.accounts); @@ -87,6 +95,7 @@ impl PersonalSigner for SignerClient where C: Mini } fn reject_transaction(&self, params: Params) -> Result { + try!(self.active()); from_params::<(U256, )>(params).and_then( |(id, )| { let queue = take_weak!(self.queue); diff --git a/rpc/src/v1/impls/traces.rs b/rpc/src/v1/impls/traces.rs index caf549c84..45daa2500 100644 --- a/rpc/src/v1/impls/traces.rs +++ b/rpc/src/v1/impls/traces.rs @@ -55,10 +55,17 @@ impl TracesClient where C: BlockChainClient, M: MinerService { data: request.data.map_or_else(Vec::new, |d| d.to_vec()) }.fake_sign(from)) } + + fn active(&self) -> Result<(), Error> { + // TODO: only call every 30s at most. + take_weak!(self.client).keep_alive(); + Ok(()) + } } impl Traces for TracesClient where C: BlockChainClient + 'static, M: MinerService + 'static { fn filter(&self, params: Params) -> Result { + try!(self.active()); from_params::<(TraceFilter,)>(params) .and_then(|(filter, )| { let client = take_weak!(self.client); @@ -69,6 +76,7 @@ impl Traces for TracesClient where C: BlockChainClient + 'static, M: } fn block_traces(&self, params: Params) -> Result { + try!(self.active()); from_params::<(BlockNumber,)>(params) .and_then(|(block_number,)| { let client = take_weak!(self.client); @@ -79,6 +87,7 @@ impl Traces for TracesClient where C: BlockChainClient + 'static, M: } fn transaction_traces(&self, params: Params) -> Result { + try!(self.active()); from_params::<(H256,)>(params) .and_then(|(transaction_hash,)| { let client = take_weak!(self.client); @@ -89,6 +98,7 @@ impl Traces for TracesClient where C: BlockChainClient + 'static, M: } fn trace(&self, params: Params) -> Result { + try!(self.active()); from_params::<(H256, Vec)>(params) .and_then(|(transaction_hash, address)| { let client = take_weak!(self.client); @@ -103,6 +113,7 @@ impl Traces for TracesClient where C: BlockChainClient + 'static, M: } fn call(&self, params: Params) -> Result { + try!(self.active()); trace!(target: "jsonrpc", "call: {:?}", params); from_params(params) .and_then(|(request, flags)| { diff --git a/rpc/src/v1/tests/mocked/ethcore_set.rs b/rpc/src/v1/tests/mocked/ethcore_set.rs index 19f52025f..f43233733 100644 --- a/rpc/src/v1/tests/mocked/ethcore_set.rs +++ b/rpc/src/v1/tests/mocked/ethcore_set.rs @@ -20,6 +20,7 @@ use jsonrpc_core::IoHandler; use v1::{EthcoreSet, EthcoreSetClient}; use ethcore::miner::MinerService; use ethcore::service::SyncMessage; +use ethcore::client::TestBlockChainClient; use v1::tests::helpers::TestMinerService; use util::numbers::*; use util::network::{NetworkConfiguration, NetworkService}; @@ -29,20 +30,25 @@ fn miner_service() -> Arc { Arc::new(TestMinerService::default()) } +fn client_service() -> Arc { + Arc::new(TestBlockChainClient::default()) +} + fn network_service() -> Arc> { Arc::new(NetworkService::new(NetworkConfiguration::new()).unwrap()) } -fn ethcore_set_client(miner: &Arc, net: &Arc>) -> EthcoreSetClient { - EthcoreSetClient::new(miner, net) +fn ethcore_set_client(client: &Arc, miner: &Arc, net: &Arc>) -> EthcoreSetClient { + EthcoreSetClient::new(client, miner, net) } #[test] fn rpc_ethcore_set_min_gas_price() { let miner = miner_service(); + let client = client_service(); let network = network_service(); let io = IoHandler::new(); - io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); + io.add_delegate(ethcore_set_client(&client, &miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setMinGasPrice", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -50,12 +56,14 @@ fn rpc_ethcore_set_min_gas_price() { assert_eq!(io.handle_request(request), Some(response.to_owned())); assert_eq!(miner.minimal_gas_price(), U256::from_str("cd1722f3947def4cf144679da39c4c32bdc35681").unwrap()); } + #[test] fn rpc_ethcore_set_gas_floor_target() { let miner = miner_service(); + let client = client_service(); let network = network_service(); let io = IoHandler::new(); - io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); + io.add_delegate(ethcore_set_client(&client, &miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setGasFloorTarget", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -67,9 +75,10 @@ fn rpc_ethcore_set_gas_floor_target() { #[test] fn rpc_ethcore_set_extra_data() { let miner = miner_service(); + let client = client_service(); let network = network_service(); let io = IoHandler::new(); - io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); + io.add_delegate(ethcore_set_client(&client, &miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setExtraData", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -81,9 +90,10 @@ fn rpc_ethcore_set_extra_data() { #[test] fn rpc_ethcore_set_author() { let miner = miner_service(); + let client = client_service(); let network = network_service(); let io = IoHandler::new(); - io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); + io.add_delegate(ethcore_set_client(&client, &miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setAuthor", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -95,9 +105,10 @@ fn rpc_ethcore_set_author() { #[test] fn rpc_ethcore_set_transactions_limit() { let miner = miner_service(); + let client = client_service(); let network = network_service(); let io = IoHandler::new(); - io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); + io.add_delegate(ethcore_set_client(&client, &miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setTransactionsLimit", "params":[10240240], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;