Merge pull request #894 from ethcore/send-transactions
Propagate transaction queue
This commit is contained in:
		
						commit
						46a25b31ab
					
				| @ -105,9 +105,12 @@ pub trait MinerService : Send + Sync { | ||||
| 	/// Get the sealing work package and if `Some`, apply some transform.
 | ||||
| 	fn map_sealing_work<F, T>(&self, chain: &BlockChainClient, f: F) -> Option<T> where F: FnOnce(&ClosedBlock) -> T; | ||||
| 
 | ||||
| 	/// Query pending transactions for hash
 | ||||
| 	/// Query pending transactions for hash.
 | ||||
| 	fn transaction(&self, hash: &H256) -> Option<SignedTransaction>; | ||||
| 
 | ||||
| 	/// Get a list of all pending transactions.
 | ||||
| 	fn pending_transactions(&self) -> Vec<SignedTransaction>; | ||||
| 
 | ||||
| 	/// Returns highest transaction nonce for given address.
 | ||||
| 	fn last_nonce(&self, address: &Address) -> Option<U256>; | ||||
| 
 | ||||
|  | ||||
| @ -228,6 +228,11 @@ impl MinerService for Miner { | ||||
| 		queue.find(hash) | ||||
| 	} | ||||
| 
 | ||||
| 	fn pending_transactions(&self) -> Vec<SignedTransaction> { | ||||
| 		let queue = self.transaction_queue.lock().unwrap(); | ||||
| 		queue.top_transactions() | ||||
| 	} | ||||
| 
 | ||||
| 	fn last_nonce(&self, address: &Address) -> Option<U256> { | ||||
| 		self.transaction_queue.lock().unwrap().last_nonce(address) | ||||
| 	} | ||||
|  | ||||
| @ -186,7 +186,7 @@ impl<C, S, A, M, EM> EthClient<C, S, A, M, EM> | ||||
| 		}.fake_sign(from)) | ||||
| 	} | ||||
| 
 | ||||
