change broadcast interface, add basic message handling

This commit is contained in:
keorn 2016-08-23 17:19:23 +02:00
parent 207f9d02f2
commit 99a143eb37
5 changed files with 22 additions and 12 deletions

View File

@ -42,7 +42,7 @@ pub trait ChainNotify : Send + Sync {
}
/// fires when chain broadcasts a message
fn broadcast(&self, _data: Vec<u8>) {
fn broadcast(&self, _data: &[u8]) {
}
}

View File

@ -25,6 +25,7 @@ use time::precise_time_ns;
use util::{journaldb, rlp, Bytes, View, PerfTimer, Itertools, Mutex, RwLock};
use util::journaldb::JournalDB;
use util::rlp::{UntrustedRlp};
use util::ec::recover;
use util::{U256, H256, Address, H2048, Uint};
use util::sha3::*;
use util::kvdb::*;
@ -1021,8 +1022,15 @@ impl BlockChainClient for Client {
self.miner.pending_transactions()
}
fn queue_infinity_message(&self, _message: Bytes) {
//TODO: handle message here
fn queue_infinity_message(&self, message: Bytes) {
let full_rlp = UntrustedRlp::new(&message);
let signature = full_rlp.val_at(0).unwrap_or(return);
let message: Vec<_> = full_rlp.val_at(1).unwrap_or(return);
let message_rlp = UntrustedRlp::new(&message);
let pub_key = recover(&signature, &message.sha3()).unwrap_or(return);
if let Some(new_message) = self.engine.handle_message(pub_key.sha3().into(), message_rlp) {
self.notify(|notify| notify.broadcast(new_message.as_raw()));
}
}
}

View File

@ -30,7 +30,7 @@ pub use self::tendermint::Tendermint;
pub use self::signed_vote::{SignedVote, VoteError};
pub use self::propose_collect::{ProposeCollect};
use common::{HashMap, SemanticVersion, Header, EnvInfo, Address, Builtin, BTreeMap, U256, Bytes, SignedTransaction, Error};
use common::{HashMap, SemanticVersion, Header, EnvInfo, Address, Builtin, BTreeMap, U256, Bytes, SignedTransaction, Error, UntrustedRlp};
use account_provider::AccountProvider;
use block::ExecutedBlock;
use spec::CommonParams;
@ -121,7 +121,7 @@ pub trait Engine : Sync + Send {
/// Handle any potential consensus messages;
/// updating consensus state and potentially issuing a new one.
fn handle_message(&self, sender: Address, message: Bytes) -> Option<Vec<u8>> { None }
fn handle_message(&self, sender: Address, message: UntrustedRlp) -> Option<UntrustedRlp> { None }
// TODO: builtin contract routing - to do this properly, it will require removing the built-in configuration-reading logic
// from Spec into here and removing the Spec::builtins field.

View File

@ -88,6 +88,10 @@ impl Tendermint {
let ref p = self.our_params;
p.validators.get(p.proposer_nonce%p.validator_n).unwrap().clone()
}
fn propose_message(&self, message: UntrustedRlp) -> Option<UntrustedRlp> {
None
}
}
impl Engine for Tendermint {
@ -140,13 +144,11 @@ impl Engine for Tendermint {
})
}
fn handle_message(&self, sender: Address, message: Bytes) -> Option<Vec<u8>> {
match message[0] {
0 => println!("0"),
_ => println!("unknown"),
fn handle_message(&self, sender: Address, message: UntrustedRlp) -> Option<UntrustedRlp> {
match message.val_at(0).unwrap_or(return None) {
0u8 if sender == self.proposer() => self.propose_message(message),
_ => None,
}
//let sig: Signature = message.into();
None
}
fn verify_block_basic(&self, header: &Header, _block: Option<&[u8]>) -> result::Result<(), Error> {

View File

@ -189,7 +189,7 @@ impl ChainNotify for EthSync {
self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e));
}
fn broadcast(&self, message: Vec<u8>) {
fn broadcast(&self, message: &[u8]) {
self.network.with_context(ETH_PROTOCOL, |context| {
let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain);
self.inf_handler.sync.write().propagate_packet(&mut sync_io, message.clone());