add internal timeout service, test proposer switching
This commit is contained in:
		
							parent
							
								
									8851acec7c
								
							
						
					
					
						commit
						0af4bf23a9
					
				| @ -17,6 +17,7 @@ | ||||
| //! Tendermint BFT consensus engine with round robin proof-of-authority.
 | ||||
| 
 | ||||
| use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering}; | ||||
| use std::sync::Weak; | ||||
| use common::*; | ||||
| use account_provider::AccountProvider; | ||||
| use block::*; | ||||
| @ -24,8 +25,7 @@ use spec::CommonParams; | ||||
| use engines::{Engine, EngineError, ProposeCollect}; | ||||
| use evm::Schedule; | ||||
| use ethjson; | ||||
| use io::{IoContext, IoHandler, TimerToken}; | ||||
| use service::{ClientIoMessage, ENGINE_TIMEOUT_TOKEN}; | ||||
| use io::{IoContext, IoHandler, TimerToken, IoService}; | ||||
| use time::get_time; | ||||
| 
 | ||||
| /// `Tendermint` params.
 | ||||
| @ -39,14 +39,6 @@ pub struct TendermintParams { | ||||
| 	pub validator_n: usize, | ||||
| 	/// Timeout durations for different steps.
 | ||||
| 	timeouts: DefaultTimeouts, | ||||
| 	/// Consensus round.
 | ||||
| 	r: u64, | ||||
| 	/// Consensus step.
 | ||||
| 	s: RwLock<Step>, | ||||
| 	/// Current step timeout in ms.
 | ||||
| 	timeout: AtomicTimerToken, | ||||
| 	/// Used to swith proposer.
 | ||||
| 	proposer_nonce: AtomicUsize, | ||||
| } | ||||
| 
 | ||||
| impl Default for TendermintParams { | ||||
| @ -64,10 +56,6 @@ impl Default for TendermintParams { | ||||
| 				precommit: 3000, | ||||
| 				commit: 3000 | ||||
| 			}, | ||||
| 			r: 0, | ||||
| 			s: RwLock::new(Step::Propose), | ||||
| 			timeout: AtomicUsize::new(propose_timeout), | ||||
| 			proposer_nonce: AtomicUsize::new(0) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| @ -82,88 +70,70 @@ enum Step { | ||||
| 	Commit(Seal) | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug)] | ||||
| struct DefaultTimeouts { | ||||
| 	propose: TimerToken, | ||||
| 	prevote: TimerToken, | ||||
| 	precommit: TimerToken, | ||||
| 	commit: TimerToken | ||||
| } | ||||
| 
 | ||||
| type Seal = Vec<Bytes>; | ||||
| type AtomicTimerToken = AtomicUsize; | ||||
| 
 | ||||