| 	fn dispatch_transaction(&self, signed_transaction: SignedTransaction, raw_transaction: Vec<u8>) -> Result<Value, Error> { | ||||
| 	fn dispatch_transaction(&self, signed_transaction: SignedTransaction) -> Result<Value, Error> { | ||||
| 		let hash = signed_transaction.hash(); | ||||
| 
 | ||||
| 		let import = { | ||||
| @ -203,7 +203,6 @@ impl<C, S, A, M, EM> EthClient<C, S, A, M, EM> | ||||
| 
 | ||||
| 		match import.into_iter().collect::<Result<Vec<_>, _>>() { | ||||
| 			Ok(_) => { | ||||
| 				take_weak!(self.sync).new_transaction(raw_transaction); | ||||
| 				to_value(&hash) | ||||
| 			} | ||||
| 			Err(e) => { | ||||
| @ -504,8 +503,7 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM> | ||||
| 								data: request.data.map_or_else(Vec::new, |d| d.to_vec()), | ||||
| 							}.sign(&secret) | ||||
| 						}; | ||||
| 						let raw_transaction = encode(&signed_transaction).to_vec(); | ||||
| 						self.dispatch_transaction(signed_transaction, raw_transaction) | ||||
| 						self.dispatch_transaction(signed_transaction) | ||||
| 					}, | ||||
| 					Err(_) => { to_value(&H256::zero()) } | ||||
| 				} | ||||
| @ -517,7 +515,7 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM> | ||||
| 			.and_then(|(raw_transaction, )| { | ||||
| 				let raw_transaction = raw_transaction.to_vec(); | ||||
| 				match UntrustedRlp::new(&raw_transaction).as_val() { | ||||
| 					Ok(signed_transaction) => self.dispatch_transaction(signed_transaction, raw_transaction), | ||||
| 					Ok(signed_transaction) => self.dispatch_transaction(signed_transaction), | ||||
| 					Err(_) => to_value(&H256::zero()), | ||||
| 				} | ||||
| 		}) | ||||
|  | ||||
| @ -98,6 +98,10 @@ impl MinerService for TestMinerService { | ||||
| 		self.pending_transactions.lock().unwrap().get(hash).cloned() | ||||
| 	} | ||||
| 
 | ||||
| 	fn pending_transactions(&self) -> Vec<SignedTransaction> { | ||||
| 		self.pending_transactions.lock().unwrap().values().cloned().collect() | ||||
| 	} | ||||
| 
 | ||||
| 	fn last_nonce(&self, address: &Address) -> Option<U256> { | ||||
| 		self.last_nonces.read().unwrap().get(address).cloned() | ||||
| 	} | ||||
|  | ||||
| @ -16,7 +16,7 @@ | ||||
| 
 | ||||
| //! Test implementation of SyncProvider.
 | ||||
| 
 | ||||
| use util::{U256, Bytes}; | ||||
| use util::{U256}; | ||||
| use ethsync::{SyncProvider, SyncStatus, SyncState}; | ||||
| use std::sync::{RwLock}; | ||||
| 
 | ||||
| @ -59,8 +59,5 @@ impl SyncProvider for TestSyncProvider { | ||||
| 	fn status(&self) -> SyncStatus { | ||||
| 		self.status.read().unwrap().clone() | ||||
| 	} | ||||
| 
 | ||||
| 	fn new_transaction(&self, _raw_transaction: Bytes) { | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
|  | ||||
| @ -217,10 +217,6 @@ pub struct ChainSync { | ||||
| 	network_id: U256, | ||||
| 	/// Miner
 | ||||
| 	miner: Arc<Miner>, | ||||
| 
 | ||||
| 	/// Transactions to propagate
 | ||||
| 	// TODO: reconsider where this is in the codebase - seems a little dodgy to have here.
 | ||||
| 	transactions_to_send: Vec<Bytes>, | ||||
| } | ||||
| 
 | ||||
| type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>; | ||||
| @ -247,7 +243,6 @@ impl ChainSync { | ||||
| 			max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), | ||||
| 			network_id: config.network_id, | ||||
| 			miner: miner, | ||||
| 			transactions_to_send: vec![], | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| @ -950,11 +945,6 @@ impl ChainSync { | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	/// Place a new transaction on the wire.
 | ||||
| 	pub fn new_transaction(&mut self, raw_transaction: Bytes) { | ||||
| 		self.transactions_to_send.push(raw_transaction); | ||||
| 	} | ||||
| 
 | ||||
| 	/// Called when peer sends us new transactions
 | ||||
| 	fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { | ||||
| 		// accepting transactions once only fully synced
 | ||||
| @ -1296,11 +1286,16 @@ impl ChainSync { | ||||
| 			return 0; | ||||
| 		} | ||||
| 
 | ||||
| 		let mut packet = RlpStream::new_list(self.transactions_to_send.len()); | ||||
| 		for tx in &self.transactions_to_send { | ||||
| 			packet.append_raw(tx, 1); | ||||
| 		let mut transactions = self.miner.pending_transactions(); | ||||
| 		if transactions.is_empty() { | ||||
| 			return 0; | ||||
| 		} | ||||
| 
 | ||||
| 		let mut packet = RlpStream::new_list(transactions.len()); | ||||
| 		let tx_count = transactions.len(); | ||||
| 		for tx in transactions.drain(..) { | ||||
| 			packet.append(&tx); | ||||
| 		} | ||||
| 		self.transactions_to_send.clear(); | ||||
| 		let rlp = packet.out(); | ||||
| 
 | ||||
| 		let lucky_peers = { | ||||
| @ -1319,13 +1314,12 @@ impl ChainSync { | ||||
| 		for peer_id in lucky_peers { | ||||
| 			self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp.clone()); | ||||
| 		} | ||||
| 		trace!(target: "sync", "Sent {} transactions to {} peers.", tx_count, sent); | ||||
| 		sent | ||||
| 	} | ||||
| 
 | ||||
| 	fn propagate_latest_blocks(&mut self, io: &mut SyncIo) { | ||||
| 		if !self.transactions_to_send.is_empty() { | ||||
| 		self.propagate_new_transactions(io); | ||||
| 		} | ||||
| 		let chain_info = io.chain().chain_info(); | ||||
| 		if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { | ||||
| 			let blocks = self.propagate_blocks(&chain_info, io); | ||||
|  | ||||
| @ -66,7 +66,7 @@ use std::ops::*; | ||||
| use std::sync::*; | ||||
| use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; | ||||
| use util::TimerToken; | ||||
| use util::{U256, Bytes, ONE_U256}; | ||||
| use util::{U256, ONE_U256}; | ||||
| use ethcore::client::Client; | ||||
| use ethcore::service::SyncMessage; | ||||
| use ethminer::Miner; | ||||
| @ -101,9 +101,6 @@ impl Default for SyncConfig { | ||||
| pub trait SyncProvider: Send + Sync { | ||||
| 	/// Get sync status
 | ||||
| 	fn status(&self) -> SyncStatus; | ||||
| 
 | ||||
| 	/// Note that a user has submitted a new transaction.
 | ||||
| 	fn new_transaction(&self, raw_transaction: Bytes); | ||||
| } | ||||
| 
 | ||||
| /// Ethereum network protocol handler
 | ||||
| @ -143,11 +140,6 @@ impl SyncProvider for EthSync { | ||||
| 	fn status(&self) -> SyncStatus { | ||||
| 		self.sync.read().unwrap().status() | ||||
| 	} | ||||
| 
 | ||||
| 	/// Note that a user has submitted a new transaction.
 | ||||
| 	fn new_transaction(&self, raw_transaction: Bytes) { | ||||
| 		self.sync.write().unwrap().new_transaction(raw_transaction); | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| impl NetworkProtocolHandler<SyncMessage> for EthSync { | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user