From 8e2aca719f1b563241d7d0e33ca31beabf940c34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 30 Nov 2016 10:16:18 +0100 Subject: [PATCH] Shared hash-fetch --- ethcore/hash-fetch/src/client.rs | 9 ++++- ethcore/src/client/client.rs | 14 +++++-- ethcore/src/client/fetch.rs | 49 +++++++++++++++++++++++++ ethcore/src/client/mod.rs | 1 + ethcore/src/client/updater.rs | 63 +++++++++++--------------------- 5 files changed, 88 insertions(+), 48 deletions(-) create mode 100644 ethcore/src/client/fetch.rs diff --git a/ethcore/hash-fetch/src/client.rs b/ethcore/hash-fetch/src/client.rs index f5d19afa5..272c952e0 100644 --- a/ethcore/hash-fetch/src/client.rs +++ b/ethcore/hash-fetch/src/client.rs @@ -26,7 +26,7 @@ use fetch::{Fetch, FetchError, Client as FetchClient}; use urlhint::{ContractClient, URLHintContract, URLHint, URLHintResult}; /// API for fetching by hash. -pub trait HashFetch { +pub trait HashFetch: Send + Sync + 'static { /// Fetch hash-addressed content. /// Parameters: /// 1. `hash` - content hash @@ -42,7 +42,12 @@ pub enum Error { /// Hash could not be resolved to a valid content address. NoResolution, /// Downloaded content hash does not match. - HashMismatch { expected: H256, got: H256 }, + HashMismatch { + /// Expected hash + expected: H256, + /// Computed hash + got: H256, + }, /// IO Error while validating hash. IO(io::Error), /// Error during fetch. diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 200527222..26750624d 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -72,6 +72,8 @@ use state_db::StateDB; use rand::OsRng; use client::updater::Updater; use client::registry::Registry; +use client::fetch::FetchHandler; +use fetch::{self, HashFetch}; // re-export pub use types::blockchain_info::BlockChainInfo; @@ -154,6 +156,7 @@ pub struct Client { rng: Mutex, on_mode_change: Mutex>>, registrar: Mutex>, + fetch_service: Mutex>>, } impl Client { @@ -226,8 +229,6 @@ impl Client { accountdb: Default::default(), }; - - let client = Arc::new(Client { sleep_state: Mutex::new(SleepState::new(awake)), liveness: AtomicBool::new(awake), @@ -255,18 +256,23 @@ impl Client { rng: Mutex::new(try!(OsRng::new().map_err(::util::UtilError::StdIo))), on_mode_change: Mutex::new(None), registrar: Mutex::new(None), + fetch_service: Mutex::new(None), }); if let Some(reg_addr) = client.additional_params().get("registrar").and_then(|s| Address::from_str(s).ok()) { trace!(target: "client", "Found registrar at {}", reg_addr); let weak = Arc::downgrade(&client); + let fetch = Arc::new(fetch::Client::new(Arc::new(FetchHandler::new(weak.clone())))); let registrar = Registry::new(reg_addr, move |a, d| weak.upgrade().ok_or("No client!".into()).and_then(|c| c.call_contract(a, d))); + // TODO [ToDr] The address might not be available when client is starting (but may be available later). + // Shouldn't this be moved inside the `Updater`? if let Ok(ops_addr) = registrar.get_address(&(&b"operations"[..]).sha3(), "A") { - if !ops_addr.is_zero() { + if !ops_addr.is_zero() { trace!(target: "client", "Found operations at {}", ops_addr); - *client.updater.lock() = Some(Updater::new(Arc::downgrade(&client), ops_addr, client.config.update_policy.clone())); + *client.updater.lock() = Some(Updater::new(Arc::downgrade(&client), Arc::downgrade(&fetch), ops_addr, client.config.update_policy.clone())); } } *client.registrar.lock() = Some(registrar); + *client.fetch_service.lock() = Some(fetch); } Ok(client) } diff --git a/ethcore/src/client/fetch.rs b/ethcore/src/client/fetch.rs new file mode 100644 index 000000000..a94e22b99 --- /dev/null +++ b/ethcore/src/client/fetch.rs @@ -0,0 +1,49 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::sync::Weak; +use std::str::FromStr; +use util::{Bytes, Address}; + +use client::{Client, BlockChainClient}; +use fetch; + +/// Client wrapper implementing `fetch::urlhint::ContractClient` +pub struct FetchHandler { + client: Weak, +} + +impl FetchHandler { + /// Creates new wrapper + pub fn new(client: Weak) -> Self { + FetchHandler { client: client } + } +} + +impl fetch::urlhint::ContractClient for FetchHandler { + fn registrar(&self) -> Result { + self.client.upgrade().ok_or_else(|| "Client not available".to_owned())? + .additional_params() + .get("registrar") + .and_then(|s| Address::from_str(s).ok()) + .ok_or_else(|| "Registrar not available".into()) + } + + fn call(&self, address: Address, data: Bytes) -> Result { + self.client.upgrade().ok_or_else(|| "Client not available".to_owned())? + .call_contract(address, data) + } +} diff --git a/ethcore/src/client/mod.rs b/ethcore/src/client/mod.rs index 7759d08ef..621a4c919 100644 --- a/ethcore/src/client/mod.rs +++ b/ethcore/src/client/mod.rs @@ -24,6 +24,7 @@ mod test_client; mod trace; mod client; mod updater; +mod fetch; pub use self::client::*; pub use self::config::{Mode, ClientConfig, UpdatePolicy, UpdateFilter, DatabaseCompactionProfile, BlockChainConfig, VMType}; diff --git a/ethcore/src/client/updater.rs b/ethcore/src/client/updater.rs index 11bdc0b5c..9417df7bf 100644 --- a/ethcore/src/client/updater.rs +++ b/ethcore/src/client/updater.rs @@ -14,11 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::sync::{Mutex, Weak, Arc}; +use std::sync::{Weak, Arc}; use std::path::PathBuf; use util::misc::{VersionInfo, ReleaseTrack, platform}; -use std::str::FromStr; -use util::{Bytes, Address, H160, H256, FixedHash}; +use util::{Bytes, Address, H160, H256, FixedHash, Mutex}; use client::operations::Operations; use client::{Client, BlockChainClient, UpdatePolicy, BlockId}; use fetch::HashFetch; @@ -42,9 +41,10 @@ pub struct OperationsInfo { pub struct Updater { client: Weak, + fetch: Weak, operations: Operations, update_policy: UpdatePolicy, - fetch_handler: Mutex>, + fetch_handler: Mutex>, // These don't change pub this: VersionInfo, @@ -56,35 +56,15 @@ pub struct Updater { const CLIENT_ID: &'static str = "parity"; -struct FetchHandler { - client: Weak, -} - -impl fetch::urlhint::ContractClient for FetchHandler { - fn registrar(&self) -> Result { - self.client.upgrade().ok_or_else(|| "Client not available".to_owned())? - .additional_params() - .get("registrar") - .and_then(|s| Address::from_str(s).ok()) - .ok_or_else(|| "Registrar not available".into()) - } - - fn call(&self, address: Address, data: Bytes) -> Result { - self.client.upgrade().ok_or_else(|| "Client not available".to_owned())? - .call_contract(address, data) - } -} - -fn start_fetch(client: Weak, hash: H256, on_done: Box) + Send>) -> Result { - let f = fetch::Client::new(Arc::new(FetchHandler { client: client, })); - let r = f.fetch(hash, on_done); - r.map(|_| f) +fn start_fetch(fetch: Arc, hash: H256, on_done: Box) + Send>) -> Result<(), fetch::Error> { + fetch.fetch(hash, on_done) } impl Updater { - pub fn new(client: Weak, operations: Address, update_policy: UpdatePolicy) -> Self { + pub fn new(client: Weak, fetch: Weak, operations: Address, update_policy: UpdatePolicy) -> Self { let mut u = Updater { client: client.clone(), + fetch: fetch.clone(), operations: Operations::new(operations, move |a, d| client.upgrade().ok_or("No client!".into()).and_then(|c| c.call_contract(a, d))), update_policy: update_policy, fetch_handler: Mutex::new(None), @@ -107,12 +87,12 @@ impl Updater { } /// Is the currently running client capable of supporting the current chain? - /// `Some` answer or `None` if information on the running client is not available. + /// `Some` answer or `None` if information on the running client is not available. pub fn is_capable(&self) -> Option { self.latest.as_ref().and_then(|latest| { self.this_fork.map(|this_fork| { let current_number = self.client.upgrade().map_or(0, |c| c.block_number(BlockId::Latest).unwrap_or(0)); - this_fork >= latest.fork || current_number < latest.fork + this_fork >= latest.fork || current_number < latest.fork }) }) } @@ -124,15 +104,15 @@ impl Updater { } /// Actually upgrades the client. Assumes that the binary has been downloaded. - /// @returns `true` on success. + /// @returns `true` on success. pub fn execute_upgrade(&mut self) -> bool { unimplemented!() } - /// Our version info. + /// Our version info. pub fn version_info(&self) -> &VersionInfo { &self.this } - /// Information gathered concerning the release. + /// Information gathered concerning the release. pub fn info(&self) -> &Option { &self.latest } fn collect_release_info(&self, release_id: &H256) -> Result { @@ -172,9 +152,7 @@ impl Updater { } fn fetch_done(&self, _r: Result) { - if let Ok(mut x) = self.fetch_handler.lock() { - *x = None; - } + *self.fetch_handler.lock() = None; } pub fn tick(&mut self) { @@ -186,7 +164,7 @@ impl Updater { if let Some(ref latest) = self.latest { info!(target: "updater", "Latest release in our track is v{} it is {}critical ({} binary is {})", latest.track.version, - if latest.track.is_critical {""} else {"non-"}, + if latest.track.is_critical {""} else {"non-"}, platform(), if let Some(ref b) = latest.track.binary { format!("{}", b) @@ -195,11 +173,12 @@ impl Updater { } ); if let Some(b) = latest.track.binary { - if let Ok(mut fetch_handler) = self.fetch_handler.lock() { - if fetch_handler.is_none() { - let c = self.client.clone(); - let f = move |r: Result| if let Some(c) = c.upgrade() { c.updater().as_ref().expect("updater exists; updater only owned by client; qed").fetch_done(r); }; - *fetch_handler = start_fetch(self.client.clone(), b, Box::new(f)).ok(); + let mut fetch_handler = self.fetch_handler.lock(); + if fetch_handler.is_none() { + let c = self.client.clone(); + let f = move |r: Result| if let Some(c) = c.upgrade() { c.updater().as_ref().expect("updater exists; updater only owned by client; qed").fetch_done(r); }; + if let Some(fetch) = self.fetch.clone().upgrade() { + *fetch_handler = start_fetch(fetch, b, Box::new(f)).ok(); } } }