Merge branch 'master' of github.com:ethcore/parity

Gav Wood, 2016-03-13 13:13:23 +01:00, commit f0862acffe
33 changed files with 931 additions and 173 deletions

Cargo.lock (generated, 24 lines changed)

@@ -2,7 +2,7 @@
name = "parity"
version = "0.9.99"
dependencies = [
"clippy 0.0.49 (registry+https://github.com/rust-lang/crates.io-index)",
"clippy 0.0.50 (registry+https://github.com/rust-lang/crates.io-index)",
"ctrlc 1.1.1 (git+https://github.com/tomusdrw/rust-ctrlc.git)",
"daemonize 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"docopt 0.6.78 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -94,7 +94,7 @@ dependencies = [
[[package]]
name = "clippy"
version = "0.0.49"
version = "0.0.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"regex-syntax 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -207,7 +207,7 @@ dependencies = [
name = "ethcore"
version = "0.9.99"
dependencies = [
"clippy 0.0.49 (registry+https://github.com/rust-lang/crates.io-index)",
"clippy 0.0.50 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"ethash 0.9.99",
@@ -233,7 +233,7 @@ dependencies = [
name = "ethcore-rpc"
version = "0.9.99"
dependencies = [
"clippy 0.0.49 (registry+https://github.com/rust-lang/crates.io-index)",
"clippy 0.0.50 (registry+https://github.com/rust-lang/crates.io-index)",
"ethash 0.9.99",
"ethcore 0.9.99",
"ethcore-util 0.9.99",
@@ -256,7 +256,7 @@ dependencies = [
"arrayvec 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)",
"bigint 0.1.0",
"chrono 0.2.20 (registry+https://github.com/rust-lang/crates.io-index)",
"clippy 0.0.49 (registry+https://github.com/rust-lang/crates.io-index)",
"clippy 0.0.50 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"elastic-array 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -271,7 +271,7 @@ dependencies = [
"log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
"rocksdb 0.4.2 (git+https://github.com/arkpar/rust-rocksdb.git)",
"rocksdb 0.4.3 (git+https://github.com/arkpar/rust-rocksdb.git)",
"rust-crypto 0.2.34 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -288,7 +288,7 @@ dependencies = [
name = "ethsync"
version = "0.9.99"
dependencies = [
"clippy 0.0.49 (registry+https://github.com/rust-lang/crates.io-index)",
"clippy 0.0.50 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore 0.9.99",
"ethcore-util 0.9.99",
@@ -466,8 +466,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "librocksdb-sys"
version = "0.2.2"
source = "git+https://github.com/arkpar/rust-rocksdb.git#a4f89fea20ee3ae92b692df65d56426a5c0b6fd5"
version = "0.2.3"
source = "git+https://github.com/arkpar/rust-rocksdb.git#ebb602fc74b4067f9f51310bdc0401b8e59b7156"
dependencies = [
"libc 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -688,11 +688,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "rocksdb"
version = "0.4.2"
source = "git+https://github.com/arkpar/rust-rocksdb.git#a4f89fea20ee3ae92b692df65d56426a5c0b6fd5"
version = "0.4.3"
source = "git+https://github.com/arkpar/rust-rocksdb.git#ebb602fc74b4067f9f51310bdc0401b8e59b7156"
dependencies = [
"libc 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"librocksdb-sys 0.2.2 (git+https://github.com/arkpar/rust-rocksdb.git)",
"librocksdb-sys 0.2.3 (git+https://github.com/arkpar/rust-rocksdb.git)",
]
[[package]]


@@ -19,7 +19,7 @@ ctrlc = { git = "https://github.com/tomusdrw/rust-ctrlc.git" }
fdlimit = { path = "util/fdlimit" }
daemonize = "0.2"
number_prefix = "0.2"
clippy = { version = "0.0.49", optional = true }
clippy = { version = "0.0.50", optional = true }
ethcore = { path = "ethcore" }
ethcore-util = { path = "util" }
ethsync = { path = "sync" }


@@ -17,7 +17,7 @@ ethcore-util = { path = "../util" }
evmjit = { path = "../evmjit", optional = true }
ethash = { path = "../ethash" }
num_cpus = "0.2"
clippy = { version = "0.0.49", optional = true }
clippy = { version = "0.0.50", optional = true }
crossbeam = "0.1.5"
lazy_static = "0.1"
ethcore-devtools = { path = "../devtools" }


@@ -523,7 +523,7 @@ mod tests {
let engine = spec.to_engine().unwrap();
let mut config = BlockQueueConfig::default();
config.max_mem_use = super::MIN_MEM_LIMIT; // empty queue uses about 15000
let mut queue = BlockQueue::new(config, Arc::new(engine), IoChannel::disconnected());
let queue = BlockQueue::new(config, Arc::new(engine), IoChannel::disconnected());
assert!(!queue.queue_info().is_full());
let mut blocks = get_good_dummy_block_seq(50);
for b in blocks.drain(..) {


@@ -28,9 +28,15 @@ pub struct MemoryCache {
blooms: HashMap<BloomIndex, H2048>,
}
impl Default for MemoryCache {
fn default() -> Self {
MemoryCache::new()
}
}
impl MemoryCache {
/// Default constructor for MemoryCache
pub fn new() -> MemoryCache {
pub fn new() -> Self {
MemoryCache { blooms: HashMap::new() }
}
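This Default-delegating-to-new() pattern recurs throughout the commit (TestBlockChainClient, TestSetup and TestBlockChain below get the same treatment), matching what clippy's new_without_default lint asks for. A minimal standalone sketch of the pattern, with illustrative names:

struct Cache { blooms: Vec<u8> }

impl Default for Cache {
	fn default() -> Self { Cache::new() }
}

impl Cache {
	/// Constructor; `Default` simply delegates here.
	fn new() -> Self { Cache { blooms: Vec::new() } }
}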


@@ -449,6 +449,10 @@ impl<V> BlockChainClient for Client<V> where V: Verifier {
self.state().code(address)
}
fn balance(&self, address: &Address) -> U256 {
self.state().balance(address)
}
fn transaction(&self, id: TransactionId) -> Option<LocalizedTransaction> {
match id {
TransactionId::Hash(ref hash) => self.chain.transaction_address(hash),


@@ -66,6 +66,9 @@ pub trait BlockChainClient : Sync + Send {
/// Get address code.
fn code(&self, address: &Address) -> Option<Bytes>;
/// Get address balance.
fn balance(&self, address: &Address) -> U256;
/// Get transaction with given hash.
fn transaction(&self, id: TransactionId) -> Option<LocalizedTransaction>;


@@ -40,6 +40,8 @@ pub struct TestBlockChainClient {
pub last_hash: RwLock<H256>,
/// Difficulty.
pub difficulty: RwLock<U256>,
/// Balances.
pub balances: RwLock<HashMap<Address, U256>>,
}
#[derive(Clone)]
@@ -55,9 +57,15 @@ pub enum EachBlockWith {
UncleAndTransaction
}
impl Default for TestBlockChainClient {
fn default() -> Self {
TestBlockChainClient::new()
}
}
impl TestBlockChainClient {
/// Creates new test client.
pub fn new() -> TestBlockChainClient {
pub fn new() -> Self {
let mut client = TestBlockChainClient {
blocks: RwLock::new(HashMap::new()),
@@ -65,12 +73,17 @@ impl TestBlockChainClient {
genesis_hash: H256::new(),
last_hash: RwLock::new(H256::new()),
difficulty: RwLock::new(From::from(0)),
balances: RwLock::new(HashMap::new()),
};
client.add_blocks(1, EachBlockWith::Nothing); // add genesis block
client.genesis_hash = client.last_hash.read().unwrap().clone();
client
}
pub fn set_balance(&mut self, address: Address, balance: U256) {
self.balances.write().unwrap().insert(address, balance);
}
/// Add blocks to test client.
pub fn add_blocks(&mut self, count: usize, with: EachBlockWith) {
let len = self.numbers.read().unwrap().len();
@@ -165,6 +178,10 @@ impl BlockChainClient for TestBlockChainClient {
unimplemented!();
}
fn balance(&self, address: &Address) -> U256 {
self.balances.read().unwrap().get(address).cloned().unwrap_or_else(U256::zero)
}
fn transaction(&self, _id: TransactionId) -> Option<LocalizedTransaction> {
unimplemented!();
}
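A short usage sketch of the balance plumbing added above (API exactly as in this diff; values illustrative):

let mut client = TestBlockChainClient::new();
client.set_balance(Address::from(1), U256::from(5));
assert_eq!(client.balance(&Address::from(1)), U256::from(5));
// addresses with no entry fall back to zero
assert_eq!(client.balance(&Address::from(2)), U256::zero());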


@@ -301,8 +301,14 @@ mod tests {
env_info: EnvInfo
}
impl Default for TestSetup {
fn default() -> Self {
TestSetup::new()
}
}
impl TestSetup {
fn new() -> TestSetup {
fn new() -> Self {
TestSetup {
state: get_temp_state(),
engine: get_test_spec().to_engine().unwrap(),


@@ -20,6 +20,7 @@ use error::Error;
use header::Header;
use super::Verifier;
#[allow(dead_code)]
pub struct NoopVerifier;
impl Verifier for NoopVerifier {


@@ -255,8 +255,14 @@ mod tests {
numbers: HashMap<BlockNumber, H256>,
}
impl Default for TestBlockChain {
fn default() -> Self {
TestBlockChain::new()
}
}
impl TestBlockChain {
pub fn new() -> TestBlockChain {
pub fn new() -> Self {
TestBlockChain {
blocks: HashMap::new(),
numbers: HashMap::new(),


@@ -314,7 +314,7 @@ impl Configuration {
fn init_nodes(&self, spec: &Spec) -> Vec<String> {
let mut r = if self.args.flag_no_bootstrap { Vec::new() } else { spec.nodes().clone() };
if let Some(ref x) = self.args.flag_bootnodes {
r.extend(x.split(",").map(|s| Self::normalize_enode(s).unwrap_or_else(|| die!("{}: Invalid node address format given for a boot node.", s))));
r.extend(x.split(',').map(|s| Self::normalize_enode(s).unwrap_or_else(|| die!("{}: Invalid node address format given for a boot node.", s))));
}
r
}
@@ -327,7 +327,7 @@ impl Configuration {
let host = IpAddr::from_str(host).unwrap_or_else(|_| die!("Invalid host given with `--nat extip:{}`", host));
Some(SocketAddr::new(host, self.args.flag_port))
} else {
listen_address.clone()
listen_address
};
(listen_address, public_address)
}
@@ -388,12 +388,13 @@ impl Configuration {
}
if self.args.cmd_list {
println!("Known addresses:");
for &(addr, _) in secret_store.accounts().unwrap().iter() {
for &(addr, _) in &secret_store.accounts().unwrap() {
println!("{:?}", addr);
}
}
}
#[cfg_attr(feature="dev", allow(useless_format))]
fn execute_client(&self) {
// Setup panic handler
let panic_handler = PanicHandler::new_in_arc();
@@ -406,7 +407,11 @@
let spec = self.spec();
let net_settings = self.net_settings(&spec);
let mut sync_config = SyncConfig::default();
sync_config.network_id = self.args.flag_networkid.as_ref().map(|id| U256::from_str(id).unwrap_or_else(|_| die!("{}: Invalid index given with --networkid", id))).unwrap_or(spec.network_id());
sync_config.network_id = self.args.flag_networkid.as_ref().map_or(spec.network_id(), |id| {
U256::from_str(id).unwrap_or_else(|_| {
die!("{}: Invalid index given with --networkid", id)
})
});
// Build client
let mut client_config = ClientConfig::default();
@@ -421,8 +426,7 @@
}
}
client_config.pruning = match self.args.flag_pruning.as_str() {
"" => journaldb::Algorithm::Archive,
"archive" => journaldb::Algorithm::Archive,
"" | "archive" => journaldb::Algorithm::Archive,
"pruned" => journaldb::Algorithm::EarlyMerge,
"fast" => journaldb::Algorithm::OverlayRecent,
// "slow" => journaldb::Algorithm::RefCounted, // TODO: @gavofyork uncomment this once ref-count algo is merged.
@@ -452,7 +456,7 @@
let cors = self.args.flag_rpccorsdomain.as_ref().unwrap_or(&self.args.flag_jsonrpc_cors);
// TODO: use this as the API list.
let apis = self.args.flag_rpcapi.as_ref().unwrap_or(&self.args.flag_jsonrpc_apis);
let server_handler = setup_rpc_server(service.client(), sync.clone(), account_service.clone(), &url, cors, apis.split(",").collect());
let server_handler = setup_rpc_server(service.client(), sync.clone(), account_service.clone(), &url, cors, apis.split(',').collect());
if let Some(handler) = server_handler {
panic_handler.forward_from(handler.deref());
}
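The network-id change in this diff swaps `.map(..).unwrap_or(..)` for `map_or`, a form clippy suggests. A standalone sketch of the equivalence (function and names hypothetical, using u64 for brevity):

fn network_id(flag: Option<&str>, spec_default: u64) -> u64 {
	// equivalent to flag.map(|id| ...).unwrap_or(spec_default)
	flag.map_or(spec_default, |id| id.parse().expect("invalid --networkid"))
}

fn main() {
	assert_eq!(network_id(None, 1), 1);
	assert_eq!(network_id(Some("42"), 1), 42);
}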


@@ -18,7 +18,7 @@ ethcore-util = { path = "../util" }
ethcore = { path = "../ethcore" }
ethash = { path = "../ethash" }
ethsync = { path = "../sync" }
clippy = { version = "0.0.49", optional = true }
clippy = { version = "0.0.50", optional = true }
rustc-serialize = "0.3"
transient-hashmap = "0.1"
serde_macros = { version = "0.7.0", optional = true }


@@ -155,6 +155,14 @@ impl<C, S, A> Eth for EthClient<C, S, A> where C: BlockChainClient + 'static, S:
}
}
fn accounts(&self, _: Params) -> Result<Value, Error> {
let store = take_weak!(self.accounts);
match store.accounts() {
Ok(account_list) => to_value(&account_list),
Err(_) => Err(Error::internal_error())
}
}
fn block_number(&self, params: Params) -> Result<Value, Error> {
match params {
Params::None => to_value(&U256::from(take_weak!(self.client).chain_info().best_block_number)),
@@ -162,6 +170,11 @@ impl<C, S, A> Eth for EthClient<C, S, A> where C: BlockChainClient + 'static, S:
}
}
fn balance(&self, params: Params) -> Result<Value, Error> {
from_params::<(Address, BlockNumber)>(params)
.and_then(|(address, _block_number)| to_value(&take_weak!(self.client).balance(&address)))
}
fn block_transaction_count_by_hash(&self, params: Params) -> Result<Value, Error> {
from_params::<(H256,)>(params)
.and_then(|(hash,)| match take_weak!(self.client).block(BlockId::Hash(hash)) {


@@ -39,12 +39,7 @@ impl<A> Personal for PersonalClient<A> where A: AccountProvider + 'static {
fn accounts(&self, _: Params) -> Result<Value, Error> {
let store = take_weak!(self.accounts);
match store.accounts() {
Ok(account_list) => {
Ok(Value::Array(account_list.iter()
.map(|&account| Value::String(format!("{:?}", account)))
.collect::<Vec<Value>>())
)
}
Ok(account_list) => to_value(&account_list),
Err(_) => Err(Error::internal_error())
}
}

rpc/src/v1/tests/eth.rs (new file, 90 lines)

@@ -0,0 +1,90 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
use std::sync::Arc;
use jsonrpc_core::IoHandler;
use util::hash::Address;
use util::numbers::U256;
use ethcore::client::{TestBlockChainClient, EachBlockWith};
use v1::{Eth, EthClient};
use v1::tests::helpers::{TestAccount, TestAccountProvider, TestSyncProvider, Config};
fn blockchain_client() -> Arc<TestBlockChainClient> {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, EachBlockWith::Nothing);
client.set_balance(Address::from(1), U256::from(5));
Arc::new(client)
}
fn accounts_provider() -> Arc<TestAccountProvider> {
let mut accounts = HashMap::new();
accounts.insert(Address::from(1), TestAccount::new("test"));
let ap = TestAccountProvider::new(accounts);
Arc::new(ap)
}
fn sync_provider() -> Arc<TestSyncProvider> {
Arc::new(TestSyncProvider::new(Config {
protocol_version: 65,
num_peers: 120,
}))
}
struct EthTester {
client: Arc<TestBlockChainClient>,
sync: Arc<TestSyncProvider>,
accounts_provider: Arc<TestAccountProvider>,
pub io: IoHandler,
}
impl Default for EthTester {
fn default() -> Self {
let client = blockchain_client();
let sync = sync_provider();
let ap = accounts_provider();
let eth = EthClient::new(&client, &sync, &ap).to_delegate();
let io = IoHandler::new();
io.add_delegate(eth);
EthTester {
client: client,
sync: sync,
accounts_provider: ap,
io: io
}
}
}
#[test]
fn rpc_eth_accounts() {
let request = r#"{"jsonrpc": "2.0", "method": "eth_accounts", "params": [], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":["0x0000000000000000000000000000000000000001"],"id":1}"#;
assert_eq!(EthTester::default().io.handle_request(request), Some(response.to_owned()));
}
#[test]
fn rpc_eth_balance() {
let request = r#"{
"jsonrpc": "2.0",
"method": "eth_getBalance",
"params": ["0x0000000000000000000000000000000000000001", "latest"],
"id": 1
}"#;
let response = r#"{"jsonrpc":"2.0","result":"0x05","id":1}"#;
assert_eq!(EthTester::default().io.handle_request(request), Some(response.to_owned()));
}


@@ -0,0 +1,84 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::RwLock;
use std::collections::HashMap;
use std::io;
use util::hash::{Address, H256};
use util::crypto::{Secret, Signature};
use util::keys::store::{AccountProvider, SigningError, EncryptedHashMapError};
/// Account mock.
#[derive(Clone)]
pub struct TestAccount {
/// True if account is unlocked.
pub unlocked: bool,
/// Account's password.
pub password: String,
}
impl TestAccount {
pub fn new(password: &str) -> Self {
TestAccount {
unlocked: false,
password: password.to_owned(),
}
}
}
/// Test account provider.
pub struct TestAccountProvider {
accounts: RwLock<HashMap<Address, TestAccount>>,
}
impl TestAccountProvider {
/// Basic constructor.
pub fn new(accounts: HashMap<Address, TestAccount>) -> Self {
TestAccountProvider {
accounts: RwLock::new(accounts),
}
}
}
impl AccountProvider for TestAccountProvider {
fn accounts(&self) -> Result<Vec<Address>, io::Error> {
Ok(self.accounts.read().unwrap().keys().cloned().collect())
}
fn unlock_account(&self, account: &Address, pass: &str) -> Result<(), EncryptedHashMapError> {
match self.accounts.write().unwrap().get_mut(account) {
Some(ref mut acc) if acc.password == pass => {
acc.unlocked = true;
Ok(())
},
Some(_) => Err(EncryptedHashMapError::InvalidPassword),
None => Err(EncryptedHashMapError::UnknownIdentifier),
}
}
fn new_account(&self, _pass: &str) -> Result<Address, io::Error> {
unimplemented!()
}
fn account_secret(&self, _account: &Address) -> Result<Secret, SigningError> {
unimplemented!()
}
fn sign(&self, _account: &Address, _message: &H256) -> Result<Signature, SigningError> {
unimplemented!()
}
}
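A usage sketch of the mock provider (API as defined above):

let mut accounts = HashMap::new();
accounts.insert(Address::from(1), TestAccount::new("test"));
let provider = TestAccountProvider::new(accounts);
assert_eq!(provider.accounts().unwrap().len(), 1);
assert!(provider.unlock_account(&Address::from(1), "test").is_ok());
assert!(provider.unlock_account(&Address::from(1), "wrong").is_err());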


@@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
mod account_provider;
mod sync_provider;
pub use self::account_provider::{TestAccount, TestAccountProvider};
pub use self::sync_provider::{Config, TestSyncProvider};


@@ -16,6 +16,7 @@
//!TODO: load custom blockchain state and test
mod eth;
mod net;
mod web3;
mod helpers;


@@ -130,7 +130,7 @@ pub trait Eth: Sized + Send + Sync + 'static {
delegate.add_method("eth_gasPrice", Eth::gas_price);
delegate.add_method("eth_accounts", Eth::accounts);
delegate.add_method("eth_blockNumber", Eth::block_number);
delegate.add_method("eth_balance", Eth::balance);
delegate.add_method("eth_getBalance", Eth::balance);
delegate.add_method("eth_getStorageAt", Eth::storage_at);
delegate.add_method("eth_getTransactionCount", Eth::transaction_count);
delegate.add_method("eth_getBlockTransactionCountByHash", Eth::block_transaction_count_by_hash);


@@ -17,7 +17,6 @@
use util::numbers::*;
use ethcore::transaction::{LocalizedTransaction, Action};
use v1::types::{Bytes, OptionalValue};
use serde::Error;
#[derive(Debug, Default, Serialize)]
pub struct Transaction {


@@ -10,7 +10,7 @@ authors = ["Ethcore <admin@ethcore.io"]
[dependencies]
ethcore-util = { path = "../util" }
ethcore = { path = "../ethcore" }
clippy = { version = "0.0.49", optional = true }
clippy = { version = "0.0.50", optional = true }
log = "0.3"
env_logger = "0.3"
time = "0.1.34"


@@ -298,8 +298,6 @@ impl ChainSync {
/// Restart sync
pub fn restart(&mut self, io: &mut SyncIo) {
self.reset();
self.last_imported_block = None;
self.last_imported_hash = None;
self.starting_block = 0;
self.highest_block = None;
self.have_common_block = false;
@@ -366,7 +364,7 @@ impl ChainSync {
for i in 0..item_count {
let info: BlockHeader = try!(r.val_at(i));
let number = BlockNumber::from(info.number);
if number <= self.current_base_block() || self.headers.have_item(&number) {
if (number <= self.current_base_block() && self.have_common_block) || self.headers.have_item(&number) {
trace!(target: "sync", "Skipping existing block header");
continue;
}
@@ -376,11 +374,17 @@
}
let hash = info.hash();
match io.chain().block_status(BlockId::Hash(hash.clone())) {
BlockStatus::InChain => {
self.have_common_block = true;
self.last_imported_block = Some(number);
self.last_imported_hash = Some(hash.clone());
trace!(target: "sync", "Found common header {} ({})", number, hash);
BlockStatus::InChain | BlockStatus::Queued => {
if !self.have_common_block || self.current_base_block() < number {
self.last_imported_block = Some(number);
self.last_imported_hash = Some(hash.clone());
}
if !self.have_common_block {
self.have_common_block = true;
trace!(target: "sync", "Found common header {} ({})", number, hash);
} else {
trace!(target: "sync", "Header already in chain {} ({})", number, hash);
}
},
_ => {
if self.have_common_block {
@@ -588,7 +592,7 @@ impl ChainSync {
pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) {
trace!(target: "sync", "== Connected {}", peer);
if let Err(e) = self.send_status(io) {
warn!(target:"sync", "Error sending status request: {:?}", e);
debug!(target:"sync", "Error sending status request: {:?}", e);
io.disable_peer(peer);
}
}
@@ -656,10 +660,7 @@ impl ChainSync {
let mut needed_numbers: Vec<BlockNumber> = Vec::new();
if self.have_common_block && !self.headers.is_empty() && self.headers.range_iter().next().unwrap().0 == self.current_base_block() + 1 {
for (start, ref items) in self.headers.range_iter() {
if needed_bodies.len() >= MAX_BODIES_TO_REQUEST {
break;
}
if let Some((start, ref items)) = self.headers.range_iter().next() {
let mut index: BlockNumber = 0;
while index != items.len() as BlockNumber && needed_bodies.len() < MAX_BODIES_TO_REQUEST {
let block = start + index;
@@ -703,7 +704,10 @@
if !self.have_common_block {
// download backwards until common block is found 1 header at a time
let chain_info = io.chain().chain_info();
start = chain_info.best_block_number;
start = match self.last_imported_block {
Some(n) => n,
None => chain_info.best_block_number,
};
if !self.headers.is_empty() {
start = min(start, self.headers.range_iter().next().unwrap().0 - 1);
}
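The change above makes the backward header search resume from the last imported block rather than always from the chain head. A condensed sketch of the start-block selection (free function and names are illustrative only):

fn backward_start(last_imported: Option<u64>, best_block: u64, lowest_downloaded: Option<u64>) -> u64 {
	// prefer the last imported block; fall back to the chain head
	let start = last_imported.unwrap_or(best_block);
	// never start above the lowest header already downloaded
	match lowest_downloaded {
		Some(low) => ::std::cmp::min(start, low - 1),
		None => start,
	}
}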
@@ -844,18 +848,12 @@ impl ChainSync {
/// Remove downloaded blocks/headers starting from specified number.
/// Used to recover from an error and re-download parts of the chain detected as bad.
fn remove_downloaded_blocks(&mut self, start: BlockNumber) {
for n in self.headers.get_tail(&start) {
if let Some(ref header_data) = self.headers.find_item(&n) {
let header_to_delete = HeaderView::new(&header_data.data);
let header_id = HeaderId {
transactions_root: header_to_delete.transactions_root(),
uncles: header_to_delete.uncles_hash()
};
self.header_ids.remove(&header_id);
}
self.downloading_bodies.remove(&n);
self.downloading_headers.remove(&n);
}
let ids = self.header_ids.drain().filter(|&(_, v)| v < start).collect();
self.header_ids = ids;
let hdrs = self.downloading_headers.drain().filter(|v| *v < start).collect();
self.downloading_headers = hdrs;
let bodies = self.downloading_bodies.drain().filter(|v| *v < start).collect();
self.downloading_bodies = bodies;
self.headers.remove_from(&start);
self.bodies.remove_from(&start);
}
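The rewrite above prunes the bookkeeping sets with a drain/filter/collect rebuild instead of walking the header tail item by item. The same pattern in isolation:

use std::collections::HashSet;

fn prune_from(set: &mut HashSet<u64>, start: u64) {
	// rebuild the set, keeping only entries below `start`
	let kept: HashSet<u64> = set.drain().filter(|v| *v < start).collect();
	*set = kept;
}

fn main() {
	let mut s: HashSet<u64> = (0..10).collect();
	prune_from(&mut s, 5);
	assert!(s.contains(&4) && !s.contains(&5));
}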
@@ -1095,7 +1093,7 @@ impl ChainSync {
let rlp = UntrustedRlp::new(data);
if packet_id != STATUS_PACKET && !self.peers.contains_key(&peer) {
warn!(target:"sync", "Unexpected packet from unregistered peer: {}:{}", peer, io.peer_info(peer));
debug!(target:"sync", "Unexpected packet from unregistered peer: {}:{}", peer, io.peer_info(peer));
return;
}
let result = match packet_id {


@@ -300,12 +300,17 @@ fn test_range() {
let mut r = ranges.clone();
r.remove_from(&20);
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
r.remove_from(&17);
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p'][..])]), Ordering::Equal);
r.remove_from(&15);
r.remove_from(&18);
assert!(!r.have_item(&18));
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q'][..])]), Ordering::Equal);
r.remove_from(&16);
assert!(!r.have_item(&16));
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal);
r.remove_from(&3);
assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal);
r.remove_from(&1);
assert_eq!(r.range_iter().next(), None);
let mut r = ranges.clone();
r.remove_from(&2);
assert_eq!(r.range_iter().next(), None);
}


@@ -27,7 +27,7 @@ crossbeam = "0.2"
slab = "0.1"
sha3 = { path = "sha3" }
serde = "0.7.0"
clippy = { version = "0.0.49", optional = true }
clippy = { version = "0.0.50", optional = true }
json-tests = { path = "json-tests" }
rustc_version = "0.1.0"
igd = "0.4.2"


@@ -159,6 +159,10 @@ impl JournalDB for ArchiveDB {
try!(self.backing.write(batch));
Ok((inserts + deletes) as u32)
}
fn state(&self, id: &H256) -> Option<Bytes> {
self.backing.get_by_prefix(&id.bytes()[0..12]).and_then(|b| Some(b.to_vec()))
}
}
#[cfg(test)]
@@ -301,7 +305,6 @@ mod tests {
assert!(jdb.exists(&foo));
}
#[test]
fn reopen() {
let mut dir = ::std::env::temp_dir();
@@ -360,6 +363,7 @@
jdb.commit(5, &b"5".sha3(), Some((4, b"4".sha3()))).unwrap();
}
}
#[test]
fn reopen_fork() {
let mut dir = ::std::env::temp_dir();
@@ -385,4 +389,22 @@
assert!(jdb.exists(&foo));
}
}
#[test]
fn returns_state() {
let temp = ::devtools::RandomTempPath::new();
let key = {
let mut jdb = ArchiveDB::new(temp.as_str());
let key = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
key
};
{
let jdb = ArchiveDB::new(temp.as_str());
let state = jdb.state(&key);
assert!(state.is_some());
}
}
}


@@ -25,6 +25,34 @@ use kvdb::{Database, DBTransaction, DatabaseConfig};
#[cfg(test)]
use std::env;
#[derive(Clone, PartialEq, Eq)]
struct RefInfo {
queue_refs: usize,
in_archive: bool,
}
impl HeapSizeOf for RefInfo {
fn heap_size_of_children(&self) -> usize { 0 }
}
impl fmt::Display for RefInfo {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}+{}", self.queue_refs, if self.in_archive {1} else {0})
}
}
impl fmt::Debug for RefInfo {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}+{}", self.queue_refs, if self.in_archive {1} else {0})
}
}
#[derive(Clone, PartialEq, Eq)]
enum RemoveFrom {
Queue,
Archive,
}
/// Implementation of the HashDB trait for a disk-backed database with a memory overlay
/// and latent-removal semantics.
///
@@ -35,7 +63,8 @@ use std::env;
pub struct EarlyMergeDB {
overlay: MemoryDB,
backing: Arc<Database>,
counters: Option<Arc<RwLock<HashMap<H256, i32>>>>,
refs: Option<Arc<RwLock<HashMap<H256, RefInfo>>>>,
latest_era: Option<u64>,
}
// all keys must be at least 12 bytes
@@ -62,11 +91,13 @@ impl EarlyMergeDB {
backing.put(&VERSION_KEY, &encode(&DB_VERSION)).expect("Error writing version to database");
}
let counters = Some(Arc::new(RwLock::new(EarlyMergeDB::read_counters(&backing))));
let (latest_era, refs) = EarlyMergeDB::read_refs(&backing);
let refs = Some(Arc::new(RwLock::new(refs)));
EarlyMergeDB {
overlay: MemoryDB::new(),
backing: Arc::new(backing),
counters: counters,
refs: refs,
latest_era: latest_era,
}
}
@@ -91,11 +122,14 @@ impl EarlyMergeDB {
backing.get(&Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?").is_some()
}
fn insert_keys(inserts: &[(H256, Bytes)], backing: &Database, counters: &mut HashMap<H256, i32>, batch: &DBTransaction) {
fn insert_keys(inserts: &[(H256, Bytes)], backing: &Database, refs: &mut HashMap<H256, RefInfo>, batch: &DBTransaction, trace: bool) {
for &(ref h, ref d) in inserts {
if let Some(c) = counters.get_mut(h) {
if let Some(c) = refs.get_mut(h) {
// already counting. increment.
*c += 1;
c.queue_refs += 1;
if trace {
trace!(target: "jdb.fine", " insert({}): In queue: Incrementing refs to {}", h, c.queue_refs);
}
continue;
}
@@ -103,7 +137,10 @@ impl EarlyMergeDB {
if backing.get(&h.bytes()).expect("Low-level database error. Some issue with your hard disk?").is_some() {
// already in the backing DB. start counting, and remember it was already in.
Self::set_already_in(batch, &h);
counters.insert(h.clone(), 1);
refs.insert(h.clone(), RefInfo{queue_refs: 1, in_archive: true});
if trace {
trace!(target: "jdb.fine", " insert({}): New to queue, in DB: Recording and inserting into queue", h);
}
continue;
}
@@ -111,62 +148,108 @@ impl EarlyMergeDB {
//Self::reset_already_in(&h);
assert!(!Self::is_already_in(backing, &h));
batch.put(&h.bytes(), d).expect("Low-level database error. Some issue with your hard disk?");
refs.insert(h.clone(), RefInfo{queue_refs: 1, in_archive: false});
if trace {
trace!(target: "jdb.fine", " insert({}): New to queue, not in DB: Inserting into queue and DB", h);
}
}
}
fn replay_keys(inserts: &[H256], backing: &Database, counters: &mut HashMap<H256, i32>) {
trace!("replay_keys: inserts={:?}, counters={:?}", inserts, counters);
fn replay_keys(inserts: &[H256], backing: &Database, refs: &mut HashMap<H256, RefInfo>) {
trace!(target: "jdb.fine", "replay_keys: inserts={:?}, refs={:?}", inserts, refs);
for h in inserts {
if let Some(c) = counters.get_mut(h) {
if let Some(c) = refs.get_mut(h) {
// already counting. increment.
*c += 1;
c.queue_refs += 1;
continue;
}
// this is the first entry for this node in the journal.
// it is initialised to 1 if it was already in.
if Self::is_already_in(backing, h) {
trace!("replace_keys: Key {} was already in!", h);
counters.insert(h.clone(), 1);
}
refs.insert(h.clone(), RefInfo{queue_refs: 1, in_archive: Self::is_already_in(backing, h)});
}
trace!("replay_keys: (end) counters={:?}", counters);
trace!(target: "jdb.fine", "replay_keys: (end) refs={:?}", refs);
}
fn kill_keys(deletes: Vec<H256>, counters: &mut HashMap<H256, i32>, batch: &DBTransaction) {
for h in deletes.into_iter() {
let mut n: Option<i32> = None;
if let Some(c) = counters.get_mut(&h) {
if *c > 1 {
*c -= 1;
fn kill_keys(deletes: &Vec<H256>, refs: &mut HashMap<H256, RefInfo>, batch: &DBTransaction, from: RemoveFrom, trace: bool) {
// with a kill on {queue_refs: 1, in_archive: true}, we have two options:
// - convert to {queue_refs: 1, in_archive: false} (i.e. remove it from the conceptual archive)
// - convert to {queue_refs: 0, in_archive: true} (i.e. remove it from the conceptual queue)
// (the latter option would then mean removing the RefInfo, since it would no longer be counted in the queue.)
// both are valid, but we switch between them depending on context.
// All inserts in queue (i.e. those which may yet be reverted) have an entry in refs.
for h in deletes.iter() {
let mut n: Option<RefInfo> = None;
if let Some(c) = refs.get_mut(h) {
if c.in_archive && from == RemoveFrom::Archive {
c.in_archive = false;
Self::reset_already_in(batch, h);
if trace {
trace!(target: "jdb.fine", " kill({}): In archive, 1 in queue: Reducing to queue only and recording", h);
}
continue;
} else if c.queue_refs > 1 {
c.queue_refs -= 1;
if trace {
trace!(target: "jdb.fine", " kill({}): In queue > 1 refs: Decrementing ref count to {}", h, c.queue_refs);
}
continue;
} else {
n = Some(*c);
n = Some(c.clone());
}
}
match n {
Some(i) if i == 1 => {
counters.remove(&h);
Self::reset_already_in(batch, &h);
Some(RefInfo{queue_refs: 1, in_archive: true}) => {
refs.remove(h);
Self::reset_already_in(batch, h);
if trace {
trace!(target: "jdb.fine", " kill({}): In archive, 1 in queue: Removing from queue and leaving in archive", h);
}
}
Some(RefInfo{queue_refs: 1, in_archive: false}) => {
refs.remove(h);
batch.delete(&h.bytes()).expect("Low-level database error. Some issue with your hard disk?");
if trace {
trace!(target: "jdb.fine", " kill({}): Not in archive, only 1 ref in queue: Removing from queue and DB", h);
}
}
None => {
// Gets removed when moving from 1 to 0 additional refs. Should never be here at 0 additional refs.
//assert!(!Self::is_already_in(db, &h));
batch.delete(&h.bytes()).expect("Low-level database error. Some issue with your hard disk?");
if trace {
trace!(target: "jdb.fine", " kill({}): Not in queue - MUST BE IN ARCHIVE: Removing from DB", h);
}
}
_ => panic!("Invalid value in counters: {:?}", n),
_ => panic!("Invalid value in refs: {:?}", n),
}
}
}
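A toy model of the two removal options spelled out in the comment at the top of kill_keys (illustration only, not the database code):

#[derive(Clone, Debug, PartialEq)]
struct Refs { queue_refs: usize, in_archive: bool }

// A kill on {queue_refs: 1, in_archive: true} can resolve two ways:
fn kill_from_archive(r: &mut Refs) {
	// keep the queue entry, drop the conceptual archive copy
	r.in_archive = false;
}

fn kill_from_queue(r: Refs) -> Option<Refs> {
	// drop one queue reference; the entry vanishes at zero queue refs
	if r.queue_refs > 1 { Some(Refs { queue_refs: r.queue_refs - 1, ..r }) } else { None }
}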
#[cfg(test)]
fn can_reconstruct_refs(&self) -> bool {
let (latest_era, reconstructed) = Self::read_refs(&self.backing);
let refs = self.refs.as_ref().unwrap().write().unwrap();
if *refs != reconstructed || latest_era != self.latest_era {
let clean_refs = refs.iter().filter_map(|(k, v)| if reconstructed.get(k) == Some(v) {None} else {Some((k.clone(), v.clone()))}).collect::<HashMap<_, _>>();
let clean_recon = reconstructed.into_iter().filter_map(|(k, v)| if refs.get(&k) == Some(&v) {None} else {Some((k.clone(), v.clone()))}).collect::<HashMap<_, _>>();
warn!(target: "jdb", "mem: {:?} != log: {:?}", clean_refs, clean_recon);
false
} else {
true
}
}
fn payload(&self, key: &H256) -> Option<Bytes> {
self.backing.get(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec())
}
fn read_counters(db: &Database) -> HashMap<H256, i32> {
let mut counters = HashMap::new();
fn read_refs(db: &Database) -> (Option<u64>, HashMap<H256, RefInfo>) {
let mut refs = HashMap::new();
let mut latest_era = None;
if let Some(val) = db.get(&LATEST_ERA_KEY).expect("Low-level database error.") {
let mut era = decode::<u64>(&val);
latest_era = Some(era);
loop {
let mut index = 0usize;
while let Some(rlp_data) = db.get({
@@ -176,10 +259,9 @@ impl EarlyMergeDB {
r.append(&&PADDING[..]);
&r.drain()
}).expect("Low-level database error.") {
trace!("read_counters: era={}, index={}", era, index);
let rlp = Rlp::new(&rlp_data);
let inserts: Vec<H256> = rlp.val_at(1);
Self::replay_keys(&inserts, db, &mut counters);
Self::replay_keys(&inserts, db, &mut refs);
index += 1;
};
if index == 0 || era == 0 {
@@ -188,10 +270,9 @@ impl EarlyMergeDB {
era -= 1;
}
}
trace!("Recovered {} counters", counters.len());
counters
(latest_era, refs)
}
}
}
impl HashDB for EarlyMergeDB {
fn keys(&self) -> HashMap<H256, i32> {
@@ -243,23 +324,24 @@ impl JournalDB for EarlyMergeDB {
Box::new(EarlyMergeDB {
overlay: MemoryDB::new(),
backing: self.backing.clone(),
counters: self.counters.clone(),
refs: self.refs.clone(),
latest_era: self.latest_era.clone(),
})
}
fn mem_used(&self) -> usize {
self.overlay.mem_used() + match self.counters {
Some(ref c) => c.read().unwrap().heap_size_of_children(),
None => 0
}
}
fn is_empty(&self) -> bool {
self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none()
}
fn mem_used(&self) -> usize {
self.overlay.mem_used() + match self.refs {
Some(ref c) => c.read().unwrap().heap_size_of_children(),
None => 0
}
}
fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
// journal format:
// journal format:
// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
// [era, n] => [ ... ]
@@ -304,9 +386,9 @@ impl JournalDB for EarlyMergeDB {
//
// record new commit's details.
trace!("commit: #{} ({}), end era: {:?}", now, id, end);
let mut counters = self.counters.as_ref().unwrap().write().unwrap();
let mut refs = self.refs.as_ref().unwrap().write().unwrap();
let batch = DBTransaction::new();
let trace = false;
{
let mut index = 0usize;
let mut last;
@@ -323,6 +405,11 @@
}
let drained = self.overlay.drain();
if trace {
trace!(target: "jdb", "commit: #{} ({}), end era: {:?}", now, id, end);
}
let removes: Vec<H256> = drained
.iter()
.filter_map(|(k, &(_, c))| if c < 0 {Some(k.clone())} else {None})
@@ -332,6 +419,9 @@
.filter_map(|(k, (v, r))| if r > 0 { assert!(r == 1); Some((k, v)) } else { assert!(r >= -1); None })
.collect();
// TODO: check all removes are in the db.
let mut r = RlpStream::new_list(3);
r.append(id);
@@ -344,9 +434,17 @@
r.begin_list(inserts.len());
inserts.iter().foreach(|&(k, _)| {r.append(&k);});
r.append(&removes);
Self::insert_keys(&inserts, &self.backing, &mut counters, &batch);
Self::insert_keys(&inserts, &self.backing, &mut refs, &batch, trace);
if trace {
let ins = inserts.iter().map(|&(k, _)| k).collect::<Vec<_>>();
trace!(target: "jdb.ops", " Inserts: {:?}", ins);
trace!(target: "jdb.ops", " Deletes: {:?}", removes);
}
try!(batch.put(&last, r.as_raw()));
try!(batch.put(&LATEST_ERA_KEY, &encode(&now)));
if self.latest_era.map_or(true, |e| now > e) {
try!(batch.put(&LATEST_ERA_KEY, &encode(&now)));
self.latest_era = Some(now);
}
}
// apply old commits' details
@@ -363,17 +461,64 @@
})) {
let rlp = Rlp::new(&rlp_data);
let inserts: Vec<H256> = rlp.val_at(1);
let deletes: Vec<H256> = rlp.val_at(2);
// Collect keys to be removed. These are removed keys for canonical block, inserted for non-canonical
Self::kill_keys(if canon_id == rlp.val_at(0) {deletes} else {inserts}, &mut counters, &batch);
if canon_id == rlp.val_at(0) {
// Collect keys to be removed. Canon block - remove the (enacted) deletes.
let deletes: Vec<H256> = rlp.val_at(2);
if trace {
trace!(target: "jdb.ops", " Expunging: {:?}", deletes);
}
Self::kill_keys(&deletes, &mut refs, &batch, RemoveFrom::Archive, trace);
if trace {
trace!(target: "jdb.ops", " Finalising: {:?}", inserts);
}
for k in inserts.iter() {
match refs.get(k).cloned() {
None => {
// [in archive] -> SHIFT remove -> SHIFT insert None->Some{queue_refs: 1, in_archive: true} -> TAKE remove Some{queue_refs: 1, in_archive: true}->None -> TAKE insert
// already expunged from the queue (which is allowed since the key is in the archive).
// leave well alone.
}
Some( RefInfo{queue_refs: 1, in_archive: false} ) => {
// just delete the refs entry.
refs.remove(k);
}
Some( RefInfo{queue_refs: x, in_archive: false} ) => {
// must set as already in.
Self::set_already_in(&batch, k);
refs.insert(k.clone(), RefInfo{ queue_refs: x - 1, in_archive: true });
}
Some( RefInfo{queue_refs: _, in_archive: true} ) => {
// Invalid! Reinserted the same key twice.
warn!("Key {} inserted twice into same fork.", k);
}
}
}
} else {
// Collect keys to be removed. Non-canon block - remove the (reverted) inserts.
if trace {
trace!(target: "jdb.ops", " Reverting: {:?}", inserts);
}
Self::kill_keys(&inserts, &mut refs, &batch, RemoveFrom::Queue, trace);
}
try!(batch.delete(&last));
index += 1;
}
trace!("EarlyMergeDB: delete journal for time #{}.{}, (canon was {})", end_era, index, canon_id);
if trace {
trace!(target: "jdb", "EarlyMergeDB: delete journal for time #{}.{}, (canon was {})", end_era, index, canon_id);
}
}
try!(self.backing.write(batch));
// trace!("EarlyMergeDB::commit() deleted {} nodes", deletes);
// Comment out for now. TODO: automatically enable in tests.
if trace {
trace!(target: "jdb", "OK: {:?}", refs.clone());
}
Ok(0)
}
}
@@ -382,8 +527,9 @@
mod tests {
use common::*;
use super::*;
use super::super::traits::JournalDB;
use hashdb::*;
use journaldb::traits::JournalDB;
use log::init_log;
#[test]
fn insert_same_in_fork() {
@@ -392,36 +538,69 @@ mod tests {
let x = jdb.insert(b"X");
jdb.commit(1, &b"1".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(2, &b"2".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(3, &b"1002a".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(4, &b"1003a".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&x);
jdb.commit(3, &b"1002b".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
let x = jdb.insert(b"X");
jdb.commit(4, &b"1003b".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(5, &b"1004a".sha3(), Some((3, b"1002a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(6, &b"1005a".sha3(), Some((4, b"1003a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&x));
}
#[test]
fn insert_older_era() {
let mut jdb = EarlyMergeDB::new_temp();
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0a".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
let bar = jdb.insert(b"bar");
jdb.commit(1, &b"1".sha3(), Some((0, b"0a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&bar);
jdb.commit(0, &b"0b".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.exists(&foo));
assert!(jdb.exists(&bar));
}
#[test]
fn long_history() {
// history is 3
let mut jdb = EarlyMergeDB::new_temp();
let h = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&h));
jdb.remove(&h);
jdb.commit(1, &b"1".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&h));
jdb.commit(2, &b"2".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&h));
jdb.commit(3, &b"3".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&h));
jdb.commit(4, &b"4".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(!jdb.exists(&h));
}
@@ -433,6 +612,7 @@ mod tests {
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
assert!(jdb.exists(&bar));
@ -440,6 +620,7 @@ mod tests {
jdb.remove(&bar);
let baz = jdb.insert(b"baz");
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
assert!(jdb.exists(&bar));
assert!(jdb.exists(&baz));
@@ -447,17 +628,20 @@
let foo = jdb.insert(b"foo");
jdb.remove(&baz);
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
assert!(!jdb.exists(&bar));
assert!(jdb.exists(&baz));
jdb.remove(&foo);
jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
assert!(!jdb.exists(&bar));
assert!(!jdb.exists(&baz));
jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(!jdb.exists(&foo));
assert!(!jdb.exists(&bar));
assert!(!jdb.exists(&baz));
@@ -471,21 +655,25 @@
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
assert!(jdb.exists(&bar));
jdb.remove(&foo);
let baz = jdb.insert(b"baz");
jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&bar);
jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
assert!(jdb.exists(&bar));
assert!(jdb.exists(&baz));
jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
assert!(!jdb.exists(&baz));
assert!(!jdb.exists(&bar));
@@ -498,35 +686,113 @@
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
jdb.remove(&foo);
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
assert!(jdb.exists(&foo));
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
jdb.commit(3, &b"2".sha3(), Some((0, b"2".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
}
#[test]
fn fork_same_key() {
// history is 1
let mut jdb = EarlyMergeDB::new_temp();
fn fork_same_key_one() {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap());
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
let foo = jdb.insert(b"foo");
jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(1, &b"1c".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
jdb.commit(2, &b"2a".sha3(), Some((1, b"1a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
}
#[test]
fn fork_same_key_other() {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap());
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
let foo = jdb.insert(b"foo");
jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(1, &b"1c".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
}
#[test]
fn fork_ins_del_ins() {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap());
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
let foo = jdb.insert(b"foo");
jdb.commit(1, &b"1".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(2, &b"2a".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(2, &b"2b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(3, &b"3a".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(3, &b"3b".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(4, &b"4a".sha3(), Some((2, b"2a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(5, &b"5a".sha3(), Some((3, b"3a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
}
#[test]
fn reopen() {
@@ -540,6 +806,7 @@
let foo = jdb.insert(b"foo");
jdb.emplace(bar.clone(), b"bar".to_vec());
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
foo
};
@@ -547,6 +814,7 @@
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap());
jdb.remove(&foo);
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
}
{
@@ -554,40 +822,210 @@
assert!(jdb.exists(&foo));
assert!(jdb.exists(&bar));
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(!jdb.exists(&foo));
}
}
#[test]
fn reopen_remove() {
fn insert_delete_insert_delete_insert_expunge() {
init_log();
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let foo = {
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap());
// history is 1
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap());
// foo is ancient history.
// history is 4
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(1, &b"1".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(2, &b"2".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(3, &b"3".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(4, &b"4".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
// expunge foo
jdb.commit(5, &b"5".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
}
jdb.insert(b"foo");
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
foo
};
#[test]
fn forked_insert_delete_insert_delete_insert_expunge() {
init_log();
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap());
// history is 4
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(1, &b"1a".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(1, &b"1b".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(2, &b"2a".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(2, &b"2b".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(3, &b"3a".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.commit(3, &b"3b".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(4, &b"4a".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(4, &b"4b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
// expunge foo
jdb.commit(5, &b"5".sha3(), Some((1, b"1a".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
}
#[test]
fn broken_assert() {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap());
// history is 1
let foo = jdb.insert(b"foo");
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
// foo is ancient history.
jdb.remove(&foo);
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap(); // BROKEN
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
jdb.remove(&foo);
jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(5, &b"5".sha3(), Some((4, b"4".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(!jdb.exists(&foo));
}
#[test]
fn reopen_test() {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap());
// history is 4
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(1, &b"1".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(2, &b"2".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(3, &b"3".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(4, &b"4".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
// foo is ancient history.
jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit(5, &b"5".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
jdb.remove(&bar);
jdb.commit(6, &b"6".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.insert(b"foo");
jdb.insert(b"bar");
jdb.commit(7, &b"7".sha3(), Some((3, b"3".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
}
#[test]
fn reopen_remove_three() {
init_log();
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let foo = b"foo".sha3();
{
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap());
// history is 1
jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.commit(1, &b"1".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
// foo is ancient history.
jdb.remove(&foo);
jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap();
jdb.commit(2, &b"2".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
jdb.insert(b"foo");
jdb.commit(3, &b"3".sha3(), Some((1, b"1".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
// incantation to reopen the db
}; { let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap());
jdb.remove(&foo);
jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
jdb.commit(5, &b"5".sha3(), Some((4, b"4".sha3()))).unwrap();
jdb.commit(4, &b"4".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
// incantation to reopen the db
}; { let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap());
jdb.commit(5, &b"5".sha3(), Some((3, b"3".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
// incantation to reopen the db
}; { let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap());
jdb.commit(6, &b"6".sha3(), Some((4, b"4".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(!jdb.exists(&foo));
}
}
#[test]
fn reopen_fork() {
let mut dir = ::std::env::temp_dir();
@@ -598,18 +1036,22 @@ mod tests {
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&foo);
let baz = jdb.insert(b"baz");
jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
jdb.remove(&bar);
jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
(foo, bar, baz)
};
{
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap());
jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.exists(&foo));
assert!(!jdb.exists(&baz));
assert!(!jdb.exists(&bar));


@@ -33,14 +33,14 @@ use super::JournalDB;
/// immediately. Rather some age (based on a linear but arbitrary metric) must pass before
/// the removals actually take effect.
///
/// There are two memory overlays:
/// - Transaction overlay contains current transaction data. It is merged with history
/// There are two memory overlays:
/// - Transaction overlay contains current transaction data. It is merged with history
/// overlay on each `commit()`
/// - History overlay contains all data inserted during the history period. When the node
/// - History overlay contains all data inserted during the history period. When the node
/// in the overlay becomes ancient it is written to disk on `commit()`
///
/// There is also a journal maintained in memory and on the disk as well which lists insertions
/// and removals for each commit during the history period. This is used to track
/// There is also a journal maintained in memory and on the disk as well which lists insertions
/// and removals for each commit during the history period. This is used to track
/// data nodes that go out of history scope and must be written to disk.
///
/// Commit workflow:
@@ -50,12 +50,12 @@ use super::JournalDB;
/// 3. Clear the transaction overlay.
/// 4. For a canonical journal record that becomes ancient inserts its insertions into the disk DB
/// 5. For each journal record that goes out of the history scope (becomes ancient) remove its
/// insertions from the history overlay, decreasing the reference counter and removing entry if
/// insertions from the history overlay, decreasing the reference counter and removing entry if
/// it reaches zero.
/// 6. For a canonical journal record that becomes ancient delete its removals from the disk only if
/// 6. For a canonical journal record that becomes ancient delete its removals from the disk only if
/// the removed key is not present in the history overlay.
/// 7. Delete ancient record from memory and disk.
///
pub struct OverlayRecentDB {
transaction_overlay: MemoryDB,
backing: Arc<Database>,
@@ -66,7 +66,7 @@ pub struct OverlayRecentDB {
struct JournalOverlay {
backing_overlay: MemoryDB,
journal: HashMap<u64, Vec<JournalEntry>>,
latest_era: u64,
latest_era: Option<u64>,
}
#[derive(PartialEq)]
@@ -152,10 +152,10 @@ impl OverlayRecentDB {
let mut journal = HashMap::new();
let mut overlay = MemoryDB::new();
let mut count = 0;
let mut latest_era = 0;
let mut latest_era = None;
if let Some(val) = db.get(&LATEST_ERA_KEY).expect("Low-level database error.") {
latest_era = decode::<u64>(&val);
let mut era = latest_era;
let mut era = decode::<u64>(&val);
latest_era = Some(era);
loop {
let mut index = 0usize;
while let Some(rlp_data) = db.get({
@@ -223,7 +223,7 @@ impl JournalDB for OverlayRecentDB {
let mut tx = self.transaction_overlay.drain();
let inserted_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c > 0 { Some(k.clone()) } else { None }).collect();
let removed_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c < 0 { Some(k.clone()) } else { None }).collect();
-// Increase counter for each inserted key no matter if the block is canonical or not.
+// Increase counter for each inserted key no matter if the block is canonical or not.
let insertions = tx.drain().filter_map(|(k, (v, c))| if c > 0 { Some((k, v)) } else { None });
r.append(id);
r.begin_list(inserted_keys.len());
@@ -236,14 +236,14 @@ impl JournalDB for OverlayRecentDB {
r.append(&removed_keys);
let mut k = RlpStream::new_list(3);
-let index = journal_overlay.journal.get(&now).map(|j| j.len()).unwrap_or(0);
+let index = journal_overlay.journal.get(&now).map_or(0, |j| j.len());
k.append(&now);
k.append(&index);
k.append(&&PADDING[..]);
try!(batch.put(&k.drain(), r.as_raw()));
-if now >= journal_overlay.latest_era {
+if journal_overlay.latest_era.map_or(true, |e| now > e) {
try!(batch.put(&LATEST_ERA_KEY, &encode(&now)));
-journal_overlay.latest_era = now;
+journal_overlay.latest_era = Some(now);
}
journal_overlay.journal.entry(now).or_insert_with(Vec::new).push(JournalEntry { id: id.clone(), insertions: inserted_keys, deletions: removed_keys });
}
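Two changes land in the hunk above: `latest_era` becomes `Option<u64>`, so an empty database is distinguishable from one whose newest era is genuinely 0, and the write condition tightens from `now >= latest_era` to `map_or(true, |e| now > e)`, meaning "strictly newer era, or no era recorded yet". Repeated commits at the same era therefore no longer rewrite LATEST_ERA. A self-contained sketch of the idiom (the function name is assumed, not from the diff):

fn update_latest_era(latest_era: &mut Option<u64>, now: u64) -> bool {
    // `map_or(true, ...)`: with no era recorded, the first commit always wins;
    // otherwise only a strictly newer era triggers a LATEST_ERA_KEY write.
    if latest_era.map_or(true, |e| now > e) {
        *latest_era = Some(now);
        true // the caller would batch.put(LATEST_ERA_KEY, ...) here
    } else {
        false
    }
}

fn main() {
    let mut era = None;
    assert!(update_latest_era(&mut era, 0));  // empty journal: era 0 is recorded
    assert!(!update_latest_era(&mut era, 0)); // same era again: no rewrite
    assert!(update_latest_era(&mut era, 1));  // newer era: recorded
}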
@@ -345,14 +345,14 @@ impl HashDB for OverlayRecentDB {
self.lookup(key).is_some()
}
-fn insert(&mut self, value: &[u8]) -> H256 {
+fn insert(&mut self, value: &[u8]) -> H256 {
self.transaction_overlay.insert(value)
}
fn emplace(&mut self, key: H256, value: Bytes) {
-self.transaction_overlay.emplace(key, value);
+self.transaction_overlay.emplace(key, value);
}
-fn kill(&mut self, key: &H256) {
-self.transaction_overlay.kill(key);
+fn kill(&mut self, key: &H256) {
+self.transaction_overlay.kill(key);
}
}
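These HashDB methods are deliberately thin: `insert`, `emplace` and `kill` touch only the transaction overlay, and nothing reaches the backing store until `commit()` nets out the per-key counters (the `c > 0` / `c < 0` filters earlier in this file). A toy model of that cancellation, with a plain HashMap standing in for MemoryDB:

use std::collections::HashMap;

#[derive(Default)]
struct TransactionOverlay(HashMap<String, i32>); // key -> pending refcount delta

impl TransactionOverlay {
    fn insert(&mut self, key: &str) {
        *self.0.entry(key.to_owned()).or_insert(0) += 1;
    }
    fn kill(&mut self, key: &str) {
        *self.0.entry(key.to_owned()).or_insert(0) -= 1;
    }
}

fn main() {
    let mut overlay = TransactionOverlay::default();
    overlay.insert("node");
    overlay.kill("node");
    // Net delta 0: commit() would treat this key as neither inserted nor removed.
    assert_eq!(overlay.0["node"], 0);
}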
@@ -749,7 +749,7 @@ mod tests {
assert!(jdb.can_reconstruct_refs());
assert!(!jdb.exists(&foo));
}
#[test]
fn reopen_test() {
let mut dir = ::std::env::temp_dir();
@@ -784,7 +784,7 @@ mod tests {
jdb.commit(7, &b"7".sha3(), Some((3, b"3".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
}
#[test]
fn reopen_remove_three() {
init_log();
@@ -838,7 +838,7 @@ mod tests {
assert!(!jdb.exists(&foo));
}
}
#[test]
fn reopen_fork() {
let mut dir = ::std::env::temp_dir();
@@ -870,4 +870,24 @@ mod tests {
assert!(!jdb.exists(&bar));
}
}
+#[test]
+fn insert_older_era() {
+let mut jdb = OverlayRecentDB::new_temp();
+let foo = jdb.insert(b"foo");
+jdb.commit(0, &b"0a".sha3(), None).unwrap();
+assert!(jdb.can_reconstruct_refs());
+let bar = jdb.insert(b"bar");
+jdb.commit(1, &b"1".sha3(), Some((0, b"0a".sha3()))).unwrap();
+assert!(jdb.can_reconstruct_refs());
+jdb.remove(&bar);
+jdb.commit(0, &b"0b".sha3(), None).unwrap();
+assert!(jdb.can_reconstruct_refs());
+jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
+assert!(jdb.exists(&foo));
+assert!(jdb.exists(&bar));
+}
}
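The new `insert_older_era` test presumably targets the `Option<u64>` change above: re-committing era 0 (block `0b`) after era 1 must not regress LATEST_ERA, which the strict `now > e` comparison guarantees, and must not lose data journalled under the newer era, hence the final asserts that both `foo` and `bar` still exist.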


@@ -34,4 +34,9 @@ pub trait JournalDB : HashDB + Send + Sync {
/// Commit all recent insert operations and canonical historical commits' removals from the
/// old era to the backing database, reverting any non-canonical historical commit's inserts.
fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError>;
+/// State data query
+fn state(&self, _id: &H256) -> Option<Bytes> {
+None
+}
}
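Giving `state()` a default body of `None` lets the trait grow without touching existing implementors; a backend that can actually answer the query overrides it. A self-contained sketch of the pattern, with toy stand-ins for `H256` and `Bytes` (trait and type names here are illustrative, not the real API):

use std::collections::HashMap;

type Hash = [u8; 32];
type Bytes = Vec<u8>;

trait JournalDbLike {
    /// State data query; backends with no way to serve it keep the default.
    fn state(&self, _id: &Hash) -> Option<Bytes> {
        None
    }
}

struct DiskOnly; // hypothetical backend without a state lookup
impl JournalDbLike for DiskOnly {}

struct WithOverlay(HashMap<Hash, Bytes>); // hypothetical backend with one
impl JournalDbLike for WithOverlay {
    fn state(&self, id: &Hash) -> Option<Bytes> {
        self.0.get(id).cloned() // serve from the in-memory overlay
    }
}

fn main() {
    let db = WithOverlay([([0u8; 32], b"state".to_vec())].into_iter().collect());
    assert_eq!(db.state(&[0u8; 32]), Some(b"state".to_vec()));
    assert_eq!(DiskOnly.state(&[0u8; 32]), None);
}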


@@ -161,6 +161,7 @@ mod tests {
}
#[test]
+#[cfg(feature="heavy-tests")]
fn can_decrypt_with_imported() {
use keys::store::EncryptedHashMap;


@@ -120,9 +120,15 @@ impl AccountProvider for AccountService {
}
}
+impl Default for AccountService {
+fn default() -> Self {
+AccountService::new()
+}
+}
impl AccountService {
/// New account service with the default location
-pub fn new() -> AccountService {
+pub fn new() -> Self {
let secret_store = RwLock::new(SecretStore::new());
secret_store.write().unwrap().try_import_existing();
AccountService {
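The `Default` impls introduced here and for the test connection types further down simply delegate to `new()`, the standard fix for clippy's `new_without_default` lint, and returning `Self` instead of repeating the concrete type name is the idiomatic spelling. A minimal sketch of the pattern (toy type):

struct Service;

impl Service {
    pub fn new() -> Self {
        Service
    }
}

impl Default for Service {
    fn default() -> Self {
        Service::new()
    }
}

fn main() {
    let _svc = Service::default(); // usable wherever a Default bound is expected
}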
@@ -363,6 +369,7 @@ mod vector_tests {
#[test]
+#[cfg(feature="heavy-tests")]
fn mac_vector() {
let password = "testpassword";
let salt = H256::from_str("ae3cd4e7013836a3df6bd7241b12db061dbe2c6785853cce422d148a624ce0bd").unwrap();
@@ -464,6 +471,7 @@ mod tests {
}
#[test]
+#[cfg(feature="heavy-tests")]
fn can_get() {
let temp = RandomTempPath::create_dir();
let key_id = {
@@ -568,7 +576,7 @@ mod tests {
let temp = RandomTempPath::create_dir();
let mut sstore = SecretStore::new_test(&temp);
let addr = sstore.new_account("test").unwrap();
-let _ok = sstore.unlock_account(&addr, "test").unwrap();
+sstore.unlock_account(&addr, "test").unwrap();
let secret = sstore.account_secret(&addr).unwrap();
let kp = KeyPair::from_secret(secret).unwrap();
assert_eq!(Address::from(kp.public().sha3()), addr);


@@ -160,12 +160,12 @@ impl Connection {
}
}
-/// Get socket token
+/// Get socket token
pub fn token(&self) -> StreamToken {
self.token
}
-/// Replace socket token
+/// Replace socket token
pub fn set_token(&mut self, token: StreamToken) {
self.token = token;
}
@@ -261,13 +261,13 @@ pub struct EncryptedConnection {
}
impl EncryptedConnection {
-/// Get socket token
+/// Get socket token
pub fn token(&self) -> StreamToken {
self.connection.token
}
-/// Replace socket token
+/// Replace socket token
pub fn set_token(&mut self, token: StreamToken) {
self.connection.set_token(token);
}
@@ -513,8 +513,14 @@ mod tests {
buf_size: usize,
}
+impl Default for TestSocket {
+fn default() -> Self {
+TestSocket::new()
+}
+}
impl TestSocket {
-fn new() -> TestSocket {
+fn new() -> Self {
TestSocket {
read_buffer: vec![],
write_buffer: vec![],
@@ -593,8 +599,14 @@ mod tests {
type TestConnection = GenericConnection<TestSocket>;
+impl Default for TestConnection {
+fn default() -> Self {
+TestConnection::new()
+}
+}
impl TestConnection {
-pub fn new() -> TestConnection {
+pub fn new() -> Self {
TestConnection {
token: 999998888usize,
socket: TestSocket::new(),
@@ -609,8 +621,14 @@ mod tests {
type TestBrokenConnection = GenericConnection<TestBrokenSocket>;
+impl Default for TestBrokenConnection {
+fn default() -> Self {
+TestBrokenConnection::new()
+}
+}
impl TestBrokenConnection {
-pub fn new() -> TestBrokenConnection {
+pub fn new() -> Self {
TestBrokenConnection {
token: 999998888usize,
socket: TestBrokenSocket { error: "test broken socket".to_owned() },


@@ -541,7 +541,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
match TcpStream::connect(&address) {
Ok(socket) => socket,
Err(e) => {
warn!("Can't connect to address {:?}: {:?}", address, e);
debug!("Can't connect to address {:?}: {:?}", address, e);
return;
}
}
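Downgrading the failed-dial message from `warn!` to `debug!` is presumably because unreachable addresses are routine in a peer-to-peer setting: the node simply moves on to other candidates, so the line is diagnostic noise rather than something an operator should act on.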
@@ -695,6 +695,14 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
return;
}
};
+if !originated {
+let session_count = sessions.count();
+let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers };
+if session_count >= ideal_peers as usize {
+session.disconnect(DisconnectReason::TooManyPeers);
+return;
+}
+}
let result = sessions.insert_with(move |session_token| {
session.set_token(session_token);
io.deregister_stream(token).expect("Error deleting handshake registration");
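The `if !originated` guard added in this hunk caps incoming sessions: once the session count reaches `ideal_peers`, a handshake the remote side initiated is rejected with `DisconnectReason::TooManyPeers`, while outgoing (originated) connections stay exempt, since the local node dialed them deliberately. The predicate in isolation (a sketch; the function name is assumed):

fn should_reject_incoming(originated: bool, session_count: usize, ideal_peers: u32) -> bool {
    !originated && session_count >= ideal_peers as usize
}

fn main() {
    assert!(should_reject_incoming(false, 25, 25));  // inbound at capacity: TooManyPeers
    assert!(!should_reject_incoming(true, 25, 25));  // outbound: never rejected by this check
}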