Shared hash-fetch
This commit is contained in:
		
							parent
							
								
									ce3f4911a2
								
							
						
					
					
						commit
						8e2aca719f
					
				| @ -26,7 +26,7 @@ use fetch::{Fetch, FetchError, Client as FetchClient}; | ||||
| use urlhint::{ContractClient, URLHintContract, URLHint, URLHintResult}; | ||||
| 
 | ||||
| /// API for fetching by hash.
 | ||||
| pub trait HashFetch { | ||||
| pub trait HashFetch: Send + Sync + 'static { | ||||
| 	/// Fetch hash-addressed content.
 | ||||
| 	/// Parameters:
 | ||||
| 	/// 1. `hash` - content hash
 | ||||
| @ -42,7 +42,12 @@ pub enum Error { | ||||
| 	/// Hash could not be resolved to a valid content address.
 | ||||
| 	NoResolution, | ||||
| 	/// Downloaded content hash does not match.
 | ||||
| 	HashMismatch { expected: H256, got: H256 }, | ||||
| 	HashMismatch { | ||||
| 		/// Expected hash
 | ||||
| 		expected: H256, | ||||
| 		/// Computed hash
 | ||||
| 		got: H256, | ||||
| 	}, | ||||
| 	/// IO Error while validating hash.
 | ||||
| 	IO(io::Error), | ||||
| 	/// Error during fetch.
 | ||||
|  | ||||
| @ -72,6 +72,8 @@ use state_db::StateDB; | ||||
| use rand::OsRng; | ||||
| use client::updater::Updater; | ||||
| use client::registry::Registry; | ||||
| use client::fetch::FetchHandler; | ||||
| use fetch::{self, HashFetch}; | ||||
| 
 | ||||
| // re-export
 | ||||
| pub use types::blockchain_info::BlockChainInfo; | ||||
| @ -154,6 +156,7 @@ pub struct Client { | ||||
| 	rng: Mutex<OsRng>, | ||||
| 	on_mode_change: Mutex<Option<Box<FnMut(&Mode) + 'static + Send>>>, | ||||
| 	registrar: Mutex<Option<Registry>>, | ||||
| 	fetch_service: Mutex<Option<Arc<HashFetch>>>, | ||||
| } | ||||
| 
 | ||||
| impl Client { | ||||
| @ -226,8 +229,6 @@ impl Client { | ||||
| 			accountdb: Default::default(), | ||||
| 		}; | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| 		let client = Arc::new(Client { | ||||
| 			sleep_state: Mutex::new(SleepState::new(awake)), | ||||
| 			liveness: AtomicBool::new(awake), | ||||
| @ -255,18 +256,23 @@ impl Client { | ||||
| 			rng: Mutex::new(try!(OsRng::new().map_err(::util::UtilError::StdIo))), | ||||
| 			on_mode_change: Mutex::new(None), | ||||
| 			registrar: Mutex::new(None), | ||||
| 			fetch_service: Mutex::new(None), | ||||
| 		}); | ||||
| 		if let Some(reg_addr) = client.additional_params().get("registrar").and_then(|s| Address::from_str(s).ok()) { | ||||
| 			trace!(target: "client", "Found registrar at {}", reg_addr); | ||||
| 			let weak = Arc::downgrade(&client); | ||||
| 			let fetch = Arc::new(fetch::Client::new(Arc::new(FetchHandler::new(weak.clone())))); | ||||
| 			let registrar = Registry::new(reg_addr, move |a, d| weak.upgrade().ok_or("No client!".into()).and_then(|c| c.call_contract(a, d))); | ||||
| 			// TODO [ToDr] The address might not be available when client is starting (but may be available later).
 | ||||
| 			// Shouldn't this be moved inside the `Updater`?
 | ||||
| 			if let Ok(ops_addr) = registrar.get_address(&(&b"operations"[..]).sha3(), "A") { | ||||
| 				if !ops_addr.is_zero() { | ||||
| 					trace!(target: "client", "Found operations at {}", ops_addr); | ||||
| 					*client.updater.lock() = Some(Updater::new(Arc::downgrade(&client), ops_addr, client.config.update_policy.clone())); | ||||
| 					*client.updater.lock() = Some(Updater::new(Arc::downgrade(&client), Arc::downgrade(&fetch), ops_addr, client.config.update_policy.clone())); | ||||
| 				} | ||||
| 			} | ||||
| 			*client.registrar.lock() = Some(registrar); | ||||
| 			*client.fetch_service.lock() = Some(fetch); | ||||
| 		} | ||||
| 		Ok(client) | ||||
| 	} | ||||
|  | ||||
							
								
								
									
										49
									
								
								ethcore/src/client/fetch.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										49
									
								
								ethcore/src/client/fetch.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,49 @@ | ||||
| // Copyright 2015, 2016 Ethcore (UK) Ltd.
 | ||||
| // This file is part of Parity.
 | ||||
| 
 | ||||
| // Parity is free software: you can redistribute it and/or modify
 | ||||
| // it under the terms of the GNU General Public License as published by
 | ||||
| // the Free Software Foundation, either version 3 of the License, or
 | ||||
| // (at your option) any later version.
 | ||||
| 
 | ||||
| // Parity is distributed in the hope that it will be useful,
 | ||||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of
 | ||||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | ||||
| // GNU General Public License for more details.
 | ||||
| 
 | ||||
| // You should have received a copy of the GNU General Public License
 | ||||
| // along with Parity.  If not, see <http://www.gnu.org/licenses/>.
 | ||||
| 
 | ||||
| use std::sync::Weak; | ||||
| use std::str::FromStr; | ||||
| use util::{Bytes, Address}; | ||||
| 
 | ||||
| use client::{Client, BlockChainClient}; | ||||
| use fetch; | ||||
| 
 | ||||
| /// Client wrapper implementing `fetch::urlhint::ContractClient`
 | ||||
| pub struct FetchHandler { | ||||
| 	client: Weak<Client>, | ||||
| } | ||||
| 
 | ||||
| impl FetchHandler { | ||||
| 	/// Creates new wrapper
 | ||||
| 	pub fn new(client: Weak<Client>) -> Self { | ||||
| 		FetchHandler { client: client } | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| impl fetch::urlhint::ContractClient for FetchHandler { | ||||
| 	fn registrar(&self) -> Result<Address, String> { | ||||
| 		self.client.upgrade().ok_or_else(|| "Client not available".to_owned())? | ||||
| 			.additional_params() | ||||
| 			.get("registrar") | ||||
| 			.and_then(|s| Address::from_str(s).ok()) | ||||
| 			.ok_or_else(|| "Registrar not available".into()) | ||||
| 	} | ||||
| 
 | ||||
| 	fn call(&self, address: Address, data: Bytes) -> Result<Bytes, String> { | ||||
| 		self.client.upgrade().ok_or_else(|| "Client not available".to_owned())? | ||||
| 			.call_contract(address, data) | ||||
| 	} | ||||
| } | ||||
| @ -24,6 +24,7 @@ mod test_client; | ||||
| mod trace; | ||||
| mod client; | ||||
| mod updater; | ||||
| mod fetch; | ||||
| 
 | ||||
| pub use self::client::*; | ||||
| pub use self::config::{Mode, ClientConfig, UpdatePolicy, UpdateFilter, DatabaseCompactionProfile, BlockChainConfig, VMType}; | ||||
|  | ||||
| @ -14,11 +14,10 @@ | ||||
| // You should have received a copy of the GNU General Public License
 | ||||
| // along with Parity.  If not, see <http://www.gnu.org/licenses/>.
 | ||||
| 
 | ||||
| use std::sync::{Mutex, Weak, Arc}; | ||||
| use std::sync::{Weak, Arc}; | ||||
| use std::path::PathBuf; | ||||
| use util::misc::{VersionInfo, ReleaseTrack, platform}; | ||||
| use std::str::FromStr; | ||||
| use util::{Bytes, Address, H160, H256, FixedHash}; | ||||
| use util::{Bytes, Address, H160, H256, FixedHash, Mutex}; | ||||
| use client::operations::Operations; | ||||
| use client::{Client, BlockChainClient, UpdatePolicy, BlockId}; | ||||
| use fetch::HashFetch; | ||||
| @ -42,9 +41,10 @@ pub struct OperationsInfo { | ||||
| 
 | ||||
| pub struct Updater { | ||||
| 	client: Weak<Client>, | ||||
| 	fetch: Weak<HashFetch>, | ||||
| 	operations: Operations, | ||||
| 	update_policy: UpdatePolicy, | ||||
| 	fetch_handler: Mutex<Option<fetch::Client>>, | ||||
| 	fetch_handler: Mutex<Option<()>>, | ||||
| 
 | ||||
| 	// These don't change
 | ||||
| 	pub this: VersionInfo, | ||||
| @ -56,35 +56,15 @@ pub struct Updater { | ||||
| 
 | ||||
| const CLIENT_ID: &'static str = "parity"; | ||||
| 
 | ||||
| struct FetchHandler { | ||||
| 	client: Weak<Client>, | ||||
| } | ||||
| 
 | ||||
| impl fetch::urlhint::ContractClient for FetchHandler { | ||||
| 	fn registrar(&self) -> Result<Address, String> { | ||||
| 		self.client.upgrade().ok_or_else(|| "Client not available".to_owned())? | ||||
| 			.additional_params() | ||||
| 			.get("registrar") | ||||
| 			.and_then(|s| Address::from_str(s).ok()) | ||||
| 			.ok_or_else(|| "Registrar not available".into()) | ||||
| 	} | ||||
| 
 | ||||
| 	fn call(&self, address: Address, data: Bytes) -> Result<Bytes, String> { | ||||
| 		self.client.upgrade().ok_or_else(|| "Client not available".to_owned())? | ||||
| 			.call_contract(address, data) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| fn start_fetch(client: Weak<Client>, hash: H256, on_done: Box<Fn(Result<PathBuf, fetch::Error>) + Send>) -> Result<fetch::Client, fetch::Error> { | ||||
| 	let f = fetch::Client::new(Arc::new(FetchHandler { client: client, })); | ||||
| 	let r = f.fetch(hash, on_done); | ||||
| 	r.map(|_| f) | ||||
| fn start_fetch(fetch: Arc<HashFetch>, hash: H256, on_done: Box<Fn(Result<PathBuf, fetch::Error>) + Send>) -> Result<(), fetch::Error> { | ||||
| 	fetch.fetch(hash, on_done) | ||||
| } | ||||
| 
 | ||||
| impl Updater { | ||||
| 	pub fn new(client: Weak<Client>, operations: Address, update_policy: UpdatePolicy) -> Self { | ||||
| 	pub fn new(client: Weak<Client>, fetch: Weak<fetch::Client>, operations: Address, update_policy: UpdatePolicy) -> Self { | ||||
| 		let mut u = Updater { | ||||
| 			client: client.clone(), | ||||
| 			fetch: fetch.clone(), | ||||
| 			operations: Operations::new(operations, move |a, d| client.upgrade().ok_or("No client!".into()).and_then(|c| c.call_contract(a, d))), | ||||
| 			update_policy: update_policy, | ||||
| 			fetch_handler: Mutex::new(None), | ||||
| @ -172,9 +152,7 @@ impl Updater { | ||||
| 	} | ||||
| 
 | ||||
| 	fn fetch_done(&self, _r: Result<PathBuf, fetch::Error>) { | ||||
| 		if let Ok(mut x) = self.fetch_handler.lock() { | ||||
| 			*x = None; | ||||
| 		} | ||||
| 		*self.fetch_handler.lock() = None; | ||||
| 	} | ||||
| 
 | ||||
| 	pub fn tick(&mut self) { | ||||
| @ -195,11 +173,12 @@ impl Updater { | ||||
| 				 } | ||||
| 			); | ||||
| 			if let Some(b) = latest.track.binary { | ||||
| 				if let Ok(mut fetch_handler) = self.fetch_handler.lock() { | ||||
| 					if fetch_handler.is_none() { | ||||
| 						let c = self.client.clone(); | ||||
| 						let f = move |r: Result<PathBuf, fetch::Error>| if let Some(c) = c.upgrade() { c.updater().as_ref().expect("updater exists; updater only owned by client; qed").fetch_done(r); }; 
 | ||||
| 						*fetch_handler = start_fetch(self.client.clone(), b, Box::new(f)).ok(); | ||||
| 				let mut fetch_handler = self.fetch_handler.lock(); | ||||
| 				if fetch_handler.is_none() { | ||||
| 					let c = self.client.clone(); | ||||
| 					let f = move |r: Result<PathBuf, fetch::Error>| if let Some(c) = c.upgrade() { c.updater().as_ref().expect("updater exists; updater only owned by client; qed").fetch_done(r); }; | ||||
| 					if let Some(fetch) = self.fetch.clone().upgrade() { | ||||
| 						*fetch_handler = start_fetch(fetch, b, Box::new(f)).ok(); | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user