| impl IoHandler<ClientIoMessage> for Tendermint { | ||||
| 	fn initialize(&self, io: &IoContext<ClientIoMessage>) { | ||||
| 		io.register_timer(ENGINE_TIMEOUT_TOKEN, self.our_params.timeout.load(AtomicOrdering::Relaxed) as u64).expect("Error registering engine timeout"); | ||||
| 	} | ||||
| 
 | ||||
| 	fn timeout(&self, io: &IoContext<ClientIoMessage>, timer: TimerToken) { | ||||
| 		if timer == ENGINE_TIMEOUT_TOKEN { | ||||
| 			println!("Timeout: {:?}", get_time().sec); | ||||
| 			io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to cancel consensus timer."); | ||||
| 			match *self.our_params.s.try_read().unwrap() { | ||||
| 				Step::Propose => self.to_propose(), | ||||
| 				Step::Prevote(ref proposal) => self.to_precommit(proposal.hash.clone()), | ||||
| 				Step::Precommit(_, _) => self.to_propose(), | ||||
| 				Step::Commit(_) => self.to_propose(), | ||||
| 			}; | ||||
| 			//io.register_timer(ENGINE_TIMEOUT_TOKEN, 3000).expect("Failed to start new consensus timer.")
 | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	fn message(&self, io: &IoContext<ClientIoMessage>, net_message: &ClientIoMessage) { | ||||
| 		if let &ClientIoMessage::ConsensusStep(next_timeout) = net_message { | ||||
| 			println!("Message: {:?}", get_time().sec); | ||||
| 			io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to cancel consensus timer."); | ||||
| 			io.register_timer(ENGINE_TIMEOUT_TOKEN, next_timeout).expect("Failed to start new consensus timer.") | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| impl From<ethjson::spec::TendermintParams> for TendermintParams { | ||||
| 	fn from(p: ethjson::spec::TendermintParams) -> Self { | ||||
| 		let val: Vec<_> = p.validators.into_iter().map(Into::into).collect(); | ||||
| 		let val_n = val.len(); | ||||
| 		let propose_timeout = 3; | ||||
| 		let propose_timeout = 3000; | ||||
| 		TendermintParams { | ||||
| 			gas_limit_bound_divisor: p.gas_limit_bound_divisor.into(), | ||||
| 			validators: val, | ||||
| 			validator_n: val_n, | ||||
| 			timeouts: DefaultTimeouts { | ||||
| 				propose: propose_timeout, | ||||
| 				prevote: 3, | ||||
| 				precommit: 3, | ||||
| 				commit: 3 | ||||
| 				prevote: 3000, | ||||
| 				precommit: 3000, | ||||
| 				commit: 3000 | ||||
| 			}, | ||||
| 			r: 0, | ||||
| 			s: RwLock::new(Step::Propose), | ||||
| 			timeout: AtomicUsize::new(propose_timeout), | ||||
| 			proposer_nonce: AtomicUsize::new(0) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| #[derive(Clone)] | ||||
| struct StepMessage; | ||||
| 
 | ||||
| /// Engine using `Tendermint` consensus algorithm, suitable for EVM chain.
 | ||||
| pub struct Tendermint { | ||||
| 	params: CommonParams, | ||||
| 	our_params: TendermintParams, | ||||
| 	builtins: BTreeMap<Address, Builtin>, | ||||
| 	timeout_service: IoService<StepMessage>, | ||||
| 	/// Consensus round.
 | ||||
| 	r: u64, | ||||
| 	/// Consensus step.
 | ||||
| 	s: RwLock<Step>, | ||||
| 	/// Current step timeout in ms.
 | ||||
| 	timeout: AtomicMs, | ||||
| 	/// Used to swith proposer.
 | ||||
| 	proposer_nonce: AtomicUsize, | ||||
| } | ||||
| 
 | ||||
| struct TimerHandler { | ||||
| 	engine: Weak<Tendermint>, | ||||
| } | ||||
| 
 | ||||
| impl Tendermint { | ||||
| 	/// Create a new instance of Tendermint engine
 | ||||
| 	pub fn new(params: CommonParams, our_params: TendermintParams, builtins: BTreeMap<Address, Builtin>) -> Self { | ||||
| 	pub fn new(params: CommonParams, our_params: TendermintParams, builtins: BTreeMap<Address, Builtin>) -> Arc<Self> { | ||||
| 		let engine = Arc::new( | ||||
| 			Tendermint { | ||||
| 				params: params, | ||||
| 				timeout: AtomicUsize::new(our_params.timeouts.propose), | ||||
| 				our_params: our_params, | ||||
| 				builtins: builtins, | ||||
| 		} | ||||
| 				timeout_service: IoService::<StepMessage>::start().expect("Error creating engine timeout service"), | ||||
| 				r: 0, | ||||
| 				s: RwLock::new(Step::Propose), | ||||
| 				proposer_nonce: AtomicUsize::new(0) | ||||
| 			}); | ||||
| 		let handler = TimerHandler { engine: Arc::downgrade(&engine) }; | ||||
| 		engine.timeout_service.register_handler(Arc::new(handler)).expect("Error creating engine timeout service"); | ||||
| 		engine | ||||
| 	} | ||||
| 
 | ||||
| 	fn proposer(&self) -> Address { | ||||
| 		let ref p = self.our_params; | ||||
| 		p.validators.get(p.proposer_nonce.load(AtomicOrdering::Relaxed)%p.validator_n).unwrap().clone() | ||||
| 		p.validators.get(self.proposer_nonce.load(AtomicOrdering::Relaxed)%p.validator_n).unwrap().clone() | ||||
| 	} | ||||
| 
 | ||||
| 	fn is_proposer(&self, address: &Address) -> bool { | ||||
| @ -180,15 +150,21 @@ impl Tendermint { | ||||
| 							self.threshold()) | ||||
| 	} | ||||
| 
 | ||||
| 	fn to_step(&self, step: Step) { | ||||
| 		let mut guard = self.s.try_write().unwrap(); | ||||
| 		*guard = step; | ||||
| 	} | ||||
| 
 | ||||
| 	fn to_propose(&self) { | ||||
| 		self.our_params.proposer_nonce.fetch_add(1, AtomicOrdering::Relaxed); | ||||
| 		let mut guard = self.our_params.s.write(); | ||||
| 		*guard = Step::Propose; | ||||
| 		trace!(target: "tendermint", "step: entering propose"); | ||||
| 		println!("step: entering propose"); | ||||
| 		self.proposer_nonce.fetch_add(1, AtomicOrdering::Relaxed); | ||||
| 		self.to_step(Step::Propose); | ||||
| 	} | ||||
| 
 | ||||
| 	fn propose_message(&self, message: UntrustedRlp) -> Result<Bytes, Error> { | ||||
| 		// Check if message is for correct step.
 | ||||
| 		match *self.our_params.s.try_read().unwrap() { | ||||
| 		match *self.s.try_read().unwrap() { | ||||
| 			Step::Propose => (), | ||||
| 			_ => try!(Err(EngineError::WrongStep)), | ||||
| 		} | ||||
| @ -198,20 +174,24 @@ impl Tendermint { | ||||
| 	} | ||||
| 
 | ||||
| 	fn to_prevote(&self, proposal: H256) { | ||||
| 		let mut guard = self.our_params.s.write(); | ||||
| 		trace!(target: "tendermint", "step: entering prevote"); | ||||
| 		println!("step: entering prevote"); | ||||
| 		// Proceed to the prevote step.
 | ||||
| 		*guard = Step::Prevote(self.new_vote(proposal)); | ||||
| 		self.to_step(Step::Prevote(self.new_vote(proposal))); | ||||
| 	} | ||||
| 
 | ||||
| 	fn prevote_message(&self, sender: Address, message: UntrustedRlp) -> Result<Bytes, Error> { | ||||
| 		// Check if message is for correct step.
 | ||||
| 		match *self.our_params.s.try_write().unwrap() { | ||||
| 		match *self.s.try_write().unwrap() { | ||||
| 			Step::Prevote(ref mut vote) => { | ||||
| 				// Vote if message is about the right block.
 | ||||
| 				if vote.hash == try!(message.as_val()) { | ||||
| 					vote.vote(sender); | ||||
| 					// Move to next step is prevote is won.
 | ||||
| 					if vote.is_won() { self.to_precommit(vote.hash); } | ||||
| 					if vote.is_won() { | ||||
| 						//self.our_params.timeouts.precommit
 | ||||
| 						self.to_precommit(vote.hash); | ||||
| 					} | ||||
| 					Ok(message.as_raw().to_vec()) | ||||
| 				} else { | ||||
| 					try!(Err(EngineError::WrongVote)) | ||||
| @ -222,13 +202,14 @@ impl Tendermint { | ||||
| 	} | ||||
| 
 | ||||
| 	fn to_precommit(&self, proposal: H256) { | ||||
| 		let mut guard = self.our_params.s.write(); | ||||
| 		*guard = Step::Precommit(self.new_vote(proposal), Vec::new()); | ||||
| 		trace!(target: "tendermint", "step: entering precommit"); | ||||
| 		println!("step: entering precommit"); | ||||
| 		self.to_step(Step::Precommit(self.new_vote(proposal), Vec::new())); | ||||
| 	} | ||||
| 
 | ||||
| 	fn precommit_message(&self, sender: Address, signature: H520, message: UntrustedRlp) -> Result<Bytes, Error> { | ||||
| 		// Check if message is for correct step.
 | ||||
| 		match *self.our_params.s.try_write().unwrap() { | ||||
| 		match *self.s.try_write().unwrap() { | ||||
| 			Step::Precommit(ref mut vote, ref mut seal) => { | ||||
| 				// Vote and accumulate seal if message is about the right block.
 | ||||
| 				if vote.hash == try!(message.as_val()) { | ||||
| @ -245,13 +226,18 @@ impl Tendermint { | ||||
| 	} | ||||
| 
 | ||||
| 	fn to_commit(&self, seal: Seal) { | ||||
| 		let mut guard = self.our_params.s.write(); | ||||
| 		*guard = Step::Commit(seal); | ||||
| 		trace!(target: "tendermint", "step: entering commit"); | ||||
| 		println!("step: entering commit"); | ||||
| 		self.to_step(Step::Commit(seal)); | ||||
| 	} | ||||
| 
 | ||||
| 	fn threshold(&self) -> usize { | ||||
| 		self.our_params.validator_n*2/3 | ||||
| 	} | ||||
| 
 | ||||
| 	fn next_timeout(&self) -> u64 { | ||||
| 		self.timeout.load(AtomicOrdering::Relaxed) as u64 | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| impl Engine for Tendermint { | ||||
| @ -305,7 +291,7 @@ impl Engine for Tendermint { | ||||
| 
 | ||||
| 	fn handle_message(&self, sender: Address, signature: H520, message: UntrustedRlp) -> Result<Bytes, Error> { | ||||
| 		// Check if correct round.
 | ||||
| 		if self.our_params.r != try!(message.val_at(0)) { try!(Err(EngineError::WrongRound)) } | ||||
| 		if self.r != try!(message.val_at(0)) { try!(Err(EngineError::WrongRound)) } | ||||
| 		// Handle according to step.
 | ||||
| 		match try!(message.val_at(1)) { | ||||
| 			0u8 if self.is_proposer(&sender) => self.propose_message(try!(message.at(2))), | ||||
| @ -359,17 +345,55 @@ impl Engine for Tendermint { | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| /// Base timeout of each step in ms.
 | ||||
| #[derive(Debug)] | ||||
| struct DefaultTimeouts { | ||||
| 	propose: Ms, | ||||
| 	prevote: Ms, | ||||
| 	precommit: Ms, | ||||
| 	commit: Ms | ||||
| } | ||||
| 
 | ||||
| type Ms = usize; | ||||
| type Seal = Vec<Bytes>; | ||||
| type AtomicMs = AtomicUsize; | ||||
| 
 | ||||
| /// Timer token representing the consensus step timeouts.
 | ||||
| pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 0; | ||||
| 
 | ||||
| impl IoHandler<StepMessage> for TimerHandler { | ||||
| 	fn initialize(&self, io: &IoContext<StepMessage>) { | ||||
| 		if let Some(engine) = self.engine.upgrade() { | ||||
| 			io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Error registering engine timeout"); | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	fn timeout(&self, io: &IoContext<StepMessage>, timer: TimerToken) { | ||||
| 		if timer == ENGINE_TIMEOUT_TOKEN { | ||||
| 			if let Some(engine) = self.engine.upgrade() { | ||||
| 				println!("Timeout: {:?}", get_time()); | ||||
| 				engine.to_propose(); | ||||
| 				io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Failed to restart consensus step timer.") | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	fn message(&self, io: &IoContext<StepMessage>, _net_message: &StepMessage) { | ||||
| 		if let Some(engine) = self.engine.upgrade() { | ||||
| 			println!("Message: {:?}", get_time().sec); | ||||
| 			io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to restart consensus step timer."); | ||||
| 			io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.next_timeout()).expect("Failed to restart consensus step timer.") | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| #[cfg(test)] | ||||
| mod tests { | ||||
| 	use common::*; | ||||
| 	use std::thread::sleep; | ||||
| 	use std::time::{Duration, Instant}; | ||||
| 	use std::time::{Duration}; | ||||
| 	use block::*; | ||||
| 	use tests::helpers::*; | ||||
| 	use service::{ClientService, ClientIoMessage}; | ||||
| 	use devtools::RandomTempPath; | ||||
| 	use client::ClientConfig; | ||||
| 	use miner::Miner; | ||||
| 	use account_provider::AccountProvider; | ||||
| 	use spec::Spec; | ||||
| 	use engines::Engine; | ||||
| @ -379,23 +403,6 @@ mod tests { | ||||
| 	/// Account "0".sha3() and "1".sha3() are a validators.
 | ||||
| 	fn new_test_tendermint() -> Spec { Spec::load(include_bytes!("../../res/tendermint.json")) } | ||||
| 
 | ||||
| 	fn new_test_client_service() -> ClientService { | ||||
| 		let temp_path = RandomTempPath::new(); | ||||
| 		let mut path = temp_path.as_path().to_owned(); | ||||
| 		path.push("pruning"); | ||||
| 		path.push("db"); | ||||
| 
 | ||||
| 		let spec = get_test_spec(); | ||||
| 		let service = ClientService::start( | ||||
| 			ClientConfig::default(), | ||||
| 			&spec, | ||||
| 			&path, | ||||
| 			&path, | ||||
| 			Arc::new(Miner::with_spec(&spec)), | ||||
| 		); | ||||
| 		service.unwrap() | ||||
| 	} | ||||
| 
 | ||||
| 	fn propose_default(engine: &Arc<Engine>, proposer: Address) -> Result<Bytes, Error> { | ||||
| 		let mut s = RlpStream::new_list(3); | ||||
| 		let header = Header::default(); | ||||
| @ -406,6 +413,10 @@ mod tests { | ||||
| 		engine.handle_message(proposer, H520::default(), propose_rlp) | ||||
| 	} | ||||
| 
 | ||||
| 	fn default_block() -> Vec<u8> { | ||||
| 		vec![160, 39, 191, 179, 126, 80, 124, 233, 13, 161, 65, 48, 114, 4, 177, 198, 186, 36, 25, 67, 128, 97, 53, 144, 172, 80, 202, 75, 29, 113, 152, 255, 101] | ||||
| 	} | ||||
| 
 | ||||
| 	#[test] | ||||
| 	fn has_valid_metadata() { | ||||
| 		let engine = new_test_tendermint().engine; | ||||
| @ -474,36 +485,38 @@ mod tests { | ||||
| 		assert!(propose_default(&engine, not_proposer_addr).is_err()); | ||||
| 
 | ||||
| 		let proposer_addr = tap.insert_account("1".sha3(), "1").unwrap(); | ||||
| 		assert_eq!(vec![160, 39, 191, 179, 126, 80, 124, 233, 13, 161, 65, 48, 114, 4, 177, 198, 186, 36, 25, 67, 128, 97, 53, 144, 172, 80, 202, 75, 29, 113, 152, 255, 101], | ||||
| 				   propose_default(&engine, proposer_addr).unwrap()); | ||||
| 		assert_eq!(default_block(), propose_default(&engine, proposer_addr).unwrap()); | ||||
| 
 | ||||
| 		assert!(propose_default(&engine, proposer_addr).is_err()); | ||||
| 		assert!(propose_default(&engine, not_proposer_addr).is_err()); | ||||
| 	} | ||||
| 
 | ||||
| 	#[test] | ||||
| 	fn proposer_switching() { | ||||
| 		let engine = new_test_tendermint().engine; | ||||
| 		let tap = AccountProvider::transient_provider(); | ||||
| 
 | ||||
| 		let not_proposer_addr = tap.insert_account("0".sha3(), "0").unwrap(); | ||||
| 		assert!(propose_default(&engine, not_proposer_addr).is_err()); | ||||
| 
 | ||||
| 		sleep(Duration::from_secs(3)); | ||||
| 
 | ||||
| 		assert_eq!(default_block(), propose_default(&engine, not_proposer_addr).unwrap()); | ||||
| 	} | ||||
| 
 | ||||
| 	#[test] | ||||
| 	fn prevote_step() { | ||||
| 		let engine = new_test_tendermint().engine; | ||||
| 		propose_default(&engine, Address::default()); | ||||
| 	} | ||||
| 
 | ||||
| 	#[test] | ||||
| 	fn handle_message() { | ||||
| 		false; | ||||
| 	} | ||||
| 
 | ||||
| 	#[test] | ||||
| 	fn timeout_switching() { | ||||
| 		let service = new_test_client_service(); | ||||
| 		let engine = new_test_tendermint().engine; | ||||
| 		let tender = Tendermint::new(engine.params().clone(), TendermintParams::default(), BTreeMap::new()); | ||||
| 		service.register_io_handler(Arc::new(tender)); | ||||
| 	
 | ||||
| 		println!("Waiting for timeout"); | ||||
| 		sleep(Duration::from_secs(10)); | ||||
| 		sleep(Duration::from_secs(60)); | ||||
| 
 | ||||
| 		let message_channel = service.io().channel(); | ||||
| 		message_channel.send(ClientIoMessage::ConsensusStep(1000)); | ||||
| 		sleep(Duration::from_secs(5)); | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @ -46,8 +46,6 @@ pub enum ClientIoMessage { | ||||
| 	FeedStateChunk(H256, Bytes), | ||||
| 	/// Feed a block chunk to the snapshot service
 | ||||
| 	FeedBlockChunk(H256, Bytes), | ||||
| 	/// Signal consensus step timeout.
 | ||||
| 	ConsensusStep(u64), | ||||
| } | ||||
| 
 | ||||
| /// Client service setup. Creates and registers client and network services with the IO subsystem.
 | ||||
| @ -148,8 +146,6 @@ struct ClientIoHandler { | ||||
| 
 | ||||
| const CLIENT_TICK_TIMER: TimerToken = 0; | ||||
| const CLIENT_TICK_MS: u64 = 5000; | ||||
| /// Timer token representing the consensus step timeouts.
 | ||||
| pub const ENGINE_TIMEOUT_TOKEN: TimerToken = 1; | ||||
| 
 | ||||
| impl IoHandler<ClientIoMessage> for ClientIoHandler { | ||||
| 	fn initialize(&self, io: &IoContext<ClientIoMessage>) { | ||||
|  | ||||
| @ -137,7 +137,7 @@ impl Spec { | ||||
| 			ethjson::spec::Engine::InstantSeal => Arc::new(InstantSeal::new(params, builtins)), | ||||
| 			ethjson::spec::Engine::Ethash(ethash) => Arc::new(ethereum::Ethash::new(params, From::from(ethash.params), builtins)), | ||||
| 			ethjson::spec::Engine::BasicAuthority(basic_authority) => Arc::new(BasicAuthority::new(params, From::from(basic_authority.params), builtins)), | ||||
| 			ethjson::spec::Engine::Tendermint(tendermint) => Arc::new(Tendermint::new(params, From::from(tendermint.params), builtins)), | ||||
| 			ethjson::spec::Engine::Tendermint(tendermint) => Tendermint::new(params, From::from(tendermint.params), builtins), | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user