Merge branch 'master' into rocksdb-flush-limit

NikVolf 2016-06-21 17:32:07 +03:00
commit de079ebe31
43 changed files with 727 additions and 168 deletions

Cargo.lock (generated)

@@ -424,7 +424,6 @@ dependencies = [
name = "ethkey"
version = "0.2.0"
dependencies = [
-"docopt 0.6.80 (registry+https://github.com/rust-lang/crates.io-index)",
"eth-secp256k1 0.5.4 (git+https://github.com/ethcore/rust-secp256k1)",
"lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -436,7 +435,6 @@ dependencies = [
name = "ethstore"
version = "0.1.0"
dependencies = [
-"docopt 0.6.80 (registry+https://github.com/rust-lang/crates.io-index)",
"ethkey 0.2.0",
"libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -924,7 +922,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "parity-dapps"
version = "0.3.0"
-source = "git+https://github.com/ethcore/parity-dapps-rs.git#1f065d93aa49338e4a453c77c839957f2db78895"
+source = "git+https://github.com/ethcore/parity-dapps-rs.git#8cc812c26c903cf5764ce0f4cc3f2a7c3ddb0dc2"
dependencies = [
"aster 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)",
"glob 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",


@@ -236,7 +236,6 @@ impl AccountProvider {
#[cfg(test)]
mod tests {
use super::AccountProvider;
-use ethstore::SecretStore;
use ethstore::ethkey::{Generator, Random};
#[test]


@@ -130,7 +130,9 @@ impl QueueSignal {
}
if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false {
-self.message_channel.send(UserMessage(SyncMessage::BlockVerified)).expect("Error sending BlockVerified message");
+if let Err(e) = self.message_channel.send(UserMessage(SyncMessage::BlockVerified)) {
+debug!("Error sending BlockVerified message: {:?}", e);
+}
}
}


@@ -18,6 +18,7 @@
use std::marker::PhantomData;
use std::path::PathBuf;
+use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use util::*;
use util::panics::*;
use views::BlockView;
@@ -50,6 +51,8 @@ pub use types::block_status::BlockStatus;
use evm::Factory as EvmFactory;
use miner::{Miner, MinerService, TransactionImportResult, AccountDetails};
+const MAX_TX_QUEUE_SIZE: usize = 4096;
impl fmt::Display for BlockChainInfo {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "#{}.{}", self.best_block_number, self.best_block_hash)
@@ -92,6 +95,8 @@ pub struct Client<V = CanonVerifier> where V: Verifier {
verifier: PhantomData<V>,
vm_factory: Arc<EvmFactory>,
miner: Arc<Miner>,
+io_channel: IoChannel<NetSyncMessage>,
+queue_transactions: AtomicUsize,
}
const HISTORY: u64 = 1200;
@@ -152,7 +157,7 @@ impl<V> Client<V> where V: Verifier {
let engine = Arc::new(spec.engine);
-let block_queue = BlockQueue::new(config.queue, engine.clone(), message_channel);
+let block_queue = BlockQueue::new(config.queue, engine.clone(), message_channel.clone());
let panic_handler = PanicHandler::new_in_arc();
panic_handler.forward_from(&block_queue);
@@ -168,6 +173,8 @@ impl<V> Client<V> where V: Verifier {
verifier: PhantomData,
vm_factory: Arc::new(EvmFactory::new(config.vm_type)),
miner: miner,
+io_channel: message_channel,
+queue_transactions: AtomicUsize::new(0),
};
Ok(Arc::new(client))
@@ -274,6 +281,7 @@ impl<V> Client<V> where V: Verifier {
let mut import_results = Vec::with_capacity(max_blocks_to_import);
let _import_lock = self.import_lock.lock();
+let _timer = PerfTimer::new("import_verified_blocks");
let blocks = self.block_queue.drain(max_blocks_to_import);
let original_best = self.chain_info().best_block_hash;
@@ -364,6 +372,19 @@ impl<V> Client<V> where V: Verifier {
imported
}
+/// Import transactions from the IO queue
+pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize {
+let _timer = PerfTimer::new("import_queued_transactions");
+self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst);
+let fetch_account = |a: &Address| AccountDetails {
+nonce: self.latest_nonce(a),
+balance: self.latest_balance(a),
+};
+let tx = transactions.iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
+let results = self.miner.import_transactions(self, tx, fetch_account);
+results.len()
+}
/// Attempt to get a copy of a specific block's state.
///
/// This will not fail if given BlockID::Latest.
@@ -750,7 +771,23 @@ impl<V> BlockChainClient for Client<V> where V: Verifier {
nonce: self.latest_nonce(a),
balance: self.latest_balance(a),
};
-self.miner.import_transactions(transactions, fetch_account)
+self.miner.import_transactions(self, transactions, fetch_account)
}
+fn queue_transactions(&self, transactions: Vec<Bytes>) {
+if self.queue_transactions.load(AtomicOrdering::Relaxed) > MAX_TX_QUEUE_SIZE {
+debug!("Ignoring {} transactions: queue is full", transactions.len());
+} else {
+let len = transactions.len();
+match self.io_channel.send(NetworkIoMessage::User(SyncMessage::NewTransactions(transactions))) {
+Ok(_) => {
+self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst);
+}
+Err(e) => {
+debug!("Ignoring {} transactions: error queueing: {}", len, e);
+}
+}
+}
+}
fn all_transactions(&self) -> Vec<SignedTransaction> {

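The two hunks above wire up a bounded, asynchronous path for network transactions: queue_transactions counts pending raw payloads and pushes them onto the IO channel, while import_queued_transactions later decodes and imports them and decrements the counter. Below is a minimal, self-contained sketch of that counting scheme using only std types; the names (TxQueueModel, queue, drained) are illustrative and are not part of the Parity code above.

// Bounded queueing model: an atomic counter caps how many raw transaction
// payloads may sit in the channel before new batches are dropped.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;

const MAX_TX_QUEUE_SIZE: usize = 4096;

struct TxQueueModel {
    pending: AtomicUsize,
    channel: mpsc::Sender<Vec<Vec<u8>>>,
}

impl TxQueueModel {
    /// Mirrors `queue_transactions`: drop the batch if the backlog is full,
    /// otherwise send it and bump the counter by the batch size.
    fn queue(&self, txs: Vec<Vec<u8>>) {
        if self.pending.load(Ordering::Relaxed) > MAX_TX_QUEUE_SIZE {
            println!("ignoring {} transactions: queue is full", txs.len());
        } else {
            let len = txs.len();
            if self.channel.send(txs).is_ok() {
                self.pending.fetch_add(len, Ordering::SeqCst);
            }
        }
    }

    /// Mirrors `import_queued_transactions`: the consumer decrements the
    /// counter by the size of the batch it just drained.
    fn drained(&self, batch_len: usize) {
        self.pending.fetch_sub(batch_len, Ordering::SeqCst);
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let model = TxQueueModel { pending: AtomicUsize::new(0), channel: tx };
    model.queue(vec![vec![0u8; 4]]);
    if let Ok(batch) = rx.try_recv() {
        model.drained(batch.len());
    }
}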

@@ -193,6 +193,9 @@ pub trait BlockChainClient : Sync + Send {
/// import transactions from network/other 3rd party
fn import_transactions(&self, transactions: Vec<SignedTransaction>) -> Vec<Result<TransactionImportResult, EthError>>;
+/// Queue transactions for importing.
+fn queue_transactions(&self, transactions: Vec<Bytes>);
/// list all transactions
fn all_transactions(&self) -> Vec<SignedTransaction>;


@@ -490,7 +490,13 @@ impl BlockChainClient for TestBlockChainClient {
balance: balances[a],
};
-self.miner.import_transactions(transactions, &fetch_account)
+self.miner.import_transactions(self, transactions, &fetch_account)
}
+fn queue_transactions(&self, transactions: Vec<Bytes>) {
+// import right here
+let tx = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
+self.import_transactions(tx);
+}
fn all_transactions(&self) -> Vec<SignedTransaction> {


@@ -376,13 +376,19 @@ impl MinerService for Miner {
*self.gas_floor_target.read().unwrap()
}
-fn import_transactions<T>(&self, transactions: Vec<SignedTransaction>, fetch_account: T) ->
+fn import_transactions<T>(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, fetch_account: T) ->
Vec<Result<TransactionImportResult, Error>>
where T: Fn(&Address) -> AccountDetails {
+let results: Vec<Result<TransactionImportResult, Error>> = {
let mut transaction_queue = self.transaction_queue.lock().unwrap();
transactions.into_iter()
.map(|tx| transaction_queue.add(tx, &fetch_account, TransactionOrigin::External))
.collect()
+};
+if !results.is_empty() {
+self.update_sealing(chain);
+}
+results
}
fn import_own_transaction<T>(
@@ -564,7 +570,7 @@ impl MinerService for Miner {
for tx in &txs {
let _sender = tx.sender();
}
-let _ = self.import_transactions(txs, |a| AccountDetails {
+let _ = self.import_transactions(chain, txs, |a| AccountDetails {
nonce: chain.latest_nonce(a),
balance: chain.latest_balance(a),
});


@@ -94,7 +94,7 @@ pub trait MinerService : Send + Sync {
fn set_transactions_limit(&self, limit: usize);
/// Imports transactions to transaction queue.
-fn import_transactions<T>(&self, transactions: Vec<SignedTransaction>, fetch_account: T) ->
+fn import_transactions<T>(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, fetch_account: T) ->
Vec<Result<TransactionImportResult, Error>>
where T: Fn(&Address) -> AccountDetails, Self: Sized;


@@ -236,8 +236,8 @@ impl TransactionSet {
self.by_priority.insert(order.clone());
let r = self.by_address.insert(sender, nonce, order);
// If transaction was replaced remove it from priority queue
-if let Some(ref order) = r {
+if let Some(ref old_order) = r {
-self.by_priority.remove(order);
+self.by_priority.remove(old_order);
}
r
}
@@ -517,16 +517,9 @@ impl TransactionQueue {
// Remove from current
let order = self.current.drop(&sender, &nonce);
if order.is_some() {
-// We will either move transaction to future or remove it completely
-// so there will be no transactions from this sender in current
-self.last_nonces.remove(&sender);
-// First update height of transactions in future to avoid collisions
-self.update_future(&sender, current_nonce);
-// This should move all current transactions to future and remove old transactions
-self.move_all_to_future(&sender, current_nonce);
-// And now lets check if there is some chain of transactions in future
-// that should be placed in current. It should also update last_nonces.
-self.move_matching_future_to_current(sender, current_nonce, current_nonce);
+// This will keep consistency in queue
+// Moves all to future and then promotes a batch from current:
+self.remove_all(sender, current_nonce);
return;
}
}
@@ -682,7 +675,8 @@ impl TransactionQueue {
try!(check_too_cheap(Self::replace_transaction(tx, state_nonce, &mut self.current, &mut self.by_hash)));
// Keep track of highest nonce stored in current
-self.last_nonces.insert(address, nonce);
+let new_max = self.last_nonces.get(&address).map_or(nonce, |n| cmp::max(nonce, *n));
+self.last_nonces.insert(address, new_max);
// Update nonces of transactions in future
self.update_future(&address, state_nonce);
// Maybe there are some more items waiting in future?
@@ -692,14 +686,15 @@ impl TransactionQueue {
if let Some(order) = self.future.drop(&address, &nonce) {
// Let's insert that transaction to current (if it has higher gas_price)
let future_tx = self.by_hash.remove(&order.hash).unwrap();
-try!(check_too_cheap(Self::replace_transaction(future_tx, state_nonce, &mut self.current, &mut self.by_hash)));
+// if transaction in `current` (then one we are importing) is replaced it means that it has to low gas_price
+try!(check_too_cheap(!Self::replace_transaction(future_tx, state_nonce, &mut self.current, &mut self.by_hash)));
}
// Also enforce the limit
let removed = self.current.enforce_limit(&mut self.by_hash);
// If some transaction were removed because of limit we need to update last_nonces also.
self.update_last_nonces(&removed);
-// Trigger error if we were removed.
+// Trigger error if the transaction we are importing was removed.
try!(check_if_removed(&address, &nonce, removed));
trace!(target: "miner", "status: {:?}", self.status());
@@ -943,7 +938,7 @@ mod test {
let res = txq.add(tx2.clone(), &default_nonce, TransactionOrigin::External);
// and then there should be only one transaction in current (the one with higher gas_price)
-assert_eq!(unwrap_tx_err(res), TransactionError::TooCheapToReplace);
+assert_eq!(res.unwrap(), TransactionImportResult::Current);
assert_eq!(txq.status().pending, 1);
assert_eq!(txq.status().future, 0);
assert_eq!(txq.current.by_priority.len(), 1);
@@ -1597,4 +1592,37 @@ mod test {
assert_eq!(txq.future.by_priority.iter().next().unwrap().hash, tx1.hash());
}
+#[test]
+fn should_return_correct_last_nonce() {
+// given
+let mut txq = TransactionQueue::new();
+let (tx1, tx2, tx2_2, tx3) = {
+let keypair = KeyPair::create().unwrap();
+let secret = &keypair.secret();
+let nonce = U256::from(123);
+let tx = new_unsigned_tx(nonce);
+let tx2 = new_unsigned_tx(nonce + 1.into());
+let mut tx2_2 = new_unsigned_tx(nonce + 1.into());
+tx2_2.gas_price = U256::from(5);
+let tx3 = new_unsigned_tx(nonce + 2.into());
+(tx.sign(secret), tx2.sign(secret), tx2_2.sign(secret), tx3.sign(secret))
+};
+let sender = tx1.sender().unwrap();
+txq.add(tx1, &default_nonce, TransactionOrigin::Local).unwrap();
+txq.add(tx2, &default_nonce, TransactionOrigin::Local).unwrap();
+txq.add(tx3, &default_nonce, TransactionOrigin::Local).unwrap();
+assert_eq!(txq.future.by_priority.len(), 0);
+assert_eq!(txq.current.by_priority.len(), 3);
+// when
+let res = txq.add(tx2_2, &default_nonce, TransactionOrigin::Local);
+// then
+assert_eq!(txq.last_nonce(&sender).unwrap(), 125.into());
+assert_eq!(res.unwrap(), TransactionImportResult::Current);
+assert_eq!(txq.current.by_priority.len(), 3);
+}
}

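The last_nonces change above replaces a blind insert with a max-keeping update, so re-importing a replacement transaction for an already-known nonce can never lower the recorded highest nonce for a sender; that is what the new should_return_correct_last_nonce test exercises. A tiny self-contained illustration of that bookkeeping (the names note_nonce and the string sender keys are illustrative, not Parity types):

use std::cmp;
use std::collections::HashMap;

// Keep the maximum of the stored nonce and the incoming one, never overwrite blindly.
fn note_nonce(last_nonces: &mut HashMap<&'static str, u64>, sender: &'static str, nonce: u64) {
    let new_max = last_nonces.get(sender).map_or(nonce, |n| cmp::max(nonce, *n));
    last_nonces.insert(sender, new_max);
}

fn main() {
    let mut last = HashMap::new();
    note_nonce(&mut last, "alice", 125);
    // A replacement for an older nonce must not lower the recorded maximum.
    note_nonce(&mut last, "alice", 124);
    assert_eq!(last["alice"], 125);
}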

@@ -41,6 +41,8 @@ pub enum SyncMessage {
NewChainHead,
/// A block is ready
BlockVerified,
+/// New transaction RLPs are ready to be imported
+NewTransactions(Vec<Bytes>),
/// Start network command.
StartNetwork,
/// Stop network command.
@@ -136,6 +138,9 @@ impl IoHandler<NetSyncMessage> for ClientIoHandler {
SyncMessage::BlockVerified => {
self.client.import_verified_blocks(&io.channel());
},
+SyncMessage::NewTransactions(ref transactions) => {
+self.client.import_queued_transactions(&transactions);
+},
_ => {}, // ignore other messages
}
}


@@ -12,7 +12,7 @@ rustc-serialize = "0.3"
docopt = { version = "0.6", optional = true }
[features]
-default = ["cli"]
+default = []
cli = ["docopt"]
[[bin]]


@@ -21,7 +21,7 @@ serde_codegen = { version = "0.7", optional = true }
syntex = "0.33.0"
[features]
-default = ["cli", "serde_codegen"]
+default = ["serde_codegen"]
nightly = ["serde_macros"]
cli = ["docopt"]


@@ -65,8 +65,8 @@ impl Keccak256<[u8; 32]> for [u8] {
/// AES encryption
pub mod aes {
-use rcrypto::blockmodes::CtrMode;
+use rcrypto::blockmodes::{CtrMode, CbcDecryptor, PkcsPadding};
-use rcrypto::aessafe::AesSafe128Encryptor;
+use rcrypto::aessafe::{AesSafe128Encryptor, AesSafe128Decryptor};
use rcrypto::symmetriccipher::{Encryptor, Decryptor};
use rcrypto::buffer::{RefReadBuffer, RefWriteBuffer};
@@ -81,5 +81,12 @@ pub mod aes {
let mut encryptor = CtrMode::new(AesSafe128Encryptor::new(k), iv.to_vec());
encryptor.decrypt(&mut RefReadBuffer::new(encrypted), &mut RefWriteBuffer::new(dest), true).expect("Invalid length or padding");
}
+/// Decrypt a message using cbc mode
+pub fn decrypt_cbc(k: &[u8], iv: &[u8], encrypted: &[u8], dest: &mut [u8]) {
+let mut encryptor = CbcDecryptor::new(AesSafe128Decryptor::new(k), PkcsPadding, iv.to_vec());
+encryptor.decrypt(&mut RefReadBuffer::new(encrypted), &mut RefWriteBuffer::new(dest), true).expect("Invalid length or padding");
+}
}

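A hypothetical caller of the new decrypt_cbc helper, assuming the crypto::aes module above and the rust-crypto (rcrypto) crate behind it; decrypt_wallet_seed and its buffer sizing are illustrative, not part of this commit. AES-128 uses a 16-byte key and IV, and with PKCS padding the unpadded plaintext never exceeds the ciphertext length, so a destination buffer as large as the ciphertext is always sufficient.

// Illustrative wrapper: decrypt an AES-128-CBC ciphertext into a fresh buffer.
fn decrypt_wallet_seed(key: &[u8; 16], iv: &[u8; 16], ciphertext: &[u8]) -> Vec<u8> {
    let mut plain = vec![0u8; ciphertext.len()];
    crypto::aes::decrypt_cbc(key, iv, ciphertext, &mut plain);
    plain
}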

@@ -31,3 +31,11 @@ impl From<json::H160> for Address {
From::from(a)
}
}
+impl<'a> From<&'a json::H160> for Address {
+fn from(json: &'a json::H160) -> Self {
+let mut a = [0u8; 20];
+a.copy_from_slice(json);
+From::from(a)
+}
+}


@@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
+use std::fmt;
+use std::ops;
use std::str::FromStr;
use rustc_serialize::hex::{FromHex, ToHex};
use serde::{Serialize, Serializer, Deserialize, Deserializer, Error as SerdeError};
@@ -22,9 +24,31 @@ use super::Error;
macro_rules! impl_hash {
($name: ident, $size: expr) => {
-#[derive(Debug, PartialEq)]
pub struct $name([u8; $size]);
+impl fmt::Debug for $name {
+fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
+let self_ref: &[u8] = &self.0;
+write!(f, "{:?}", self_ref)
+}
+}
+impl PartialEq for $name {
+fn eq(&self, other: &Self) -> bool {
+let self_ref: &[u8] = &self.0;
+let other_ref: &[u8] = &other.0;
+self_ref == other_ref
+}
+}
+impl ops::Deref for $name {
+type Target = [u8];
+fn deref(&self) -> &Self::Target {
+&self.0
+}
+}
impl Serialize for $name {
fn serialize<S>(&self, serializer: &mut S) -> Result<(), S::Error>
where S: Serializer {
@@ -85,3 +109,4 @@ macro_rules! impl_hash {
impl_hash!(H128, 16);
impl_hash!(H160, 20);
impl_hash!(H256, 32);
+impl_hash!(H768, 96);


@@ -5,14 +5,16 @@ mod hash;
mod id;
mod kdf;
mod key_file;
+mod presale;
mod version;
pub use self::cipher::{Cipher, CipherSer, CipherSerParams, Aes128Ctr};
pub use self::crypto::Crypto;
pub use self::error::Error;
-pub use self::hash::{H128, H160, H256};
+pub use self::hash::{H128, H160, H256, H768};
pub use self::id::UUID;
pub use self::kdf::{Kdf, KdfSer, Prf, Pbkdf2, Scrypt, KdfSerParams};
pub use self::key_file::KeyFile;
+pub use self::presale::PresaleWallet;
pub use self::version::Version;


@@ -0,0 +1,42 @@
use std::io::Read;
use serde_json;
use super::{H160, H768};
#[derive(Debug, PartialEq, Deserialize)]
pub struct PresaleWallet {
pub encseed: H768,
#[serde(rename = "ethaddr")]
pub address: H160,
}
impl PresaleWallet {
pub fn load<R>(reader: R) -> Result<Self, serde_json::Error> where R: Read {
serde_json::from_reader(reader)
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use serde_json;
use json::{PresaleWallet, H160, H768};
#[test]
fn presale_wallet() {
let json = r#"
{
"encseed": "137103c28caeebbcea5d7f95edb97a289ded151b72159137cb7b2671f394f54cff8c121589dcb373e267225547b3c71cbdb54f6e48ec85cd549f96cf0dedb3bc0a9ac6c79b9c426c5878ca2c9d06ff42a23cb648312fc32ba83649de0928e066",
"ethaddr": "ede84640d1a1d3e06902048e67aa7db8d52c2ce1",
"email": "123@gmail.com",
"btcaddr": "1JvqEc6WLhg6GnyrLBe2ztPAU28KRfuseH"
} "#;
let expected = PresaleWallet {
encseed: H768::from_str("137103c28caeebbcea5d7f95edb97a289ded151b72159137cb7b2671f394f54cff8c121589dcb373e267225547b3c71cbdb54f6e48ec85cd549f96cf0dedb3bc0a9ac6c79b9c426c5878ca2c9d06ff42a23cb648312fc32ba83649de0928e066").unwrap(),
address: H160::from_str("ede84640d1a1d3e06902048e67aa7db8d52c2ce1").unwrap(),
};
let wallet: PresaleWallet = serde_json::from_str(json).unwrap();
assert_eq!(expected, wallet);
}
}


@@ -37,6 +37,7 @@ mod crypto;
mod error;
mod ethstore;
mod import;
+mod presale;
mod random;
mod secret_store;
@@ -44,5 +45,6 @@ pub use self::account::SafeAccount;
pub use self::error::Error;
pub use self::ethstore::EthStore;
pub use self::import::import_accounts;
+pub use self::presale::PresaleWallet;
pub use self::secret_store::SecretStore;

ethstore/src/presale.rs (new file, 80 lines)

@@ -0,0 +1,80 @@
use std::fs;
use std::path::Path;
use rcrypto::pbkdf2::pbkdf2;
use rcrypto::sha2::Sha256;
use rcrypto::hmac::Hmac;
use json;
use ethkey::{Address, Secret, KeyPair};
use crypto::Keccak256;
use {crypto, Error};
pub struct PresaleWallet {
iv: [u8; 16],
ciphertext: [u8; 80],
address: Address,
}
impl From<json::PresaleWallet> for PresaleWallet {
fn from(wallet: json::PresaleWallet) -> Self {
let mut iv = [0u8; 16];
iv.copy_from_slice(&wallet.encseed[..16]);
let mut ciphertext = [0u8; 80];
ciphertext.copy_from_slice(&wallet.encseed[16..]);
PresaleWallet {
iv: iv,
ciphertext: ciphertext,
address: Address::from(wallet.address),
}
}
}
impl PresaleWallet {
pub fn open<P>(path: P) -> Result<Self, Error> where P: AsRef<Path> {
let file = try!(fs::File::open(path));
let presale = json::PresaleWallet::load(file).unwrap();
Ok(PresaleWallet::from(presale))
}
pub fn decrypt(&self, password: &str) -> Result<KeyPair, Error> {
let mut h_mac = Hmac::new(Sha256::new(), password.as_bytes());
let mut derived_key = vec![0u8; 16];
pbkdf2(&mut h_mac, password.as_bytes(), 2000, &mut derived_key);
let mut key = [0u8; 64];
crypto::aes::decrypt_cbc(&derived_key, &self.iv, &self.ciphertext, &mut key);
let secret = Secret::from(key.keccak256());
if let Ok(kp) = KeyPair::from_secret(secret) {
if kp.address() == self.address {
return Ok(kp)
}
}
Err(Error::InvalidPassword)
}
}
#[cfg(test)]
mod tests {
use ethkey::Address;
use super::PresaleWallet;
use json;
#[test]
fn test() {
let json = r#"
{
"encseed": "137103c28caeebbcea5d7f95edb97a289ded151b72159137cb7b2671f394f54cff8c121589dcb373e267225547b3c71cbdb54f6e48ec85cd549f96cf0dedb3bc0a9ac6c79b9c426c5878ca2c9d06ff42a23cb648312fc32ba83649de0928e066",
"ethaddr": "ede84640d1a1d3e06902048e67aa7db8d52c2ce1",
"email": "123@gmail.com",
"btcaddr": "1JvqEc6WLhg6GnyrLBe2ztPAU28KRfuseH"
} "#;
let wallet = json::PresaleWallet::load(json.as_bytes()).unwrap();
let wallet = PresaleWallet::from(wallet);
let kp = wallet.decrypt("123").unwrap();
assert_eq!(kp.address(), Address::from(wallet.address));
}
}


@@ -19,8 +19,10 @@ extern crate ethstore;
mod util;
+use std::str::FromStr;
use ethstore::{SecretStore, EthStore};
-use ethstore::ethkey::{Random, Generator, Secret};
+use ethstore::ethkey::{Random, Generator, Secret, Address};
+use ethstore::dir::DiskDirectory;
use util::TransientDir;
#[test]
@@ -86,3 +88,40 @@ fn secret_store_remove_account() {
assert_eq!(store.accounts().len(), 0);
assert!(store.remove_account(&accounts[0], "").is_err());
}
+fn test_path() -> &'static str {
+match ::std::fs::metadata("ethstore") {
+Ok(_) => "ethstore/tests/res/geth_keystore",
+Err(_) => "tests/res/geth_keystore",
+}
+}
+fn pat_path() -> &'static str {
+match ::std::fs::metadata("ethstore") {
+Ok(_) => "ethstore/tests/res/pat",
+Err(_) => "tests/res/pat",
+}
+}
+#[test]
+fn secret_store_laod_geth_files() {
+let dir = DiskDirectory::at(test_path());
+let store = EthStore::open(Box::new(dir)).unwrap();
+assert_eq!(store.accounts(), vec![
+Address::from_str("3f49624084b67849c7b4e805c5988c21a430f9d9").unwrap(),
+Address::from_str("5ba4dcf897e97c2bdf8315b9ef26c13c085988cf").unwrap(),
+Address::from_str("63121b431a52f8043c16fcf0d1df9cb7b5f66649").unwrap(),
+]);
+}
+#[test]
+fn secret_store_load_pat_files() {
+let dir = DiskDirectory::at(pat_path());
+let store = EthStore::open(Box::new(dir)).unwrap();
+assert_eq!(store.accounts(), vec![
+Address::from_str("3f49624084b67849c7b4e805c5988c21a430f9d9").unwrap(),
+Address::from_str("5ba4dcf897e97c2bdf8315b9ef26c13c085988cf").unwrap(),
+]);
+}


@@ -200,7 +200,10 @@ impl Configuration {
net_path.push("network");
ret.config_path = Some(net_path.to_str().unwrap().to_owned());
ret.reserved_nodes = self.init_reserved_nodes();
-ret.reserved_only = self.args.flag_reserved_only;
+if self.args.flag_reserved_only {
+ret.non_reserved_mode = ::util::network::NonReservedPeerMode::Deny;
+}
ret
}
@@ -292,16 +295,16 @@ impl Configuration {
}).collect::<Vec<_>>();
if !self.args.flag_no_import_keys {
-let dir_type = match self.args.flag_testnet {
-true => DirectoryType::Testnet,
-false => DirectoryType::Main,
+let dir_type = if self.args.flag_testnet {
+DirectoryType::Testnet
+} else {
+DirectoryType::Main
};
let from = GethDirectory::open(dir_type);
let to = DiskDirectory::create(self.keys_path()).unwrap();
-if let Err(e) = import_accounts(&from, &to) {
-warn!("Could not import accounts {}", e);
-}
+// ignore error, cause geth may not exist
+let _ = import_accounts(&from, &to);
}
let dir = Box::new(DiskDirectory::create(self.keys_path()).unwrap());


@@ -39,9 +39,8 @@ impl IoHandler<NetSyncMessage> for ClientIoHandler {
}
fn timeout(&self, _io: &IoContext<NetSyncMessage>, timer: TimerToken) {
-match timer {
-INFO_TIMER => { self.info.tick(&self.client, Some(&self.sync)); }
-_ => {}
+if let INFO_TIMER = timer {
+self.info.tick(&self.client, Some(&self.sync));
}
}


@@ -230,6 +230,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)
logger: logger.clone(),
settings: network_settings.clone(),
allow_pending_receipt_query: !conf.args.flag_geth,
+net_service: service.network(),
});
let dependencies = rpc::Dependencies {
@@ -315,11 +316,11 @@ fn execute_export(conf: Configuration) {
udp_port: None,
nat_enabled: false,
discovery_enabled: false,
-reserved_only: true,
boot_nodes: Vec::new(),
use_secret: None,
ideal_peers: 0,
reserved_nodes: Vec::new(),
+non_reserved_mode: ::util::network::NonReservedPeerMode::Accept,
};
let client_config = conf.client_config(&spec);
@@ -387,11 +388,11 @@ fn execute_import(conf: Configuration) {
udp_port: None,
nat_enabled: false,
discovery_enabled: false,
-reserved_only: true,
boot_nodes: Vec::new(),
use_secret: None,
ideal_peers: 0,
reserved_nodes: Vec::new(),
+non_reserved_mode: ::util::network::NonReservedPeerMode::Accept,
};
let client_config = conf.client_config(&spec);
@@ -487,7 +488,7 @@ fn execute_signer(conf: Configuration) {
}
fn execute_account_cli(conf: Configuration) {
-use ethcore::ethstore::{SecretStore, EthStore, import_accounts};
+use ethcore::ethstore::{EthStore, import_accounts};
use ethcore::ethstore::dir::DiskDirectory;
use ethcore::account_provider::AccountProvider;
use rpassword::read_password;


@@ -89,6 +89,7 @@ fn current_version(path: &PathBuf) -> Result<u32, Error> {
/// Writes current database version to the file.
/// Creates a new file if the version file does not exist yet.
fn update_version(path: &PathBuf) -> Result<(), Error> {
+try!(fs::create_dir_all(path));
let mut file = try!(File::create(version_file_path(path)));
try!(file.write_all(format!("{}", CURRENT_VERSION).as_bytes()));
Ok(())
@@ -151,8 +152,6 @@ fn migrate_database(version: u32, path: PathBuf, migrations: MigrationManager) -
return Ok(())
}
-println!("Migrating database {} from version {} to {}", path.to_string_lossy(), version, CURRENT_VERSION);
let temp_path = temp_database_path(&path);
let backup_path = backup_database_path(&path);
// remote the dir if it exists
@@ -188,20 +187,26 @@ fn migrate_database(version: u32, path: PathBuf, migrations: MigrationManager) -
// remove backup
try!(fs::remove_dir_all(&backup_path));
-println!("Migration finished");
Ok(())
}
+fn exists(path: &PathBuf) -> bool {
+fs::metadata(path).is_ok()
+}
/// Migrates the database.
pub fn migrate(path: &PathBuf) -> Result<(), Error> {
// read version file.
let version = try!(current_version(path));
// migrate the databases.
-if version != CURRENT_VERSION {
+// main db directory may already exists, so let's check if we have blocks dir
+if version != CURRENT_VERSION && exists(&blocks_database_path(path)) {
+println!("Migrating database from version {} to {}", version, CURRENT_VERSION);
try!(migrate_database(version, blocks_database_path(path), try!(blocks_database_migrations())));
try!(migrate_database(version, extras_database_path(path), try!(extras_database_migrations())));
+println!("Migration finished");
}
// update version file.


@@ -25,6 +25,7 @@ use ethcore::client::Client;
use util::RotatingLogger;
use ethcore::account_provider::AccountProvider;
use util::network_settings::NetworkSettings;
+use util::network::NetworkService;
#[cfg(feature="rpc")]
pub use ethcore_rpc::ConfirmationsQueue;
@@ -89,6 +90,7 @@ pub struct Dependencies {
pub logger: Arc<RotatingLogger>,
pub settings: Arc<NetworkSettings>,
pub allow_pending_receipt_query: bool,
+pub net_service: Arc<NetworkService<::ethcore::service::SyncMessage>>,
}
fn to_modules(apis: &[Api]) -> BTreeMap<String, String> {
@@ -163,7 +165,7 @@ pub fn setup_rpc<T: Extendable>(server: T, deps: Arc<Dependencies>, apis: ApiSet
server.add_delegate(EthcoreClient::new(&deps.client, &deps.miner, deps.logger.clone(), deps.settings.clone()).to_delegate())
},
Api::EthcoreSet => {
-server.add_delegate(EthcoreSetClient::new(&deps.miner).to_delegate())
+server.add_delegate(EthcoreSetClient::new(&deps.miner, &deps.net_service).to_delegate())
},
Api::Traces => {
server.add_delegate(TracesClient::new(&deps.client, &deps.miner).to_delegate())


@@ -16,9 +16,11 @@
/// Ethcore-specific rpc interface for operations altering the settings.
use util::{U256, Address};
+use util::network::{NetworkService, NonReservedPeerMode};
use std::sync::{Arc, Weak};
use jsonrpc_core::*;
use ethcore::miner::MinerService;
+use ethcore::service::SyncMessage;
use v1::traits::EthcoreSet;
use v1::types::{Bytes};
@@ -27,13 +29,15 @@ pub struct EthcoreSetClient<M> where
M: MinerService {
miner: Weak<M>,
+net: Weak<NetworkService<SyncMessage>>,
}
impl<M> EthcoreSetClient<M> where M: MinerService {
/// Creates new `EthcoreSetClient`.
-pub fn new(miner: &Arc<M>) -> Self {
+pub fn new(miner: &Arc<M>, net: &Arc<NetworkService<SyncMessage>>) -> Self {
EthcoreSetClient {
miner: Arc::downgrade(miner),
+net: Arc::downgrade(net),
}
}
}
@@ -74,4 +78,32 @@ impl<M> EthcoreSet for EthcoreSetClient<M> where M: MinerService + 'static {
to_value(&true)
})
}
+fn add_reserved_peer(&self, params: Params) -> Result<Value, Error> {
+from_params::<(String,)>(params).and_then(|(peer,)| {
+match take_weak!(self.net).add_reserved_peer(&peer) {
+Ok(()) => to_value(&true),
+Err(_) => Err(Error::invalid_params()),
+}
+})
+}
+fn remove_reserved_peer(&self, params: Params) -> Result<Value, Error> {
+from_params::<(String,)>(params).and_then(|(peer,)| {
+match take_weak!(self.net).remove_reserved_peer(&peer) {
+Ok(()) => to_value(&true),
+Err(_) => Err(Error::invalid_params()),
+}
+})
+}
+fn drop_non_reserved_peers(&self, _: Params) -> Result<Value, Error> {
+take_weak!(self.net).set_non_reserved_mode(NonReservedPeerMode::Deny);
+to_value(&true)
+}
+fn accept_non_reserved_peers(&self, _: Params) -> Result<Value, Error> {
+take_weak!(self.net).set_non_reserved_mode(NonReservedPeerMode::Accept);
+to_value(&true)
+}
}

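For reference, a request/response pair for one of the new methods, written in the same style as the RPC tests later in this diff; the enode URL is a made-up placeholder and the `true` result assumes the peer string is accepted:

// Illustrative JSON-RPC exchange for the new ethcore_addReservedPeer method.
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_addReservedPeer", "params":["enode://<node-id>@127.0.0.1:30303"], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;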

@@ -115,7 +115,7 @@ impl MinerService for TestMinerService {
}
/// Imports transactions to transaction queue.
-fn import_transactions<T>(&self, transactions: Vec<SignedTransaction>, fetch_account: T) ->
+fn import_transactions<T>(&self, _chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, fetch_account: T) ->
Vec<Result<TransactionImportResult, Error>>
where T: Fn(&Address) -> AccountDetails {
// lets assume that all txs are valid


@@ -19,11 +19,13 @@ use std::str::FromStr;
use jsonrpc_core::IoHandler;
use v1::{Ethcore, EthcoreClient, EthcoreSet, EthcoreSetClient};
use ethcore::miner::MinerService;
+use ethcore::service::SyncMessage;
use v1::tests::helpers::TestMinerService;
use ethcore::client::{TestBlockChainClient};
use util::numbers::*;
use rustc_serialize::hex::FromHex;
use util::log::RotatingLogger;
+use util::network::{NetworkConfiguration, NetworkService};
use util::network_settings::NetworkSettings;
fn miner_service() -> Arc<TestMinerService> {
@@ -50,21 +52,26 @@ fn settings() -> Arc<NetworkSettings> {
})
}
+fn network_service() -> Arc<NetworkService<SyncMessage>> {
+Arc::new(NetworkService::new(NetworkConfiguration::new()).unwrap())
+}
fn ethcore_client(client: &Arc<TestBlockChainClient>, miner: &Arc<TestMinerService>) -> EthcoreClient<TestBlockChainClient, TestMinerService> {
EthcoreClient::new(client, miner, logger(), settings())
}
-fn ethcore_set_client(miner: &Arc<TestMinerService>) -> EthcoreSetClient<TestMinerService> {
+fn ethcore_set_client(miner: &Arc<TestMinerService>, net: &Arc<NetworkService<SyncMessage>>) -> EthcoreSetClient<TestMinerService> {
-EthcoreSetClient::new(miner)
+EthcoreSetClient::new(miner, net)
}
#[test]
fn rpc_ethcore_extra_data() {
let miner = miner_service();
let client = client_service();
+let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
-io.add_delegate(ethcore_set_client(&miner).to_delegate());
+io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_extraData", "params": [], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":"0x01020304","id":1}"#;
@@ -79,9 +86,10 @@ fn rpc_ethcore_default_extra_data() {
let miner = miner_service();
let client = client_service();
+let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
-io.add_delegate(ethcore_set_client(&miner).to_delegate());
+io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_defaultExtraData", "params": [], "id": 1}"#;
let response = format!(r#"{{"jsonrpc":"2.0","result":"0x{}","id":1}}"#, misc::version_data().to_hex());
@@ -93,9 +101,10 @@ fn rpc_ethcore_default_extra_data() {
fn rpc_ethcore_gas_floor_target() {
let miner = miner_service();
let client = client_service();
+let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
-io.add_delegate(ethcore_set_client(&miner).to_delegate());
+io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_gasFloorTarget", "params": [], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":"0x3039","id":1}"#;
@@ -107,9 +116,10 @@ fn rpc_ethcore_gas_floor_target() {
fn rpc_ethcore_min_gas_price() {
let miner = miner_service();
let client = client_service();
+let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
-io.add_delegate(ethcore_set_client(&miner).to_delegate());
+io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_minGasPrice", "params": [], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":"0x01312d00","id":1}"#;
@@ -121,9 +131,10 @@ fn rpc_ethcore_min_gas_price() {
fn rpc_ethcore_set_min_gas_price() {
let miner = miner_service();
let client = client_service();
+let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
-io.add_delegate(ethcore_set_client(&miner).to_delegate());
+io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setMinGasPrice", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;
@@ -136,9 +147,10 @@ fn rpc_ethcore_set_min_gas_price() {
fn rpc_ethcore_set_gas_floor_target() {
let miner = miner_service();
let client = client_service();
+let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
-io.add_delegate(ethcore_set_client(&miner).to_delegate());
+io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setGasFloorTarget", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;
@@ -151,9 +163,10 @@ fn rpc_ethcore_set_gas_floor_target() {
fn rpc_ethcore_set_extra_data() {
let miner = miner_service();
let client = client_service();
+let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
-io.add_delegate(ethcore_set_client(&miner).to_delegate());
+io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setExtraData", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;
@@ -166,9 +179,10 @@ fn rpc_ethcore_set_extra_data() {
fn rpc_ethcore_set_author() {
let miner = miner_service();
let client = client_service();
+let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
-io.add_delegate(ethcore_set_client(&miner).to_delegate());
+io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setAuthor", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;
@@ -181,13 +195,14 @@ fn rpc_ethcore_set_author() {
fn rpc_ethcore_dev_logs() {
let miner = miner_service();
let client = client_service();
+let network = network_service();
let logger = logger();
logger.append("a".to_owned());
logger.append("b".to_owned());
let ethcore = EthcoreClient::new(&client, &miner, logger.clone(), settings()).to_delegate();
let io = IoHandler::new();
io.add_delegate(ethcore);
-io.add_delegate(ethcore_set_client(&miner).to_delegate());
+io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_devLogs", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":["b","a"],"id":1}"#;
@@ -199,9 +214,10 @@ fn rpc_ethcore_dev_logs() {
fn rpc_ethcore_dev_logs_levels() {
let miner = miner_service();
let client = client_service();
+let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
-io.add_delegate(ethcore_set_client(&miner).to_delegate());
+io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_devLogsLevels", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":"rpc=trace","id":1}"#;
@@ -212,9 +228,10 @@ fn rpc_ethcore_dev_logs_levels() {
fn rpc_ethcore_set_transactions_limit() {
let miner = miner_service();
let client = client_service();
+let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
-io.add_delegate(ethcore_set_client(&miner).to_delegate());
+io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setTransactionsLimit", "params":[10240240], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;
@@ -227,9 +244,10 @@ fn rpc_ethcore_set_transactions_limit() {
fn rpc_ethcore_transactions_limit() {
let miner = miner_service();
let client = client_service();
+let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
-io.add_delegate(ethcore_set_client(&miner).to_delegate());
+io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_transactionsLimit", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":1024,"id":1}"#;
@@ -241,9 +259,10 @@ fn rpc_ethcore_transactions_limit() {
fn rpc_ethcore_net_chain() {
let miner = miner_service();
let client = client_service();
+let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
-io.add_delegate(ethcore_set_client(&miner).to_delegate());
+io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_netChain", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":"testchain","id":1}"#;
@@ -255,9 +274,10 @@ fn rpc_ethcore_net_chain() {
fn rpc_ethcore_net_max_peers() {
let miner = miner_service();
let client = client_service();
+let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
-io.add_delegate(ethcore_set_client(&miner).to_delegate());
+io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_netMaxPeers", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":25,"id":1}"#;
@@ -269,9 +289,10 @@ fn rpc_ethcore_net_max_peers() {
fn rpc_ethcore_net_port() {
let miner = miner_service();
let client = client_service();
+let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
-io.add_delegate(ethcore_set_client(&miner).to_delegate());
+io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_netPort", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":30303,"id":1}"#;
@@ -283,9 +304,10 @@ fn rpc_ethcore_net_port() {
fn rpc_ethcore_rpc_settings() {
let miner = miner_service();
let client = client_service();
+let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
-io.add_delegate(ethcore_set_client(&miner).to_delegate());
+io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_rpcSettings", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":{"enabled":true,"interface":"all","port":8545},"id":1}"#;
@@ -297,9 +319,10 @@ fn rpc_ethcore_rpc_settings() {
fn rpc_ethcore_node_name() {
let miner = miner_service();
let client = client_service();
+let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
-io.add_delegate(ethcore_set_client(&miner).to_delegate());
+io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_nodeName", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":"mynode","id":1}"#;


@ -37,6 +37,18 @@ pub trait EthcoreSet: Sized + Send + Sync + 'static {
/// Sets the limits for transaction queue. /// Sets the limits for transaction queue.
fn set_transactions_limit(&self, _: Params) -> Result<Value, Error>; fn set_transactions_limit(&self, _: Params) -> Result<Value, Error>;
/// Add a reserved peer.
fn add_reserved_peer(&self, _: Params) -> Result<Value, Error>;
/// Remove a reserved peer.
fn remove_reserved_peer(&self, _: Params) -> Result<Value, Error>;
/// Drop all non-reserved peers.
fn drop_non_reserved_peers(&self, _: Params) -> Result<Value, Error>;
/// Accept non-reserved peers (default behavior).
fn accept_non_reserved_peers(&self, _: Params) -> Result<Value, Error>;
/// Should be used to convert object to io delegate. /// Should be used to convert object to io delegate.
fn to_delegate(self) -> IoDelegate<Self> { fn to_delegate(self) -> IoDelegate<Self> {
let mut delegate = IoDelegate::new(Arc::new(self)); let mut delegate = IoDelegate::new(Arc::new(self));
@ -45,6 +57,10 @@ pub trait EthcoreSet: Sized + Send + Sync + 'static {
delegate.add_method("ethcore_setExtraData", EthcoreSet::set_extra_data); delegate.add_method("ethcore_setExtraData", EthcoreSet::set_extra_data);
delegate.add_method("ethcore_setAuthor", EthcoreSet::set_author); delegate.add_method("ethcore_setAuthor", EthcoreSet::set_author);
delegate.add_method("ethcore_setTransactionsLimit", EthcoreSet::set_transactions_limit); delegate.add_method("ethcore_setTransactionsLimit", EthcoreSet::set_transactions_limit);
delegate.add_method("ethcore_addReservedPeer", EthcoreSet::add_reserved_peer);
delegate.add_method("ethcore_removeReservedPeer", EthcoreSet::remove_reserved_peer);
delegate.add_method("ethcore_dropNonReservedPeers", EthcoreSet::drop_non_reserved_peers);
delegate.add_method("ethcore_acceptNonReservedPeers", EthcoreSet::accept_non_reserved_peers);
delegate delegate
} }

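The server-side implementation of these four methods is not part of this hunk; the sketch below only illustrates how they might be exercised in the same style as the tests above. The enode URL is a placeholder and the `true` result is an assumption about the eventual `EthcoreSetClient` behaviour.
// Sketch only: exercising ethcore_addReservedPeer through the same test harness.
let miner = miner_service();
let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
// Placeholder node URL; a real enode id is a hex-encoded public key.
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_addReservedPeer", "params":["enode://<node-id>@127.0.0.1:30303"], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;
assert_eq!(io.handle_request(request), Some(response.to_owned()));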
View File

@ -95,7 +95,6 @@ use ethcore::views::{HeaderView, BlockView};
use ethcore::header::{BlockNumber, Header as BlockHeader}; use ethcore::header::{BlockNumber, Header as BlockHeader};
use ethcore::client::{BlockChainClient, BlockStatus, BlockID, BlockChainInfo}; use ethcore::client::{BlockChainClient, BlockStatus, BlockID, BlockChainInfo};
use ethcore::error::*; use ethcore::error::*;
use ethcore::transaction::SignedTransaction;
use ethcore::block::Block; use ethcore::block::Block;
use io::SyncIo; use io::SyncIo;
use time; use time;
@ -940,15 +939,15 @@ impl ChainSync {
return Ok(()); return Ok(());
} }
let item_count = r.item_count(); let mut item_count = r.item_count();
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count); trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
item_count = min(item_count, MAX_TX_TO_IMPORT);
let mut transactions = Vec::with_capacity(item_count); let mut transactions = Vec::with_capacity(item_count);
for i in 0 .. min(item_count, MAX_TX_TO_IMPORT) { for i in 0 .. item_count {
let tx: SignedTransaction = try!(r.val_at(i)); let tx = try!(r.at(i)).as_raw().to_vec();
transactions.push(tx); transactions.push(tx);
} }
let _ = io.chain().import_transactions(transactions); let _ = io.chain().queue_transactions(transactions);
Ok(()) Ok(())
} }
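With this change, incoming transactions are forwarded to the client as raw RLP bytes and queued rather than decoded and imported inline on the sync thread. The client-side queue is not shown in this hunk; the sketch below only illustrates the kind of bounded buffer `queue_transactions` could feed, with a cap such as `MAX_TX_QUEUE_SIZE`. The struct and field names are illustrative.
// Sketch only: a bounded queue of raw transaction RLP, drained and decoded off the sync thread.
const MAX_TX_QUEUE_SIZE: usize = 4096; // illustrative cap
struct RawTransactionQueue {
    pending: Vec<Vec<u8>>, // raw RLP-encoded transactions
}
impl RawTransactionQueue {
    fn queue_transactions(&mut self, transactions: Vec<Vec<u8>>) {
        // Drop anything beyond the cap instead of blocking the network thread.
        let free = MAX_TX_QUEUE_SIZE.saturating_sub(self.pending.len());
        self.pending.extend(transactions.into_iter().take(free));
    }
}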
@ -1097,7 +1096,7 @@ impl ChainSync {
Ok(Some((RECEIPTS_PACKET, rlp_result))) Ok(Some((RECEIPTS_PACKET, rlp_result)))
} }
fn return_rlp<FRlp, FError>(&self, io: &mut SyncIo, rlp: &UntrustedRlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError> fn return_rlp<FRlp, FError>(io: &mut SyncIo, rlp: &UntrustedRlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
where FRlp : Fn(&SyncIo, &UntrustedRlp, PeerId) -> RlpResponseResult, where FRlp : Fn(&SyncIo, &UntrustedRlp, PeerId) -> RlpResponseResult,
FError : FnOnce(UtilError) -> String FError : FnOnce(UtilError) -> String
{ {
@ -1114,13 +1113,41 @@ impl ChainSync {
} }
/// Dispatch incoming requests and responses /// Dispatch incoming requests and responses
pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { pub fn dispatch_packet(sync: &RwLock<ChainSync>, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
let rlp = UntrustedRlp::new(data); let rlp = UntrustedRlp::new(data);
let result = match packet_id {
GET_BLOCK_BODIES_PACKET => ChainSync::return_rlp(io, &rlp, peer,
ChainSync::return_block_bodies,
|e| format!("Error sending block bodies: {:?}", e)),
GET_BLOCK_HEADERS_PACKET => ChainSync::return_rlp(io, &rlp, peer,
ChainSync::return_block_headers,
|e| format!("Error sending block headers: {:?}", e)),
GET_RECEIPTS_PACKET => ChainSync::return_rlp(io, &rlp, peer,
ChainSync::return_receipts,
|e| format!("Error sending receipts: {:?}", e)),
GET_NODE_DATA_PACKET => ChainSync::return_rlp(io, &rlp, peer,
ChainSync::return_node_data,
|e| format!("Error sending nodes: {:?}", e)),
_ => {
sync.write().unwrap().on_packet(io, peer, packet_id, data);
Ok(())
}
};
result.unwrap_or_else(|e| {
debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e);
})
}
pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
if packet_id != STATUS_PACKET && !self.peers.contains_key(&peer) { if packet_id != STATUS_PACKET && !self.peers.contains_key(&peer) {
debug!(target:"sync", "Unexpected packet from unregistered peer: {}:{}", peer, io.peer_info(peer)); debug!(target:"sync", "Unexpected packet from unregistered peer: {}:{}", peer, io.peer_info(peer));
return; return;
} }
let rlp = UntrustedRlp::new(data);
let result = match packet_id { let result = match packet_id {
STATUS_PACKET => self.on_peer_status(io, peer, &rlp), STATUS_PACKET => self.on_peer_status(io, peer, &rlp),
TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp), TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp),
@ -1128,23 +1155,6 @@ impl ChainSync {
BLOCK_BODIES_PACKET => self.on_peer_block_bodies(io, peer, &rlp), BLOCK_BODIES_PACKET => self.on_peer_block_bodies(io, peer, &rlp),
NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp), NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp),
NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp), NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp),
GET_BLOCK_BODIES_PACKET => self.return_rlp(io, &rlp, peer,
ChainSync::return_block_bodies,
|e| format!("Error sending block bodies: {:?}", e)),
GET_BLOCK_HEADERS_PACKET => self.return_rlp(io, &rlp, peer,
ChainSync::return_block_headers,
|e| format!("Error sending block headers: {:?}", e)),
GET_RECEIPTS_PACKET => self.return_rlp(io, &rlp, peer,
ChainSync::return_receipts,
|e| format!("Error sending receipts: {:?}", e)),
GET_NODE_DATA_PACKET => self.return_rlp(io, &rlp, peer,
ChainSync::return_node_data,
|e| format!("Error sending nodes: {:?}", e)),
_ => { _ => {
debug!(target: "sync", "Unknown packet {}", packet_id); debug!(target: "sync", "Unknown packet {}", packet_id);
Ok(()) Ok(())
@ -1424,7 +1434,7 @@ mod tests {
fn return_receipts() { fn return_receipts() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(H256::new(), &client); let sync = dummy_sync_with_peer(H256::new(), &client);
let mut io = TestIo::new(&mut client, &mut queue, None); let mut io = TestIo::new(&mut client, &mut queue, None);
let mut receipt_list = RlpStream::new_list(4); let mut receipt_list = RlpStream::new_list(4);
@ -1445,7 +1455,7 @@ mod tests {
assert_eq!(603, rlp_result.unwrap().1.out().len()); assert_eq!(603, rlp_result.unwrap().1.out().len());
io.sender = Some(2usize); io.sender = Some(2usize);
sync.on_packet(&mut io, 0usize, super::GET_RECEIPTS_PACKET, &receipts_request); ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, super::GET_RECEIPTS_PACKET, &receipts_request);
assert_eq!(1, io.queue.len()); assert_eq!(1, io.queue.len());
} }
@ -1517,7 +1527,7 @@ mod tests {
fn return_nodes() { fn return_nodes() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(H256::new(), &client); let sync = dummy_sync_with_peer(H256::new(), &client);
let mut io = TestIo::new(&mut client, &mut queue, None); let mut io = TestIo::new(&mut client, &mut queue, None);
let mut node_list = RlpStream::new_list(3); let mut node_list = RlpStream::new_list(3);
@ -1537,7 +1547,8 @@ mod tests {
assert_eq!(34, rlp_result.unwrap().1.out().len()); assert_eq!(34, rlp_result.unwrap().1.out().len());
io.sender = Some(2usize); io.sender = Some(2usize);
sync.on_packet(&mut io, 0usize, super::GET_NODE_DATA_PACKET, &node_request);
ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, super::GET_NODE_DATA_PACKET, &node_request);
assert_eq!(1, io.queue.len()); assert_eq!(1, io.queue.len());
} }

View File

@ -169,7 +169,7 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
} }
fn read(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) { fn read(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
self.sync.write().unwrap().on_packet(&mut NetSyncIo::new(io, self.chain.deref()) , *peer, packet_id, data); ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, self.chain.deref()) , *peer, packet_id, data);
} }
fn connected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) { fn connected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) {

View File

@ -47,7 +47,7 @@ fn status_after_sync() {
net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
net.sync(); net.sync();
let status = net.peer(0).sync.status(); let status = net.peer(0).sync.read().unwrap().status();
assert_eq!(status.state, SyncState::Idle); assert_eq!(status.state, SyncState::Idle);
} }
@ -107,14 +107,14 @@ fn restart() {
assert!(net.peer(0).chain.chain_info().best_block_number > 100); assert!(net.peer(0).chain.chain_info().best_block_number > 100);
net.restart_peer(0); net.restart_peer(0);
let status = net.peer(0).sync.status(); let status = net.peer(0).sync.read().unwrap().status();
assert_eq!(status.state, SyncState::ChainHead); assert_eq!(status.state, SyncState::ChainHead);
} }
#[test] #[test]
fn status_empty() { fn status_empty() {
let net = TestNet::new(2); let net = TestNet::new(2);
assert_eq!(net.peer(0).sync.status().state, SyncState::Idle); assert_eq!(net.peer(0).sync.read().unwrap().status().state, SyncState::Idle);
} }
#[test] #[test]

View File

@ -78,7 +78,7 @@ pub struct TestPacket {
pub struct TestPeer { pub struct TestPeer {
pub chain: TestBlockChainClient, pub chain: TestBlockChainClient,
pub sync: ChainSync, pub sync: RwLock<ChainSync>,
pub queue: VecDeque<TestPacket>, pub queue: VecDeque<TestPacket>,
} }
@ -97,7 +97,7 @@ impl TestNet {
let chain = TestBlockChainClient::new(); let chain = TestBlockChainClient::new();
let sync = ChainSync::new(SyncConfig::default(), &chain); let sync = ChainSync::new(SyncConfig::default(), &chain);
net.peers.push(TestPeer { net.peers.push(TestPeer {
sync: sync, sync: RwLock::new(sync),
chain: chain, chain: chain,
queue: VecDeque::new(), queue: VecDeque::new(),
}); });
@ -118,7 +118,7 @@ impl TestNet {
for client in 0..self.peers.len() { for client in 0..self.peers.len() {
if peer != client { if peer != client {
let mut p = self.peers.get_mut(peer).unwrap(); let mut p = self.peers.get_mut(peer).unwrap();
p.sync.on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId); p.sync.write().unwrap().on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId);
} }
} }
} }
@ -129,22 +129,22 @@ impl TestNet {
if let Some(packet) = self.peers[peer].queue.pop_front() { if let Some(packet) = self.peers[peer].queue.pop_front() {
let mut p = self.peers.get_mut(packet.recipient).unwrap(); let mut p = self.peers.get_mut(packet.recipient).unwrap();
trace!("--- {} -> {} ---", peer, packet.recipient); trace!("--- {} -> {} ---", peer, packet.recipient);
p.sync.on_packet(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data); ChainSync::dispatch_packet(&p.sync, &mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data);
trace!("----------------"); trace!("----------------");
} }
let mut p = self.peers.get_mut(peer).unwrap(); let mut p = self.peers.get_mut(peer).unwrap();
p.sync.maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None)); p.sync.write().unwrap().maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None));
} }
} }
pub fn sync_step_peer(&mut self, peer_num: usize) { pub fn sync_step_peer(&mut self, peer_num: usize) {
let mut peer = self.peer_mut(peer_num); let mut peer = self.peer_mut(peer_num);
peer.sync.maintain_sync(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); peer.sync.write().unwrap().maintain_sync(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None));
} }
pub fn restart_peer(&mut self, i: usize) { pub fn restart_peer(&mut self, i: usize) {
let peer = self.peer_mut(i); let peer = self.peer_mut(i);
peer.sync.restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); peer.sync.write().unwrap().restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None));
} }
pub fn sync(&mut self) -> u32 { pub fn sync(&mut self) -> u32 {
@ -173,6 +173,6 @@ impl TestNet {
pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) { pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) {
let mut peer = self.peer_mut(peer_id); let mut peer = self.peer_mut(peer_id);
peer.sync.chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[]); peer.sync.write().unwrap().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[]);
} }
} }

View File

@ -154,6 +154,7 @@ pub mod panics;
pub mod table; pub mod table;
pub mod network_settings; pub mod network_settings;
pub mod path; pub mod path;
mod timer;
pub use common::*; pub use common::*;
pub use misc::*; pub use misc::*;
@ -175,6 +176,7 @@ pub use network::*;
pub use io::*; pub use io::*;
pub use log::*; pub use log::*;
pub use kvdb::*; pub use kvdb::*;
pub use timer::*;
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {

View File

@ -14,9 +14,9 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::net::{SocketAddr}; use std::net::SocketAddr;
use std::collections::{HashMap}; use std::collections::{HashMap, HashSet};
use std::str::{FromStr}; use std::str::FromStr;
use std::sync::*; use std::sync::*;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::ops::*; use std::ops::*;
@ -35,7 +35,7 @@ use rlp::*;
use network::session::{Session, SessionData}; use network::session::{Session, SessionData};
use error::*; use error::*;
use io::*; use io::*;
use network::{NetworkProtocolHandler, PROTOCOL_VERSION}; use network::{NetworkProtocolHandler, NonReservedPeerMode, PROTOCOL_VERSION};
use network::node_table::*; use network::node_table::*;
use network::stats::NetworkStats; use network::stats::NetworkStats;
use network::error::{NetworkError, DisconnectReason}; use network::error::{NetworkError, DisconnectReason};
@ -65,8 +65,6 @@ pub struct NetworkConfiguration {
pub nat_enabled: bool, pub nat_enabled: bool,
/// Enable discovery /// Enable discovery
pub discovery_enabled: bool, pub discovery_enabled: bool,
/// Pin to reserved nodes only
pub reserved_only: bool,
/// List of initial node addresses /// List of initial node addresses
pub boot_nodes: Vec<String>, pub boot_nodes: Vec<String>,
/// Use provided node key instead of default /// Use provided node key instead of default
@ -75,6 +73,8 @@ pub struct NetworkConfiguration {
pub ideal_peers: u32, pub ideal_peers: u32,
/// List of reserved node addresses. /// List of reserved node addresses.
pub reserved_nodes: Vec<String>, pub reserved_nodes: Vec<String>,
/// The non-reserved peer mode.
pub non_reserved_mode: NonReservedPeerMode,
} }
impl Default for NetworkConfiguration { impl Default for NetworkConfiguration {
@ -93,11 +93,11 @@ impl NetworkConfiguration {
udp_port: None, udp_port: None,
nat_enabled: true, nat_enabled: true,
discovery_enabled: true, discovery_enabled: true,
reserved_only: false,
boot_nodes: Vec::new(), boot_nodes: Vec::new(),
use_secret: None, use_secret: None,
ideal_peers: 25, ideal_peers: 25,
reserved_nodes: Vec::new(), reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Accept,
} }
} }
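A minimal sketch of the new field in use when building a configuration, assuming the `Default` implementation shown above; the enode URL is a placeholder.
// Sketch only: a host that connects exclusively to its reserved peers.
let mut config = NetworkConfiguration::default();
config.reserved_nodes = vec!["enode://<node-id>@10.0.0.1:30303".to_owned()];
config.non_reserved_mode = NonReservedPeerMode::Deny;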
@ -191,13 +191,15 @@ pub struct NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'sta
sessions: Arc<RwLock<Slab<SharedSession>>>, sessions: Arc<RwLock<Slab<SharedSession>>>,
session: Option<SharedSession>, session: Option<SharedSession>,
session_id: Option<StreamToken>, session_id: Option<StreamToken>,
reserved_peers: &'s HashSet<NodeId>,
} }
impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, { impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, {
/// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler. /// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler.
fn new(io: &'s IoContext<NetworkIoMessage<Message>>, fn new(io: &'s IoContext<NetworkIoMessage<Message>>,
protocol: ProtocolId, protocol: ProtocolId,
session: Option<SharedSession>, sessions: Arc<RwLock<Slab<SharedSession>>>) -> NetworkContext<'s, Message> { session: Option<SharedSession>, sessions: Arc<RwLock<Slab<SharedSession>>>,
reserved_peers: &'s HashSet<NodeId>) -> NetworkContext<'s, Message> {
let id = session.as_ref().map(|s| s.lock().unwrap().token()); let id = session.as_ref().map(|s| s.lock().unwrap().token());
NetworkContext { NetworkContext {
io: io, io: io,
@ -205,6 +207,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
session_id: id, session_id: id,
session: session, session: session,
sessions: sessions, sessions: sessions,
reserved_peers: reserved_peers,
} }
} }
@ -237,7 +240,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
self.io.message(NetworkIoMessage::User(msg)); self.io.message(NetworkIoMessage::User(msg));
} }
/// Send an IO message /// Get an IoChannel.
pub fn io_channel(&self) -> IoChannel<NetworkIoMessage<Message>> { pub fn io_channel(&self) -> IoChannel<NetworkIoMessage<Message>> {
self.io.channel() self.io.channel()
} }
@ -335,7 +338,7 @@ pub struct Host<Message> where Message: Send + Sync + Clone {
timers: RwLock<HashMap<TimerToken, ProtocolTimer>>, timers: RwLock<HashMap<TimerToken, ProtocolTimer>>,
timer_counter: RwLock<usize>, timer_counter: RwLock<usize>,
stats: Arc<NetworkStats>, stats: Arc<NetworkStats>,
pinned_nodes: Vec<NodeId>, reserved_nodes: RwLock<HashSet<NodeId>>,
num_sessions: AtomicUsize, num_sessions: AtomicUsize,
stopping: AtomicBool, stopping: AtomicBool,
} }
@ -390,28 +393,28 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
timers: RwLock::new(HashMap::new()), timers: RwLock::new(HashMap::new()),
timer_counter: RwLock::new(USER_TIMER), timer_counter: RwLock::new(USER_TIMER),
stats: stats, stats: stats,
pinned_nodes: Vec::new(), reserved_nodes: RwLock::new(HashSet::new()),
num_sessions: AtomicUsize::new(0), num_sessions: AtomicUsize::new(0),
stopping: AtomicBool::new(false), stopping: AtomicBool::new(false),
}; };
for n in boot_nodes { for n in boot_nodes {
// don't pin boot nodes. host.add_node(&n);
host.add_node(&n, false);
} }
for n in reserved_nodes { for n in reserved_nodes {
host.add_node(&n, true); if let Err(e) = host.add_reserved_node(&n) {
debug!(target: "network", "Error parsing node id: {}: {:?}", n, e);
}
} }
Ok(host) Ok(host)
} }
pub fn add_node(&mut self, id: &str, pin: bool) { pub fn add_node(&mut self, id: &str) {
match Node::from_str(id) { match Node::from_str(id) {
Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); }, Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); },
Ok(n) => { Ok(n) => {
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
if pin { self.pinned_nodes.push(n.id.clone()) }
self.nodes.write().unwrap().add_node(n); self.nodes.write().unwrap().add_node(n);
if let Some(ref mut discovery) = *self.discovery.lock().unwrap() { if let Some(ref mut discovery) = *self.discovery.lock().unwrap() {
@ -421,6 +424,56 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
} }
} }
pub fn add_reserved_node(&self, id: &str) -> Result<(), UtilError> {
let n = try!(Node::from_str(id));
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
self.reserved_nodes.write().unwrap().insert(n.id.clone());
if let Some(ref mut discovery) = *self.discovery.lock().unwrap() {
discovery.add_node(entry);
}
Ok(())
}
pub fn set_non_reserved_mode(&self, mode: NonReservedPeerMode, io: &IoContext<NetworkIoMessage<Message>>) {
let mut info = self.info.write().unwrap();
if info.config.non_reserved_mode != mode {
info.config.non_reserved_mode = mode.clone();
drop(info);
if let NonReservedPeerMode::Deny = mode {
// disconnect all non-reserved peers here.
let reserved: HashSet<NodeId> = self.reserved_nodes.read().unwrap().clone();
let mut to_kill = Vec::new();
for e in self.sessions.write().unwrap().iter_mut() {
let mut s = e.lock().unwrap();
{
let id = s.id();
if id.is_some() && reserved.contains(id.unwrap()) {
continue;
}
}
s.disconnect(io, DisconnectReason::ClientQuit);
to_kill.push(s.token());
}
for p in to_kill {
trace!(target: "network", "Disconnecting on reserved-only mode: {}", p);
self.kill_connection(p, io, false);
}
}
}
}
pub fn remove_reserved_node(&self, id: &str) -> Result<(), UtilError> {
let n = try!(Node::from_str(id));
self.reserved_nodes.write().unwrap().remove(&n.id);
Ok(())
}
pub fn client_version() -> String { pub fn client_version() -> String {
version() version()
} }
@ -483,7 +536,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
// Initialize discovery. // Initialize discovery.
let discovery = { let discovery = {
let info = self.info.read().unwrap(); let info = self.info.read().unwrap();
if info.config.discovery_enabled && !info.config.reserved_only { if info.config.discovery_enabled && info.config.non_reserved_mode == NonReservedPeerMode::Accept {
Some(Discovery::new(&info.keys, public_endpoint.address.clone(), public_endpoint, DISCOVERY)) Some(Discovery::new(&info.keys, public_endpoint.address.clone(), public_endpoint, DISCOVERY))
} else { None } } else { None }
}; };
@ -540,17 +593,26 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
} }
fn connect_peers(&self, io: &IoContext<NetworkIoMessage<Message>>) { fn connect_peers(&self, io: &IoContext<NetworkIoMessage<Message>>) {
if self.info.read().unwrap().capabilities.is_empty() { let (ideal_peers, mut pin) = {
let info = self.info.read().unwrap();
if info.capabilities.is_empty() {
return; return;
} }
let ideal_peers = { self.info.read().unwrap().config.ideal_peers }; let config = &info.config;
let pin = { self.info.read().unwrap().config.reserved_only };
(config.ideal_peers, config.non_reserved_mode == NonReservedPeerMode::Deny)
};
let session_count = self.session_count(); let session_count = self.session_count();
if session_count >= ideal_peers as usize + self.pinned_nodes.len() { let reserved_nodes = self.reserved_nodes.read().unwrap();
if session_count >= ideal_peers as usize + reserved_nodes.len() {
// check if all pinned nodes are connected. // check if all pinned nodes are connected.
if self.pinned_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) { if reserved_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) {
return; return;
} }
// if not, only attempt to connect to reserved peers
pin = true;
} }
let handshake_count = self.handshake_count(); let handshake_count = self.handshake_count();
@ -562,7 +624,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
// iterate over all nodes, reserved ones coming first. // iterate over all nodes, reserved ones coming first.
// if we are pinned to only reserved nodes, ignore all others. // if we are pinned to only reserved nodes, ignore all others.
let nodes = self.pinned_nodes.iter().cloned().chain(if !pin { let nodes = reserved_nodes.iter().cloned().chain(if !pin {
self.nodes.read().unwrap().nodes() self.nodes.read().unwrap().nodes()
} else { } else {
Vec::new() Vec::new()
@ -684,14 +746,11 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
match s.readable(io, &self.info.read().unwrap()) { match s.readable(io, &self.info.read().unwrap()) {
Err(e) => { Err(e) => {
trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e); trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
match e { if let UtilError::Network(NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol)) = e {
UtilError::Network(NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol)) => {
if let Some(id) = s.id() { if let Some(id) = s.id() {
self.nodes.write().unwrap().mark_as_useless(id); self.nodes.write().unwrap().mark_as_useless(id);
} }
} }
_ => (),
}
kill = true; kill = true;
break; break;
}, },
@ -699,11 +758,20 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst); self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst);
if !s.info.originated { if !s.info.originated {
let session_count = self.session_count(); let session_count = self.session_count();
let ideal_peers = { self.info.read().unwrap().config.ideal_peers }; let reserved_nodes = self.reserved_nodes.read().unwrap();
if session_count >= ideal_peers as usize { let (ideal_peers, reserved_only) = {
let info = self.info.read().unwrap();
(info.config.ideal_peers, info.config.non_reserved_mode == NonReservedPeerMode::Deny)
};
if session_count >= ideal_peers as usize || reserved_only {
// only proceed if the connecting peer is reserved.
if !reserved_nodes.contains(s.id().unwrap()) {
s.disconnect(io, DisconnectReason::TooManyPeers); s.disconnect(io, DisconnectReason::TooManyPeers);
return; return;
} }
}
// Add it to the node table // Add it to the node table
if let Ok(address) = s.remote_addr() { if let Ok(address) = s.remote_addr() {
let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } }; let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } };
@ -738,14 +806,17 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
if kill { if kill {
self.kill_connection(token, io, true); self.kill_connection(token, io, true);
} }
let handlers = self.handlers.read().unwrap();
for p in ready_data { for p in ready_data {
let h = self.handlers.read().unwrap().get(p).unwrap().clone(); let h = handlers.get(p).unwrap().clone();
self.stats.inc_sessions(); self.stats.inc_sessions();
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token); let reserved = self.reserved_nodes.read().unwrap();
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token);
} }
for (p, packet_id, data) in packet_data { for (p, packet_id, data) in packet_data {
let h = self.handlers.read().unwrap().get(p).unwrap().clone(); let h = handlers.get(p).unwrap().clone();
h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token, packet_id, &data[1..]); let reserved = self.reserved_nodes.read().unwrap();
h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]);
} }
} }
@ -786,7 +857,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
} }
for p in to_disconnect { for p in to_disconnect {
let h = self.handlers.read().unwrap().get(p).unwrap().clone(); let h = self.handlers.read().unwrap().get(p).unwrap().clone();
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone()), &token); let reserved = self.reserved_nodes.read().unwrap();
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token);
} }
if deregister { if deregister {
io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e)); io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e));
@ -889,7 +961,10 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
_ => match self.timers.read().unwrap().get(&token).cloned() { _ => match self.timers.read().unwrap().get(&token).cloned() {
Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() { Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() {
None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) }, None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) },
Some(h) => { h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone()), timer.token); } Some(h) => {
let reserved = self.reserved_nodes.read().unwrap();
h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone(), &reserved), timer.token);
}
}, },
None => { warn!("Unknown timer token: {}", token); } // timer is not registered through us None => { warn!("Unknown timer token: {}", token); } // timer is not registered through us
} }
@ -907,7 +982,8 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
ref versions ref versions
} => { } => {
let h = handler.clone(); let h = handler.clone();
h.initialize(&NetworkContext::new(io, protocol, None, self.sessions.clone())); let reserved = self.reserved_nodes.read().unwrap();
h.initialize(&NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved));
self.handlers.write().unwrap().insert(protocol, h); self.handlers.write().unwrap().insert(protocol, h);
let mut info = self.info.write().unwrap(); let mut info = self.info.write().unwrap();
for v in versions { for v in versions {
@ -949,8 +1025,9 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
self.kill_connection(*peer, io, false); self.kill_connection(*peer, io, false);
}, },
NetworkIoMessage::User(ref message) => { NetworkIoMessage::User(ref message) => {
let reserved = self.reserved_nodes.read().unwrap();
for (p, h) in self.handlers.read().unwrap().iter() { for (p, h) in self.handlers.read().unwrap().iter() {
h.message(&NetworkContext::new(io, p, None, self.sessions.clone()), &message); h.message(&NetworkContext::new(io, p, None, self.sessions.clone(), &reserved), &message);
} }
} }
} }

View File

@ -112,3 +112,22 @@ pub trait NetworkProtocolHandler<Message>: Sync + Send where Message: Send + Syn
fn message(&self, _io: &NetworkContext<Message>, _message: &Message) {} fn message(&self, _io: &NetworkContext<Message>, _message: &Message) {}
} }
/// Non-reserved peer modes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReservedPeerMode {
/// Accept them. This is the default.
Accept,
/// Deny them.
Deny,
}
impl NonReservedPeerMode {
/// Attempt to parse the peer mode from a string.
pub fn parse(s: &str) -> Option<Self> {
match s {
"accept" => Some(NonReservedPeerMode::Accept),
"deny" => Some(NonReservedPeerMode::Deny),
_ => None,
}
}
}

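A minimal sketch of `parse` in use, e.g. when mapping a CLI flag onto the mode; falling back to `Accept` matches the documented default.
// Sketch only: unknown strings fall back to the default mode.
let mode = NonReservedPeerMode::parse("deny").unwrap_or(NonReservedPeerMode::Accept);
assert_eq!(mode, NonReservedPeerMode::Deny);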
View File

@ -18,9 +18,9 @@ use std::sync::*;
use error::*; use error::*;
use panics::*; use panics::*;
use network::{NetworkProtocolHandler, NetworkConfiguration}; use network::{NetworkProtocolHandler, NetworkConfiguration};
use network::error::{NetworkError}; use network::error::NetworkError;
use network::host::{Host, NetworkIoMessage, ProtocolId}; use network::host::{Host, NetworkIoMessage, ProtocolId};
use network::stats::{NetworkStats}; use network::stats::NetworkStats;
use io::*; use io::*;
/// IO Service with networking /// IO Service with networking
@ -111,6 +111,35 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
*host = None; *host = None;
Ok(()) Ok(())
} }
/// Try to add a reserved peer.
pub fn add_reserved_peer(&self, peer: &str) -> Result<(), UtilError> {
let host = self.host.read().unwrap();
if let Some(ref host) = *host {
host.add_reserved_node(peer)
} else {
Ok(())
}
}
/// Try to remove a reserved peer.
pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), UtilError> {
let host = self.host.read().unwrap();
if let Some(ref host) = *host {
host.remove_reserved_node(peer)
} else {
Ok(())
}
}
/// Set the non-reserved peer mode.
pub fn set_non_reserved_mode(&self, mode: ::network::NonReservedPeerMode) {
let host = self.host.read().unwrap();
if let Some(ref host) = *host {
let io_ctxt = IoContext::new(self.io_service.channel(), 0);
host.set_non_reserved_mode(mode, &io_ctxt);
}
}
} }
impl<Message> MayPanic for NetworkService<Message> where Message: Send + Sync + Clone + 'static { impl<Message> MayPanic for NetworkService<Message> where Message: Send + Sync + Clone + 'static {

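A hedged sketch of the new `NetworkService` methods driven from application code; `service` stands for an already-started service and the enode URL is a placeholder.
// Sketch only: pin the host to reserved peers at runtime.
let peer = "enode://<node-id>@192.0.2.1:30303";
if let Err(e) = service.add_reserved_peer(peer) {
    warn!("Could not add reserved peer: {:?}", e);
}
service.set_non_reserved_mode(NonReservedPeerMode::Deny);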
51
util/src/timer.rs Normal file
View File

@ -0,0 +1,51 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Performance timer with logging
use time::precise_time_ns;
/// Performance timer with logging. Starts measuring time in the constructor, prints
/// elapsed time in the destructor or when `stop` is called.
pub struct PerfTimer {
name: &'static str,
start: u64,
stopped: bool,
}
impl PerfTimer {
/// Create an instance with given name.
pub fn new(name: &'static str) -> PerfTimer {
PerfTimer {
name: name,
start: precise_time_ns(),
stopped: false,
}
}
/// Stop the timer and print elapsed time on trace level with `perf` target.
pub fn stop(&mut self) {
if !self.stopped {
trace!(target: "perf", "{}: {:.2}ms", self.name, (precise_time_ns() - self.start) as f32 / 1000_000.0);
self.stopped = true;
}
}
}
impl Drop for PerfTimer {
fn drop(&mut self) {
self.stop()
}
}
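A short usage sketch: the timer logs on drop, so wrapping a scope in a `PerfTimer` is enough. The timer name and the sample output are illustrative.
// Sketch only: trace-level timing with target "perf".
let _timer = PerfTimer::new("import_block");
// ... work being measured ...
// dropped at end of scope, tracing e.g. "import_block: 12.34ms"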