Merge branch 'master' into rotating-key

Conflicts:
	ethcore/src/account_provider/mod.rs
	rpc/src/v1/types/mod.rs.in
This commit is contained in:
Tomasz Drwięga
2016-12-09 07:37:34 +00:00
342 changed files with 10048 additions and 4061 deletions

View File

@@ -5,6 +5,10 @@ license = "GPL-3.0"
name = "ethcore-light"
version = "1.5.0"
authors = ["Ethcore <admin@ethcore.io>"]
build = "build.rs"
[build-dependencies]
"ethcore-ipc-codegen" = { path = "../../ipc/codegen" }
[dependencies]
log = "0.3"
@@ -12,5 +16,6 @@ ethcore = { path = ".." }
ethcore-util = { path = "../../util" }
ethcore-network = { path = "../../util/network" }
ethcore-io = { path = "../../util/io" }
ethcore-ipc = { path = "../../ipc/rpc" }
rlp = { path = "../../util/rlp" }
time = "0.1"

21
ethcore/light/build.rs Normal file
View File

@@ -0,0 +1,21 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
extern crate ethcore_ipc_codegen;
fn main() {
ethcore_ipc_codegen::derive_binary("src/types/mod.rs.in").unwrap();
}

View File

@@ -101,7 +101,7 @@ impl Provider for Client {
Vec::new()
}
fn code(&self, _req: request::ContractCodes) -> Vec<Bytes> {
fn contract_code(&self, _req: request::ContractCodes) -> Vec<Bytes> {
Vec::new()
}

View File

@@ -28,20 +28,25 @@
//! It starts by performing a header-only sync, verifying random samples
//! of members of the chain to varying degrees.
// TODO: remove when integrating with parity.
// TODO: remove when integrating with the rest of parity.
#![allow(dead_code)]
pub mod client;
pub mod net;
pub mod provider;
pub mod request;
mod types;
pub use self::provider::Provider;
pub use types::les_request as request;
#[macro_use]
extern crate log;
extern crate ethcore;
extern crate ethcore_util as util;
extern crate ethcore_network as network;
extern crate ethcore_io as io;
extern crate ethcore;
extern crate ethcore_ipc as ipc;
extern crate rlp;
extern crate time;
#[macro_use]
extern crate log;
extern crate time;

View File

@@ -206,6 +206,39 @@ impl FlowParams {
cost.0 + (amount * cost.1)
}
/// Compute the maximum number of costs of a specific kind which can be made
/// with the given buffer.
/// Saturates at `usize::max()`. This is not a problem in practice because
/// this amount of requests is already prohibitively large.
pub fn max_amount(&self, buffer: &Buffer, kind: request::Kind) -> usize {
use util::Uint;
use std::usize;
let cost = match kind {
request::Kind::Headers => &self.costs.headers,
request::Kind::Bodies => &self.costs.bodies,
request::Kind::Receipts => &self.costs.receipts,
request::Kind::StateProofs => &self.costs.state_proofs,
request::Kind::Codes => &self.costs.contract_codes,
request::Kind::HeaderProofs => &self.costs.header_proofs,
};
let start = buffer.current();
if start <= cost.0 {
return 0;
} else if cost.1 == U256::zero() {
return usize::MAX;
}
let max = (start - cost.0) / cost.1;
if max >= usize::MAX.into() {
usize::MAX
} else {
max.as_u64() as usize
}
}
/// Create initial buffer parameter.
pub fn create_buffer(&self) -> Buffer {
Buffer {
@@ -228,6 +261,16 @@ impl FlowParams {
buf.estimate = ::std::cmp::min(self.limit, buf.estimate + (elapsed * self.recharge));
}
/// Refund some buffer which was previously deducted.
/// Does not update the recharge timestamp.
pub fn refund(&self, buf: &mut Buffer, refund_amount: U256) {
buf.estimate = buf.estimate + refund_amount;
if buf.estimate > self.limit {
buf.estimate = self.limit
}
}
}
#[cfg(test)]

View File

@@ -52,6 +52,8 @@ pub enum Error {
UnexpectedHandshake,
/// Peer on wrong network (wrong NetworkId or genesis hash)
WrongNetwork,
/// Unknown peer.
UnknownPeer,
}
impl Error {
@@ -64,6 +66,7 @@ impl Error {
Error::UnrecognizedPacket(_) => Punishment::Disconnect,
Error::UnexpectedHandshake => Punishment::Disconnect,
Error::WrongNetwork => Punishment::Disable,
Error::UnknownPeer => Punishment::Disconnect,
}
}
}
@@ -89,6 +92,7 @@ impl fmt::Display for Error {
Error::UnrecognizedPacket(code) => write!(f, "Unrecognized packet: 0x{:x}", code),
Error::UnexpectedHandshake => write!(f, "Unexpected handshake"),
Error::WrongNetwork => write!(f, "Wrong network"),
Error::UnknownPeer => write!(f, "unknown peer"),
}
}
}

View File

@@ -19,27 +19,28 @@
//! This uses a "Provider" to answer requests.
//! See https://github.com/ethcore/parity/wiki/Light-Ethereum-Subprotocol-(LES)
use ethcore::transaction::SignedTransaction;
use io::TimerToken;
use network::{NetworkProtocolHandler, NetworkContext, NetworkError, PeerId};
use rlp::{RlpStream, Stream, UntrustedRlp, View};
use util::hash::H256;
use util::RwLock;
use util::{Mutex, RwLock, U256};
use time::SteadyTime;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use provider::Provider;
use request::{self, Request};
use self::buffer_flow::{Buffer, FlowParams};
use self::error::{Error, Punishment};
use self::status::{Status, Capabilities};
mod buffer_flow;
mod error;
mod status;
pub use self::status::Announcement;
pub use self::status::{Status, Capabilities, Announcement, NetworkId};
const TIMEOUT: TimerToken = 0;
const TIMEOUT_INTERVAL_MS: u64 = 1000;
@@ -86,6 +87,10 @@ mod packet {
pub const HEADER_PROOFS: u8 = 0x0e;
}
/// A request id.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ReqId(usize);
// A pending peer: one we've sent our status to but
// may not have received one for.
struct PendingPeer {
@@ -103,32 +108,162 @@ struct Peer {
sent_head: H256, // last head we've given them.
}
impl Peer {
// check the maximum cost of a request, returning an error if there's
// not enough buffer left.
// returns the calculated maximum cost.
fn deduct_max(&mut self, flow_params: &FlowParams, kind: request::Kind, max: usize) -> Result<U256, Error> {
flow_params.recharge(&mut self.local_buffer);
let max_cost = flow_params.compute_cost(kind, max);
try!(self.local_buffer.deduct_cost(max_cost));
Ok(max_cost)
}
// refund buffer for a request. returns new buffer amount.
fn refund(&mut self, flow_params: &FlowParams, amount: U256) -> U256 {
flow_params.refund(&mut self.local_buffer, amount);
self.local_buffer.current()
}
// recharge remote buffer with remote flow params.
fn recharge_remote(&mut self) {
let flow = &mut self.remote_flow;
flow.recharge(&mut self.remote_buffer);
}
}
/// An LES event handler.
pub trait Handler: Send + Sync {
/// Called when a peer connects.
fn on_connect(&self, _id: PeerId, _status: &Status, _capabilities: &Capabilities) { }
/// Called when a peer disconnects
fn on_disconnect(&self, _id: PeerId) { }
/// Called when a peer makes an announcement.
fn on_announcement(&self, _id: PeerId, _announcement: &Announcement) { }
/// Called when a peer requests relay of some transactions.
fn on_transactions(&self, _id: PeerId, _relay: &[SignedTransaction]) { }
}
// a request and the time it was made.
struct Requested {
request: Request,
timestamp: SteadyTime,
}
/// Protocol parameters.
pub struct Params {
/// Genesis hash.
pub genesis_hash: H256,
/// Network id.
pub network_id: NetworkId,
/// Buffer flow parameters.
pub flow_params: FlowParams,
/// Initial capabilities.
pub capabilities: Capabilities,
}
/// This is an implementation of the light ethereum network protocol, abstracted
/// over a `Provider` of data and a p2p network.
///
/// This is simply designed for request-response purposes. Higher level uses
/// of the protocol, such as synchronization, will function as wrappers around
/// this system.
//
// LOCK ORDER:
// Locks must be acquired in the order declared, and when holding a read lock
// on the peers, only one peer may be held at a time.
pub struct LightProtocol {
provider: Box<Provider>,
genesis_hash: H256,
network_id: status::NetworkId,
network_id: NetworkId,
pending_peers: RwLock<HashMap<PeerId, PendingPeer>>,
peers: RwLock<HashMap<PeerId, Peer>>,
pending_requests: RwLock<HashMap<usize, Request>>,
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>,
pending_requests: RwLock<HashMap<usize, Requested>>,
capabilities: RwLock<Capabilities>,
flow_params: FlowParams, // assumed static and same for every peer.
handlers: Vec<Box<Handler>>,
req_id: AtomicUsize,
}
impl LightProtocol {
/// Create a new instance of the protocol manager.
pub fn new(provider: Box<Provider>, params: Params) -> Self {
LightProtocol {
provider: provider,
genesis_hash: params.genesis_hash,
network_id: params.network_id,
pending_peers: RwLock::new(HashMap::new()),
peers: RwLock::new(HashMap::new()),
pending_requests: RwLock::new(HashMap::new()),
capabilities: RwLock::new(params.capabilities),
flow_params: params.flow_params,
handlers: Vec::new(),
req_id: AtomicUsize::new(0),
}
}
/// Check the maximum amount of requests of a specific type
/// which a peer would be able to serve.
pub fn max_requests(&self, peer: PeerId, kind: request::Kind) -> Option<usize> {
self.peers.read().get(&peer).map(|peer| {
let mut peer = peer.lock();
peer.recharge_remote();
peer.remote_flow.max_amount(&peer.remote_buffer, kind)
})
}
/// Make a request to a peer.
///
/// Fails on: nonexistent peer, network error,
/// insufficient buffer. Does not check capabilities before sending.
/// On success, returns a request id which can later be coordinated
/// with an event.
pub fn request_from(&self, io: &NetworkContext, peer_id: &PeerId, request: Request) -> Result<ReqId, Error> {
let peers = self.peers.read();
let peer = try!(peers.get(peer_id).ok_or_else(|| Error::UnknownPeer));
let mut peer = peer.lock();
peer.recharge_remote();
let max = peer.remote_flow.compute_cost(request.kind(), request.amount());
try!(peer.remote_buffer.deduct_cost(max));
let req_id = self.req_id.fetch_add(1, Ordering::SeqCst);
let packet_data = encode_request(&request, req_id);
let packet_id = match request.kind() {
request::Kind::Headers => packet::GET_BLOCK_HEADERS,
request::Kind::Bodies => packet::GET_BLOCK_BODIES,
request::Kind::Receipts => packet::GET_RECEIPTS,
request::Kind::StateProofs => packet::GET_PROOFS,
request::Kind::Codes => packet::GET_CONTRACT_CODES,
request::Kind::HeaderProofs => packet::GET_HEADER_PROOFS,
};
try!(io.send(*peer_id, packet_id, packet_data));
peer.current_asking.insert(req_id);
self.pending_requests.write().insert(req_id, Requested {
request: request,
timestamp: SteadyTime::now(),
});
Ok(ReqId(req_id))
}
/// Make an announcement of new chain head and capabilities to all peers.
/// The announcement is expected to be valid.
pub fn make_announcement(&self, mut announcement: Announcement, io: &NetworkContext) {
pub fn make_announcement(&self, io: &NetworkContext, mut announcement: Announcement) {
let mut reorgs_map = HashMap::new();
// update stored capabilities
self.capabilities.write().update_from(&announcement);
// calculate reorg info and send packets
for (peer_id, peer_info) in self.peers.write().iter_mut() {
for (peer_id, peer_info) in self.peers.read().iter() {
let mut peer_info = peer_info.lock();
let reorg_depth = reorgs_map.entry(peer_info.sent_head)
.or_insert_with(|| {
match self.provider.reorg_depth(&announcement.head_hash, &peer_info.sent_head) {
@@ -151,6 +286,14 @@ impl LightProtocol {
}
}
}
/// Add an event handler.
/// Ownership will be transferred to the protocol structure,
/// and the handler will be kept alive as long as it is.
/// These are intended to be added at the beginning of the
pub fn add_handler(&mut self, handler: Box<Handler>) {
self.handlers.push(handler);
}
}
impl LightProtocol {
@@ -173,7 +316,11 @@ impl LightProtocol {
fn on_disconnect(&self, peer: PeerId) {
// TODO: reassign all requests assigned to this peer.
self.pending_peers.write().remove(&peer);
self.peers.write().remove(&peer);
if self.peers.write().remove(&peer).is_some() {
for handler in &self.handlers {
handler.on_disconnect(peer)
}
}
}
// send status to a peer.
@@ -219,15 +366,19 @@ impl LightProtocol {
return Err(Error::WrongNetwork);
}
self.peers.write().insert(*peer, Peer {
self.peers.write().insert(*peer, Mutex::new(Peer {
local_buffer: self.flow_params.create_buffer(),
remote_buffer: flow_params.create_buffer(),
current_asking: HashSet::new(),
status: status,
capabilities: capabilities,
status: status.clone(),
capabilities: capabilities.clone(),
remote_flow: flow_params,
sent_head: pending.sent_head,
});
}));
for handler in &self.handlers {
handler.on_connect(*peer, &status, &capabilities)
}
Ok(())
}
@@ -240,13 +391,15 @@ impl LightProtocol {
}
let announcement = try!(status::parse_announcement(data));
let mut peers = self.peers.write();
let peers = self.peers.read();
let peer_info = match peers.get_mut(peer) {
let peer_info = match peers.get(peer) {
Some(info) => info,
None => return Ok(()),
};
let mut peer_info = peer_info.lock();
// update status.
{
// TODO: punish peer if they've moved backwards.
@@ -259,15 +412,11 @@ impl LightProtocol {
}
// update capabilities.
{
let caps = &mut peer_info.capabilities;
caps.serve_headers = caps.serve_headers || announcement.serve_headers;
caps.serve_state_since = caps.serve_state_since.or(announcement.serve_state_since);
caps.serve_chain_since = caps.serve_chain_since.or(announcement.serve_chain_since);
caps.tx_relay = caps.tx_relay || announcement.tx_relay;
}
peer_info.capabilities.update_from(&announcement);
// TODO: notify listeners if new best block.
for handler in &self.handlers {
handler.on_announcement(*peer, &announcement);
}
Ok(())
}
@@ -276,45 +425,39 @@ impl LightProtocol {
fn get_block_headers(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_HEADERS: usize = 512;
let mut present_buffer = match self.peers.read().get(peer) {
Some(peer) => peer.local_buffer.clone(),
let peers = self.peers.read();
let peer = match peers.get(peer) {
Some(peer) => peer,
None => {
debug!(target: "les", "Ignoring announcement from unknown peer");
debug!(target: "les", "Ignoring request from unknown peer");
return Ok(())
}
};
self.flow_params.recharge(&mut present_buffer);
let mut peer = peer.lock();
let req_id: u64 = try!(data.val_at(0));
let block = {
let rlp = try!(data.at(1));
(try!(rlp.val_at(0)), try!(rlp.val_at(1)))
};
let req = request::Headers {
block: {
let rlp = try!(data.at(1));
(try!(rlp.val_at(0)), try!(rlp.val_at(1)))
},
block_num: block.0,
block_hash: block.1,
max: ::std::cmp::min(MAX_HEADERS, try!(data.val_at(2))),
skip: try!(data.val_at(3)),
reverse: try!(data.val_at(4)),
};
let max_cost = self.flow_params.compute_cost(request::Kind::Headers, req.max);
try!(present_buffer.deduct_cost(max_cost));
let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::Headers, req.max));
let response = self.provider.block_headers(req);
let actual_cost = self.flow_params.compute_cost(request::Kind::Headers, response.len());
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
let cur_buffer = match self.peers.write().get_mut(peer) {
Some(peer) => {
self.flow_params.recharge(&mut peer.local_buffer);
try!(peer.local_buffer.deduct_cost(actual_cost));
peer.local_buffer.current()
}
None => {
debug!(target: "les", "peer disconnected during serving of request.");
return Ok(())
}
};
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::BLOCK_HEADERS, {
let mut stream = RlpStream::new_list(response.len() + 2);
stream.append(&req_id).append(&cur_buffer);
@@ -336,39 +479,30 @@ impl LightProtocol {
fn get_block_bodies(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_BODIES: usize = 256;
let mut present_buffer = match self.peers.read().get(peer) {
Some(peer) => peer.local_buffer.clone(),
let peers = self.peers.read();
let peer = match peers.get(peer) {
Some(peer) => peer,
None => {
debug!(target: "les", "Ignoring announcement from unknown peer");
debug!(target: "les", "Ignoring request from unknown peer");
return Ok(())
}
};
let mut peer = peer.lock();
self.flow_params.recharge(&mut present_buffer);
let req_id: u64 = try!(data.val_at(0));
let req = request::Bodies {
block_hashes: try!(data.iter().skip(1).take(MAX_BODIES).map(|x| x.as_val()).collect())
};
let max_cost = self.flow_params.compute_cost(request::Kind::Bodies, req.block_hashes.len());
try!(present_buffer.deduct_cost(max_cost));
let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::Bodies, req.block_hashes.len()));
let response = self.provider.block_bodies(req);
let response_len = response.iter().filter(|x| &x[..] != &::rlp::EMPTY_LIST_RLP).count();
let actual_cost = self.flow_params.compute_cost(request::Kind::Bodies, response_len);
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
let cur_buffer = match self.peers.write().get_mut(peer) {
Some(peer) => {
self.flow_params.recharge(&mut peer.local_buffer);
try!(peer.local_buffer.deduct_cost(actual_cost));
peer.local_buffer.current()
}
None => {
debug!(target: "les", "peer disconnected during serving of request.");
return Ok(())
}
};
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::BLOCK_BODIES, {
let mut stream = RlpStream::new_list(response.len() + 2);
@@ -388,8 +522,44 @@ impl LightProtocol {
}
// Handle a request for receipts.
fn get_receipts(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> {
unimplemented!()
fn get_receipts(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_RECEIPTS: usize = 256;
let peers = self.peers.read();
let peer = match peers.get(peer) {
Some(peer) => peer,
None => {
debug!(target: "les", "Ignoring request from unknown peer");
return Ok(())
}
};
let mut peer = peer.lock();
let req_id: u64 = try!(data.val_at(0));
let req = request::Receipts {
block_hashes: try!(data.iter().skip(1).take(MAX_RECEIPTS).map(|x| x.as_val()).collect())
};
let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::Receipts, req.block_hashes.len()));
let response = self.provider.receipts(req);
let response_len = response.iter().filter(|x| &x[..] != &::rlp::EMPTY_LIST_RLP).count();
let actual_cost = self.flow_params.compute_cost(request::Kind::Receipts, response_len);
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::RECEIPTS, {
let mut stream = RlpStream::new_list(response.len() + 2);
stream.append(&req_id).append(&cur_buffer);
for receipts in response {
stream.append_raw(&receipts, 1);
}
stream.out()
}).map_err(Into::into)
}
// Receive a response for receipts.
@@ -398,8 +568,55 @@ impl LightProtocol {
}
// Handle a request for proofs.
fn get_proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> {
unimplemented!()
fn get_proofs(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_PROOFS: usize = 128;
let peers = self.peers.read();
let peer = match peers.get(peer) {
Some(peer) => peer,
None => {
debug!(target: "les", "Ignoring request from unknown peer");
return Ok(())
}
};
let mut peer = peer.lock();
let req_id: u64 = try!(data.val_at(0));
let req = {
let requests: Result<Vec<_>, Error> = data.iter().skip(1).take(MAX_PROOFS).map(|x| {
Ok(request::StateProof {
block: try!(x.val_at(0)),
key1: try!(x.val_at(1)),
key2: if try!(x.at(2)).is_empty() { None } else { Some(try!(x.val_at(2))) },
from_level: try!(x.val_at(3)),
})
}).collect();
request::StateProofs {
requests: try!(requests),
}
};
let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::StateProofs, req.requests.len()));
let response = self.provider.proofs(req);
let response_len = response.iter().filter(|x| &x[..] != &::rlp::EMPTY_LIST_RLP).count();
let actual_cost = self.flow_params.compute_cost(request::Kind::StateProofs, response_len);
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::PROOFS, {
let mut stream = RlpStream::new_list(response.len() + 2);
stream.append(&req_id).append(&cur_buffer);
for proof in response {
stream.append_raw(&proof, 1);
}
stream.out()
}).map_err(Into::into)
}
// Receive a response for proofs.
@@ -408,8 +625,53 @@ impl LightProtocol {
}
// Handle a request for contract code.
fn get_contract_code(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> {
unimplemented!()
fn get_contract_code(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_CODES: usize = 256;
let peers = self.peers.read();
let peer = match peers.get(peer) {
Some(peer) => peer,
None => {
debug!(target: "les", "Ignoring request from unknown peer");
return Ok(())
}
};
let mut peer = peer.lock();
let req_id: u64 = try!(data.val_at(0));
let req = {
let requests: Result<Vec<_>, Error> = data.iter().skip(1).take(MAX_CODES).map(|x| {
Ok(request::ContractCode {
block_hash: try!(x.val_at(0)),
account_key: try!(x.val_at(1)),
})
}).collect();
request::ContractCodes {
code_requests: try!(requests),
}
};
let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::Codes, req.code_requests.len()));
let response = self.provider.contract_code(req);
let response_len = response.iter().filter(|x| !x.is_empty()).count();
let actual_cost = self.flow_params.compute_cost(request::Kind::Codes, response_len);
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::CONTRACT_CODES, {
let mut stream = RlpStream::new_list(response.len() + 2);
stream.append(&req_id).append(&cur_buffer);
for code in response {
stream.append_raw(&code, 1);
}
stream.out()
}).map_err(Into::into)
}
// Receive a response for contract code.
@@ -418,8 +680,54 @@ impl LightProtocol {
}
// Handle a request for header proofs
fn get_header_proofs(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> {
unimplemented!()
fn get_header_proofs(&self, peer: &PeerId, io: &NetworkContext, data: UntrustedRlp) -> Result<(), Error> {
const MAX_PROOFS: usize = 256;
let peers = self.peers.read();
let peer = match peers.get(peer) {
Some(peer) => peer,
None => {
debug!(target: "les", "Ignoring request from unknown peer");
return Ok(())
}
};
let mut peer = peer.lock();
let req_id: u64 = try!(data.val_at(0));
let req = {
let requests: Result<Vec<_>, Error> = data.iter().skip(1).take(MAX_PROOFS).map(|x| {
Ok(request::HeaderProof {
cht_number: try!(x.val_at(0)),
block_number: try!(x.val_at(1)),
from_level: try!(x.val_at(2)),
})
}).collect();
request::HeaderProofs {
requests: try!(requests),
}
};
let max_cost = try!(peer.deduct_max(&self.flow_params, request::Kind::HeaderProofs, req.requests.len()));
let response = self.provider.header_proofs(req);
let response_len = response.iter().filter(|x| &x[..] != ::rlp::EMPTY_LIST_RLP).count();
let actual_cost = self.flow_params.compute_cost(request::Kind::HeaderProofs, response_len);
assert!(max_cost >= actual_cost, "Actual cost exceeded maximum computed cost.");
let cur_buffer = peer.refund(&self.flow_params, max_cost - actual_cost);
io.respond(packet::HEADER_PROOFS, {
let mut stream = RlpStream::new_list(response.len() + 2);
stream.append(&req_id).append(&cur_buffer);
for proof in response {
stream.append_raw(&proof, 1);
}
stream.out()
}).map_err(Into::into)
}
// Receive a response for header proofs
@@ -428,8 +736,18 @@ impl LightProtocol {
}
// Receive a set of transactions to relay.
fn relay_transactions(&self, _: &PeerId, _: &NetworkContext, _: UntrustedRlp) -> Result<(), Error> {
unimplemented!()
fn relay_transactions(&self, peer: &PeerId, data: UntrustedRlp) -> Result<(), Error> {
const MAX_TRANSACTIONS: usize = 256;
let txs: Vec<_> = try!(data.iter().take(MAX_TRANSACTIONS).map(|x| x.as_val::<SignedTransaction>()).collect());
debug!(target: "les", "Received {} transactions to relay from peer {}", txs.len(), peer);
for handler in &self.handlers {
handler.on_transactions(*peer, &txs);
}
Ok(())
}
}
@@ -464,7 +782,7 @@ impl NetworkProtocolHandler for LightProtocol {
packet::GET_HEADER_PROOFS => self.get_header_proofs(peer, io, rlp),
packet::HEADER_PROOFS => self.header_proofs(peer, io, rlp),
packet::SEND_TRANSACTIONS => self.relay_transactions(peer, io, rlp),
packet::SEND_TRANSACTIONS => self.relay_transactions(peer, rlp),
other => {
Err(Error::UnrecognizedPacket(other))
@@ -503,4 +821,86 @@ impl NetworkProtocolHandler for LightProtocol {
_ => warn!(target: "les", "received timeout on unknown token {}", timer),
}
}
}
// Helper for encoding the request to RLP with the given ID.
fn encode_request(req: &Request, req_id: usize) -> Vec<u8> {
match *req {
Request::Headers(ref headers) => {
let mut stream = RlpStream::new_list(5);
stream
.append(&req_id)
.begin_list(2)
.append(&headers.block_num)
.append(&headers.block_hash)
.append(&headers.max)
.append(&headers.skip)
.append(&headers.reverse);
stream.out()
}
Request::Bodies(ref request) => {
let mut stream = RlpStream::new_list(request.block_hashes.len() + 1);
stream.append(&req_id);
for hash in &request.block_hashes {
stream.append(hash);
}
stream.out()
}
Request::Receipts(ref request) => {
let mut stream = RlpStream::new_list(request.block_hashes.len() + 1);
stream.append(&req_id);
for hash in &request.block_hashes {
stream.append(hash);
}
stream.out()
}
Request::StateProofs(ref request) => {
let mut stream = RlpStream::new_list(request.requests.len() + 1);
stream.append(&req_id);
for proof_req in &request.requests {
stream.begin_list(4)
.append(&proof_req.block)
.append(&proof_req.key1);
match proof_req.key2 {
Some(ref key2) => stream.append(key2),
None => stream.append_empty_data(),
};
stream.append(&proof_req.from_level);
}
stream.out()
}
Request::Codes(ref request) => {
let mut stream = RlpStream::new_list(request.code_requests.len() + 1);
stream.append(&req_id);
for code_req in &request.code_requests {
stream.begin_list(2)
.append(&code_req.block_hash)
.append(&code_req.account_key);
}
stream.out()
}
Request::HeaderProofs(ref request) => {
let mut stream = RlpStream::new_list(request.requests.len() + 1);
stream.append(&req_id);
for proof_req in &request.requests {
stream.begin_list(3)
.append(&proof_req.cht_number)
.append(&proof_req.block_number)
.append(&proof_req.from_level);
}
stream.out()
}
}
}

View File

@@ -183,8 +183,10 @@ pub struct Capabilities {
/// Whether this peer can serve headers
pub serve_headers: bool,
/// Earliest block number it can serve block/receipt requests for.
/// `None` means no requests will be servable.
pub serve_chain_since: Option<u64>,
/// Earliest block number it can serve state requests for.
/// `None` means no requests will be servable.
pub serve_state_since: Option<u64>,
/// Whether it can relay transactions to the eth network.
pub tx_relay: bool,
@@ -201,6 +203,16 @@ impl Default for Capabilities {
}
}
impl Capabilities {
/// Update the capabilities from an announcement.
pub fn update_from(&mut self, announcement: &Announcement) {
self.serve_headers = self.serve_headers || announcement.serve_headers;
self.serve_state_since = self.serve_state_since.or(announcement.serve_state_since);
self.serve_chain_since = self.serve_chain_since.or(announcement.serve_chain_since);
self.tx_relay = self.tx_relay || announcement.tx_relay;
}
}
/// Attempt to parse a handshake message into its three parts:
/// - chain status
/// - serving capabilities

View File

@@ -17,8 +17,11 @@
//! A provider for the LES protocol. This is typically a full node, who can
//! give as much data as necessary to its peers.
use ethcore::transaction::SignedTransaction;
use ethcore::blockchain_info::BlockChainInfo;
use ethcore::client::{BlockChainClient, ProvingBlockChainClient};
use ethcore::transaction::SignedTransaction;
use ethcore::ids::BlockID;
use util::{Bytes, H256};
use request;
@@ -26,7 +29,8 @@ use request;
/// Defines the operations that a provider for `LES` must fulfill.
///
/// These are defined at [1], but may be subject to change.
/// Requests which can't be fulfilled should return an empty RLP list.
/// Requests which can't be fulfilled should return either an empty RLP list
/// or empty vector where appropriate.
///
/// [1]: https://github.com/ethcore/parity/wiki/Light-Ethereum-Subprotocol-(LES)
pub trait Provider: Send + Sync {
@@ -34,9 +38,12 @@ pub trait Provider: Send + Sync {
fn chain_info(&self) -> BlockChainInfo;
/// Find the depth of a common ancestor between two blocks.
/// If either block is unknown or an ancestor can't be found
/// then return `None`.
fn reorg_depth(&self, a: &H256, b: &H256) -> Option<u64>;
/// Earliest state.
/// Earliest block where state queries are available.
/// If `None`, no state queries are servable.
fn earliest_state(&self) -> Option<u64>;
/// Provide a list of headers starting at the requested block,
@@ -57,15 +64,105 @@ pub trait Provider: Send + Sync {
/// Provide a set of merkle proofs, as requested. Each request is a
/// block hash and request parameters.
///
/// Returns a vector to RLP-encoded lists satisfying the requests.
/// Returns a vector of RLP-encoded lists satisfying the requests.
fn proofs(&self, req: request::StateProofs) -> Vec<Bytes>;
/// Provide contract code for the specified (block_hash, account_hash) pairs.
fn code(&self, req: request::ContractCodes) -> Vec<Bytes>;
/// Each item in the resulting vector is either the raw bytecode or empty.
fn contract_code(&self, req: request::ContractCodes) -> Vec<Bytes>;
/// Provide header proofs from the Canonical Hash Tries.
fn header_proofs(&self, req: request::HeaderProofs) -> Vec<Bytes>;
/// Provide pending transactions.
fn pending_transactions(&self) -> Vec<SignedTransaction>;
}
// Implementation of a light client data provider for a client.
impl<T: ProvingBlockChainClient + ?Sized> Provider for T {
fn chain_info(&self) -> BlockChainInfo {
BlockChainClient::chain_info(self)
}
fn reorg_depth(&self, a: &H256, b: &H256) -> Option<u64> {
self.tree_route(a, b).map(|route| route.index as u64)
}
fn earliest_state(&self) -> Option<u64> {
Some(self.pruning_info().earliest_state)
}
fn block_headers(&self, req: request::Headers) -> Vec<Bytes> {
let best_num = self.chain_info().best_block_number;
let start_num = req.block_num;
match self.block_hash(BlockID::Number(req.block_num)) {
Some(hash) if hash == req.block_hash => {}
_=> {
trace!(target: "les_provider", "unknown/non-canonical start block in header request: {:?}", (req.block_num, req.block_hash));
return vec![]
}
}
(0u64..req.max as u64)
.map(|x: u64| x.saturating_mul(req.skip))
.take_while(|x| if req.reverse { x < &start_num } else { best_num - start_num < *x })
.map(|x| if req.reverse { start_num - x } else { start_num + x })
.map(|x| self.block_header(BlockID::Number(x)))
.take_while(|x| x.is_some())
.flat_map(|x| x)
.collect()
}
fn block_bodies(&self, req: request::Bodies) -> Vec<Bytes> {
req.block_hashes.into_iter()
.map(|hash| self.block_body(BlockID::Hash(hash)))
.map(|body| body.unwrap_or_else(|| ::rlp::EMPTY_LIST_RLP.to_vec()))
.collect()
}
fn receipts(&self, req: request::Receipts) -> Vec<Bytes> {
req.block_hashes.into_iter()
.map(|hash| self.block_receipts(&hash))
.map(|receipts| receipts.unwrap_or_else(|| ::rlp::EMPTY_LIST_RLP.to_vec()))
.collect()
}
fn proofs(&self, req: request::StateProofs) -> Vec<Bytes> {
use rlp::{RlpStream, Stream};
let mut results = Vec::with_capacity(req.requests.len());
for request in req.requests {
let proof = match request.key2 {
Some(key2) => self.prove_storage(request.key1, key2, request.from_level, BlockID::Hash(request.block)),
None => self.prove_account(request.key1, request.from_level, BlockID::Hash(request.block)),
};
let mut stream = RlpStream::new_list(proof.len());
for node in proof {
stream.append_raw(&node, 1);
}
results.push(stream.out());
}
results
}
fn contract_code(&self, req: request::ContractCodes) -> Vec<Bytes> {
req.code_requests.into_iter()
.map(|req| {
self.code_by_hash(req.account_key, BlockID::Hash(req.block_hash))
})
.collect()
}
fn header_proofs(&self, req: request::HeaderProofs) -> Vec<Bytes> {
req.requests.into_iter().map(|_| ::rlp::EMPTY_LIST_RLP.to_vec()).collect()
}
fn pending_transactions(&self) -> Vec<SignedTransaction> {
BlockChainClient::pending_transactions(self)
}
}

View File

@@ -16,25 +16,26 @@
//! LES request types.
// TODO: make IPC compatible.
use util::H256;
/// A request for block headers.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Binary)]
pub struct Headers {
/// Block information for the request being made.
pub block: (u64, H256),
/// Starting block number
pub block_num: u64,
/// Starting block hash. This and number could be combined but IPC codegen is
/// not robust enough to support it.
pub block_hash: H256,
/// The maximum amount of headers which can be returned.
pub max: usize,
/// The amount of headers to skip between each response entry.
pub skip: usize,
pub skip: u64,
/// Whether the headers should proceed in falling number from the initial block.
pub reverse: bool,
}
/// A request for specific block bodies.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Binary)]
pub struct Bodies {
/// Hashes which bodies are being requested for.
pub block_hashes: Vec<H256>
@@ -44,14 +45,14 @@ pub struct Bodies {
///
/// This request is answered with a list of transaction receipts for each block
/// requested.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Binary)]
pub struct Receipts {
/// Block hashes to return receipts for.
pub block_hashes: Vec<H256>,
}
/// A request for a state proof
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Binary)]
pub struct StateProof {
/// Block hash to query state from.
pub block: H256,
@@ -65,21 +66,30 @@ pub struct StateProof {
}
/// A request for state proofs.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Binary)]
pub struct StateProofs {
/// All the proof requests.
pub requests: Vec<StateProof>,
}
/// A request for contract code.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Binary)]
pub struct ContractCode {
/// Block hash
pub block_hash: H256,
/// Account key (== sha3(address))
pub account_key: H256,
}
/// A request for contract code.
#[derive(Debug, Clone, PartialEq, Eq, Binary)]
pub struct ContractCodes {
/// Block hash and account key (== sha3(address)) pairs to fetch code for.
pub code_requests: Vec<(H256, H256)>,
pub code_requests: Vec<ContractCode>,
}
/// A request for a header proof from the Canonical Hash Trie.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Binary)]
pub struct HeaderProof {
/// Number of the CHT.
pub cht_number: u64,
@@ -90,14 +100,14 @@ pub struct HeaderProof {
}
/// A request for header proofs from the CHT.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Binary)]
pub struct HeaderProofs {
/// All the proof requests.
pub requests: Vec<HeaderProofs>,
pub requests: Vec<HeaderProof>,
}
/// Kinds of requests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Binary)]
pub enum Kind {
/// Requesting headers.
Headers,
@@ -114,7 +124,7 @@ pub enum Kind {
}
/// Encompasses all possible types of requests in a single structure.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Binary)]
pub enum Request {
/// Requesting headers.
Headers(Headers),
@@ -142,4 +152,16 @@ impl Request {
Request::HeaderProofs(_) => Kind::HeaderProofs,
}
}
/// Get the amount of requests being made.
pub fn amount(&self) -> usize {
match *self {
Request::Headers(ref req) => req.max,
Request::Bodies(ref req) => req.block_hashes.len(),
Request::Receipts(ref req) => req.block_hashes.len(),
Request::StateProofs(ref req) => req.requests.len(),
Request::Codes(ref req) => req.code_requests.len(),
Request::HeaderProofs(ref req) => req.requests.len(),
}
}
}

View File

@@ -0,0 +1,20 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Types used in the public (IPC) api which require custom code generation.
#![allow(dead_code, unused_assignments, unused_variables)] // codegen issues
include!(concat!(env!("OUT_DIR"), "/mod.rs.in"));

View File

@@ -0,0 +1,17 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
pub mod les_request;

View File

@@ -21,8 +21,7 @@
"genesis": {
"seal": {
"generic": {
"fields": 2,
"rlp": "0x200"
"rlp": "0xc28080"
}
},
"difficulty": "0x20000",

View File

@@ -12,7 +12,6 @@
"genesis": {
"seal": {
"generic": {
"fields": 0,
"rlp": "0x0"
}
},

View File

@@ -16,9 +16,12 @@
//! Account management.
use std::{fs, fmt};
mod stores;
use self::stores::{AddressBook, DappsSettingsStore};
use std::fmt;
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Instant, Duration};
use util::{Mutex, RwLock, Itertools};
use ethstore::{SimpleSecretStore, SecretStore, Error as SSError, SafeAccount, EthStore, EthMultiStore, random_string};
@@ -106,77 +109,8 @@ impl KeyDirectory for NullDir {
}
}
/// Disk-backed map from Address to String. Uses JSON.
struct AddressBook {
path: PathBuf,
cache: HashMap<Address, AccountMeta>,
transient: bool,
}
impl AddressBook {
pub fn new(path: String) -> Self {
trace!(target: "addressbook", "new({})", path);
let mut path: PathBuf = path.into();
path.push("address_book.json");
trace!(target: "addressbook", "path={:?}", path);
let mut r = AddressBook {
path: path,
cache: HashMap::new(),
transient: false,
};
r.revert();
r
}
pub fn transient() -> Self {
let mut book = AddressBook::new(Default::default());
book.transient = true;
book
}
pub fn get(&self) -> HashMap<Address, AccountMeta> {
self.cache.clone()
}
pub fn set_name(&mut self, a: Address, name: String) {
let mut x = self.cache.get(&a)
.cloned()
.unwrap_or_else(|| AccountMeta {name: Default::default(), meta: "{}".to_owned(), uuid: None});
x.name = name;
self.cache.insert(a, x);
self.save();
}
pub fn set_meta(&mut self, a: Address, meta: String) {
let mut x = self.cache.get(&a)
.cloned()
.unwrap_or_else(|| AccountMeta {name: "Anonymous".to_owned(), meta: Default::default(), uuid: None});
x.meta = meta;
self.cache.insert(a, x);
self.save();
}
fn revert(&mut self) {
if self.transient { return; }
trace!(target: "addressbook", "revert");
let _ = fs::File::open(self.path.clone())
.map_err(|e| trace!(target: "addressbook", "Couldn't open address book: {}", e))
.and_then(|f| AccountMeta::read_address_map(&f)
.map_err(|e| warn!(target: "addressbook", "Couldn't read address book: {}", e))
.and_then(|m| { self.cache = m; Ok(()) })
);
}
fn save(&mut self) {
if self.transient { return; }
trace!(target: "addressbook", "save");
let _ = fs::File::create(self.path.clone())
.map_err(|e| warn!(target: "addressbook", "Couldn't open address book for writing: {}", e))
.and_then(|mut f| AccountMeta::write_address_map(&self.cache, &mut f)
.map_err(|e| warn!(target: "addressbook", "Couldn't write to address book: {}", e))
);
}
}
/// Dapp identifier
pub type DappId = String;
fn transient_sstore() -> EthMultiStore {
EthMultiStore::open(Box::new(NullDir::default())).expect("NullDir load always succeeds; qed")
@@ -189,6 +123,7 @@ type AccountToken = String;
pub struct AccountProvider {
address_book: Mutex<AddressBook>,
unlocked: Mutex<HashMap<Address, AccountData>>,
dapps_settings: RwLock<DappsSettingsStore>,
/// Accounts on disk
sstore: Box<SecretStore>,
/// Accounts unlocked with rolling tokens
@@ -200,7 +135,8 @@ impl AccountProvider {
pub fn new(sstore: Box<SecretStore>) -> Self {
AccountProvider {
unlocked: Mutex::new(HashMap::new()),
address_book: Mutex::new(AddressBook::new(sstore.local_path().into())),
address_book: RwLock::new(AddressBook::new(sstore.local_path().into())),
dapps_settings: RwLock::new(DappsSettingsStore::new(sstore.local_path().into())),
sstore: sstore,
transient_sstore: transient_sstore(),
}
@@ -209,8 +145,9 @@ impl AccountProvider {
/// Creates not disk backed provider.
pub fn transient_provider() -> Self {
AccountProvider {
unlocked: Mutex::new(HashMap::new()),
address_book: Mutex::new(AddressBook::transient()),
unlocked: Mutex::new(HashMap::new()),
dapps_settings: RwLock::new(DappsSettingsStore::transient()),
sstore: Box::new(EthStore::open(Box::new(NullDir::default())).expect("NullDir load always succeeds; qed")),
transient_sstore: transient_sstore(),
}
@@ -255,19 +192,36 @@ impl AccountProvider {
Ok(accounts)
}
/// Gets addresses visile for dapp.
pub fn dapps_addresses(&self, dapp: DappId) -> Result<Vec<Address>, Error> {
let accounts = self.dapps_settings.read().get();
Ok(accounts.get(&dapp).map(|settings| settings.accounts.clone()).unwrap_or_else(Vec::new))
}
/// Sets addresses visile for dapp.
pub fn set_dapps_addresses(&self, dapp: DappId, addresses: Vec<Address>) -> Result<(), Error> {
self.dapps_settings.write().set_accounts(dapp, addresses);
Ok(())
}
/// Returns each address along with metadata.
pub fn addresses_info(&self) -> Result<HashMap<Address, AccountMeta>, Error> {
Ok(self.address_book.lock().get())
Ok(self.address_book.read().get())
}
/// Returns each address along with metadata.
pub fn set_address_name(&self, account: Address, name: String) -> Result<(), Error> {
Ok(self.address_book.lock().set_name(account, name))
Ok(self.address_book.write().set_name(account, name))
}
/// Returns each address along with metadata.
pub fn set_address_meta(&self, account: Address, meta: String) -> Result<(), Error> {
Ok(self.address_book.lock().set_meta(account, meta))
Ok(self.address_book.write().set_meta(account, meta))
}
/// Removes and address from the addressbook
pub fn remove_address(&self, addr: Address) -> Result<(), Error> {
Ok(self.address_book.write().remove(addr))
}
/// Returns each account along with name and meta.
@@ -443,23 +397,9 @@ impl AccountProvider {
#[cfg(test)]
mod tests {
use super::{AccountProvider, AddressBook, Unlock};
use std::collections::HashMap;
use super::{AccountProvider, Unlock};
use std::time::Instant;
use ethjson::misc::AccountMeta;
use ethstore::ethkey::{Generator, Random};
use devtools::RandomTempPath;
#[test]
fn should_save_and_reload_address_book() {
let temp = RandomTempPath::create_dir();
let path = temp.as_str().to_owned();
let mut b = AddressBook::new(path.clone());
b.set_name(1.into(), "One".to_owned());
b.set_meta(1.into(), "{1:1}".to_owned());
let b = AddressBook::new(path);
assert_eq!(b.get(), hash_map![1.into() => AccountMeta{name: "One".to_owned(), meta: "{1:1}".to_owned(), uuid: None}]);
}
#[test]
fn unlock_account_temp() {
@@ -513,4 +453,16 @@ mod tests {
.expect("First usage of token should be correct.");
assert!(ap.sign_with_token(kp.address(), token, Default::default()).is_err(), "Second usage of the same token should fail.");
}
fn should_set_dapps_addresses() {
// given
let ap = AccountProvider::transient_provider();
let app = "app1".to_owned();
// when
ap.set_dapps_addresses(app.clone(), vec![1.into(), 2.into()]).unwrap();
// then
assert_eq!(ap.dapps_addresses(app.clone()).unwrap(), vec![1.into(), 2.into()]);
}
}

View File

@@ -0,0 +1,271 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Address Book and Dapps Settings Store
use std::{fs, fmt, hash, ops};
use std::collections::HashMap;
use std::path::PathBuf;
use ethstore::ethkey::Address;
use ethjson::misc::{AccountMeta, DappsSettings as JsonSettings};
use account_provider::DappId;
/// Disk-backed map from Address to String. Uses JSON.
pub struct AddressBook {
cache: DiskMap<Address, AccountMeta>,
}
impl AddressBook {
/// Creates new address book at given directory.
pub fn new(path: String) -> Self {
let mut r = AddressBook {
cache: DiskMap::new(path, "address_book.json".into())
};
r.cache.revert(AccountMeta::read_address_map);
r
}
/// Creates transient address book (no changes are saved to disk).
pub fn transient() -> Self {
AddressBook {
cache: DiskMap::transient()
}
}
/// Get the address book.
pub fn get(&self) -> HashMap<Address, AccountMeta> {
self.cache.clone()
}
fn save(&self) {
self.cache.save(AccountMeta::write_address_map)
}
/// Sets new name for given address.
pub fn set_name(&mut self, a: Address, name: String) {
{
let mut x = self.cache.entry(a)
.or_insert_with(|| AccountMeta {name: Default::default(), meta: "{}".to_owned(), uuid: None});
x.name = name;
}
self.save();
}
/// Sets new meta for given address.
pub fn set_meta(&mut self, a: Address, meta: String) {
{
let mut x = self.cache.entry(a)
.or_insert_with(|| AccountMeta {name: "Anonymous".to_owned(), meta: Default::default(), uuid: None});
x.meta = meta;
}
self.save();
}
/// Removes an entry
pub fn remove(&mut self, a: Address) {
self.cache.remove(&a);
self.save();
}
}
/// Dapps user settings
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct DappsSettings {
/// A list of visible accounts
pub accounts: Vec<Address>,
}
impl From<JsonSettings> for DappsSettings {
fn from(s: JsonSettings) -> Self {
DappsSettings {
accounts: s.accounts.into_iter().map(Into::into).collect(),
}
}
}
impl From<DappsSettings> for JsonSettings {
fn from(s: DappsSettings) -> Self {
JsonSettings {
accounts: s.accounts.into_iter().map(Into::into).collect(),
}
}
}
/// Disk-backed map from DappId to Settings. Uses JSON.
pub struct DappsSettingsStore {
cache: DiskMap<DappId, DappsSettings>,
}
impl DappsSettingsStore {
/// Creates new store at given directory path.
pub fn new(path: String) -> Self {
let mut r = DappsSettingsStore {
cache: DiskMap::new(path, "dapps_accounts.json".into())
};
r.cache.revert(JsonSettings::read_dapps_settings);
r
}
/// Creates transient store (no changes are saved to disk).
pub fn transient() -> Self {
DappsSettingsStore {
cache: DiskMap::transient()
}
}
/// Get copy of the dapps settings
pub fn get(&self) -> HashMap<DappId, DappsSettings> {
self.cache.clone()
}
fn save(&self) {
self.cache.save(JsonSettings::write_dapps_settings)
}
pub fn set_accounts(&mut self, id: DappId, accounts: Vec<Address>) {
{
let mut settings = self.cache.entry(id).or_insert_with(DappsSettings::default);
settings.accounts = accounts;
}
self.save();
}
}
/// Disk-serializable HashMap
#[derive(Debug)]
struct DiskMap<K: hash::Hash + Eq, V> {
path: PathBuf,
cache: HashMap<K, V>,
transient: bool,
}
impl<K: hash::Hash + Eq, V> ops::Deref for DiskMap<K, V> {
type Target = HashMap<K, V>;
fn deref(&self) -> &Self::Target {
&self.cache
}
}
impl<K: hash::Hash + Eq, V> ops::DerefMut for DiskMap<K, V> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.cache
}
}
impl<K: hash::Hash + Eq, V> DiskMap<K, V> {
pub fn new(path: String, file_name: String) -> Self {
trace!(target: "diskmap", "new({})", path);
let mut path: PathBuf = path.into();
path.push(file_name);
trace!(target: "diskmap", "path={:?}", path);
DiskMap {
path: path,
cache: HashMap::new(),
transient: false,
}
}
pub fn transient() -> Self {
let mut map = DiskMap::new(Default::default(), "diskmap.json".into());
map.transient = true;
map
}
fn revert<F, E>(&mut self, read: F) where
F: Fn(fs::File) -> Result<HashMap<K, V>, E>,
E: fmt::Display,
{
if self.transient { return; }
trace!(target: "diskmap", "revert {:?}", self.path);
let _ = fs::File::open(self.path.clone())
.map_err(|e| trace!(target: "diskmap", "Couldn't open disk map: {}", e))
.and_then(|f| read(f).map_err(|e| warn!(target: "diskmap", "Couldn't read disk map: {}", e)))
.and_then(|m| {
self.cache = m;
Ok(())
});
}
fn save<F, E>(&self, write: F) where
F: Fn(&HashMap<K, V>, &mut fs::File) -> Result<(), E>,
E: fmt::Display,
{
if self.transient { return; }
trace!(target: "diskmap", "save {:?}", self.path);
let _ = fs::File::create(self.path.clone())
.map_err(|e| warn!(target: "diskmap", "Couldn't open disk map for writing: {}", e))
.and_then(|mut f| {
write(&self.cache, &mut f).map_err(|e| warn!(target: "diskmap", "Couldn't write to disk map: {}", e))
});
}
}
#[cfg(test)]
mod tests {
use super::{AddressBook, DappsSettingsStore, DappsSettings};
use std::collections::HashMap;
use ethjson::misc::AccountMeta;
use devtools::RandomTempPath;
#[test]
fn should_save_and_reload_address_book() {
let temp = RandomTempPath::create_dir();
let path = temp.as_str().to_owned();
let mut b = AddressBook::new(path.clone());
b.set_name(1.into(), "One".to_owned());
b.set_meta(1.into(), "{1:1}".to_owned());
let b = AddressBook::new(path);
assert_eq!(b.get(), hash_map![1.into() => AccountMeta{name: "One".to_owned(), meta: "{1:1}".to_owned(), uuid: None}]);
}
#[test]
fn should_save_and_reload_dapps_settings() {
// given
let temp = RandomTempPath::create_dir();
let path = temp.as_str().to_owned();
let mut b = DappsSettingsStore::new(path.clone());
// when
b.set_accounts("dappOne".into(), vec![1.into(), 2.into()]);
// then
let b = DappsSettingsStore::new(path);
assert_eq!(b.get(), hash_map![
"dappOne".into() => DappsSettings {
accounts: vec![1.into(), 2.into()],
}
]);
}
#[test]
fn should_remove_address() {
let temp = RandomTempPath::create_dir();
let path = temp.as_str().to_owned();
let mut b = AddressBook::new(path.clone());
b.set_name(1.into(), "One".to_owned());
b.set_name(2.into(), "Two".to_owned());
b.set_name(3.into(), "Three".to_owned());
b.remove(2.into());
let b = AddressBook::new(path);
assert_eq!(b.get(), hash_map![
1.into() => AccountMeta{name: "One".to_owned(), meta: "{}".to_owned(), uuid: None},
3.into() => AccountMeta{name: "Three".to_owned(), meta: "{}".to_owned(), uuid: None}
]);
}
}

View File

@@ -34,6 +34,7 @@ use blockchain::update::ExtrasUpdate;
use blockchain::{CacheSize, ImportRoute, Config};
use db::{self, Writable, Readable, CacheUpdatePolicy};
use cache_manager::CacheManager;
use engines::Engine;
const LOG_BLOOMS_LEVELS: usize = 3;
const LOG_BLOOMS_ELEMENTS_PER_INDEX: usize = 16;
@@ -198,6 +199,9 @@ pub struct BlockChain {
pending_block_hashes: RwLock<HashMap<BlockNumber, H256>>,
pending_block_details: RwLock<HashMap<H256, BlockDetails>>,
pending_transaction_addresses: RwLock<HashMap<H256, Option<TransactionAddress>>>,
// Used for block ordering.
engine: Arc<Engine>,
}
impl BlockProvider for BlockChain {
@@ -415,9 +419,8 @@ impl<'a> Iterator for AncestryIter<'a> {
}
impl BlockChain {
#[cfg_attr(feature="dev", allow(useless_let_if_seq))]
/// Create new instance of blockchain from given Genesis
pub fn new(config: Config, genesis: &[u8], db: Arc<Database>) -> BlockChain {
/// Create new instance of blockchain from given Genesis and block picking rules of Engine.
pub fn new(config: Config, genesis: &[u8], db: Arc<Database>, engine: Arc<Engine>) -> BlockChain {
// 400 is the avarage size of the key
let cache_man = CacheManager::new(config.pref_cache_size, config.max_cache_size, 400);
@@ -442,6 +445,7 @@ impl BlockChain {
pending_block_hashes: RwLock::new(HashMap::new()),
pending_block_details: RwLock::new(HashMap::new()),
pending_transaction_addresses: RwLock::new(HashMap::new()),
engine: engine,
};
// load best block
@@ -858,13 +862,12 @@ impl BlockChain {
let number = header.number();
let parent_hash = header.parent_hash();
let parent_details = self.block_details(&parent_hash).unwrap_or_else(|| panic!("Invalid parent hash: {:?}", parent_hash));
let total_difficulty = parent_details.total_difficulty + header.difficulty();
let is_new_best = total_difficulty > self.best_block_total_difficulty();
let is_new_best = self.engine.is_new_best_block(self.best_block_total_difficulty(), HeaderView::new(&self.best_block_header()), &parent_details, header);
BlockInfo {
hash: hash,
number: number,
total_difficulty: total_difficulty,
total_difficulty: parent_details.total_difficulty + header.difficulty(),
location: if is_new_best {
// on new best block we need to make sure that all ancestors
// are moved to "canon chain"
@@ -1319,11 +1322,16 @@ mod tests {
use views::BlockView;
use transaction::{Transaction, Action};
use log_entry::{LogEntry, LocalizedLogEntry};
use spec::Spec;
fn new_db(path: &str) -> Arc<Database> {
Arc::new(Database::open(&DatabaseConfig::with_columns(::db::NUM_COLUMNS), path).unwrap())
}
fn new_chain(genesis: &[u8], db: Arc<Database>) -> BlockChain {
BlockChain::new(Config::default(), genesis, db, Spec::new_null().engine)
}
#[test]
fn should_cache_best_block() {
// given
@@ -1334,7 +1342,7 @@ mod tests {
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let bc = BlockChain::new(Config::default(), &genesis, db.clone());
let bc = new_chain(&genesis, db.clone());
assert_eq!(bc.best_block_number(), 0);
// when
@@ -1360,7 +1368,7 @@ mod tests {
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let bc = BlockChain::new(Config::default(), &genesis, db.clone());
let bc = new_chain(&genesis, db.clone());
assert_eq!(bc.genesis_hash(), genesis_hash.clone());
assert_eq!(bc.best_block_hash(), genesis_hash.clone());
@@ -1391,7 +1399,7 @@ mod tests {
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let bc = BlockChain::new(Config::default(), &genesis, db.clone());
let bc = new_chain(&genesis, db.clone());
let mut block_hashes = vec![genesis_hash.clone()];
let mut batch = db.transaction();
@@ -1427,7 +1435,7 @@ mod tests {
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let bc = BlockChain::new(Config::default(), &genesis, db.clone());
let bc = new_chain(&genesis, db.clone());
let mut batch =db.transaction();
for b in &[&b1a, &b1b, &b2a, &b2b, &b3a, &b3b, &b4a, &b4b, &b5a, &b5b] {
@@ -1489,7 +1497,7 @@ mod tests {
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let bc = BlockChain::new(Config::default(), &genesis, db.clone());
let bc = new_chain(&genesis, db.clone());
let mut batch = db.transaction();
let _ = bc.insert_block(&mut batch, &b1a, vec![]);
@@ -1577,7 +1585,7 @@ mod tests {
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let bc = BlockChain::new(Config::default(), &genesis, db.clone());
let bc = new_chain(&genesis, db.clone());
let mut batch = db.transaction();
let _ = bc.insert_block(&mut batch, &b1a, vec![]);
@@ -1639,7 +1647,7 @@ mod tests {
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let bc = BlockChain::new(Config::default(), &genesis, db.clone());
let bc = new_chain(&genesis, db.clone());
let mut batch = db.transaction();
let ir1 = bc.insert_block(&mut batch, &b1, vec![]);
@@ -1755,7 +1763,7 @@ mod tests {
let temp = RandomTempPath::new();
{
let db = new_db(temp.as_str());
let bc = BlockChain::new(Config::default(), &genesis, db.clone());
let bc = new_chain(&genesis, db.clone());
assert_eq!(bc.best_block_hash(), genesis_hash);
let mut batch =db.transaction();
bc.insert_block(&mut batch, &first, vec![]);
@@ -1766,7 +1774,7 @@ mod tests {
{
let db = new_db(temp.as_str());
let bc = BlockChain::new(Config::default(), &genesis, db.clone());
let bc = new_chain(&genesis, db.clone());
assert_eq!(bc.best_block_hash(), first_hash);
}
@@ -1821,7 +1829,7 @@ mod tests {
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let bc = BlockChain::new(Config::default(), &genesis, db.clone());
let bc = new_chain(&genesis, db.clone());
let mut batch =db.transaction();
bc.insert_block(&mut batch, &b1, vec![]);
db.write(batch).unwrap();
@@ -1881,7 +1889,7 @@ mod tests {
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let bc = BlockChain::new(Config::default(), &genesis, db.clone());
let bc = new_chain(&genesis, db.clone());
insert_block(&db, &bc, &b1, vec![Receipt {
state_root: H256::default(),
gas_used: 10_000.into(),
@@ -1985,7 +1993,7 @@ mod tests {
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let bc = BlockChain::new(Config::default(), &genesis, db.clone());
let bc = new_chain(&genesis, db.clone());
let blocks_b1 = bc.blocks_with_bloom(&bloom_b1, 0, 5);
let blocks_b2 = bc.blocks_with_bloom(&bloom_b2, 0, 5);
@@ -2042,7 +2050,7 @@ mod tests {
{
let db = new_db(temp.as_str());
let bc = BlockChain::new(Config::default(), &genesis, db.clone());
let bc = new_chain(&genesis, db.clone());
let uncle = canon_chain.fork(1).generate(&mut finalizer.fork()).unwrap();
let mut batch =db.transaction();
@@ -2061,7 +2069,7 @@ mod tests {
// re-loading the blockchain should load the correct best block.
let db = new_db(temp.as_str());
let bc = BlockChain::new(Config::default(), &genesis, db.clone());
let bc = new_chain(&genesis, db.clone());
assert_eq!(bc.best_block_number(), 5);
}
@@ -2078,7 +2086,7 @@ mod tests {
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let bc = BlockChain::new(Config::default(), &genesis, db.clone());
let bc = new_chain(&genesis, db.clone());
let mut batch =db.transaction();
bc.insert_block(&mut batch, &first, vec![]);

View File

@@ -52,7 +52,7 @@ use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute};
use client::{
BlockID, TransactionID, UncleID, TraceId, ClientConfig, BlockChainClient,
MiningBlockChainClient, TraceFilter, CallAnalytics, BlockImportError, Mode,
ChainNotify,
ChainNotify, PruningInfo, ProvingBlockChainClient,
};
use client::Error as ClientError;
use env_info::EnvInfo;
@@ -164,7 +164,7 @@ impl Client {
let gb = spec.genesis_block();
let db = Arc::new(try!(Database::open(&db_config, &path.to_str().expect("DB path could not be converted to string.")).map_err(ClientError::Database)));
let chain = Arc::new(BlockChain::new(config.blockchain.clone(), &gb, db.clone()));
let chain = Arc::new(BlockChain::new(config.blockchain.clone(), &gb, db.clone(), spec.engine.clone()));
let tracedb = RwLock::new(TraceDB::new(config.tracing.clone(), db.clone(), chain.clone()));
let trie_spec = match config.fat_db {
@@ -787,7 +787,7 @@ impl snapshot::DatabaseRestore for Client {
let cache_size = state_db.cache_size();
*state_db = StateDB::new(journaldb::new(db.clone(), self.pruning, ::db::COL_STATE), cache_size);
*chain = Arc::new(BlockChain::new(self.config.blockchain.clone(), &[], db.clone()));
*chain = Arc::new(BlockChain::new(self.config.blockchain.clone(), &[], db.clone(), self.engine.clone()));
*tracedb = TraceDB::new(self.config.tracing.clone(), db.clone(), chain.clone());
Ok(())
}
@@ -1272,7 +1272,7 @@ impl BlockChainClient for Client {
self.miner.pending_transactions(self.chain.read().best_block_number())
}
fn signing_network_id(&self) -> Option<u8> {
fn signing_network_id(&self) -> Option<u64> {
self.engine.signing_network_id(&self.latest_env_info())
}
@@ -1286,6 +1286,13 @@ impl BlockChainClient for Client {
self.uncle(id)
.map(|header| self.engine.extra_info(&decode(&header)))
}
fn pruning_info(&self) -> PruningInfo {
PruningInfo {
earliest_chain: self.chain.read().first_block_number().unwrap_or(1),
earliest_state: self.state_db.lock().journal_db().earliest_era().unwrap_or(0),
}
}
}
impl MiningBlockChainClient for Client {
@@ -1370,32 +1377,60 @@ impl MayPanic for Client {
}
}
impl ProvingBlockChainClient for Client {
fn prove_storage(&self, key1: H256, key2: H256, from_level: u32, id: BlockID) -> Vec<Bytes> {
self.state_at(id)
.and_then(move |state| state.prove_storage(key1, key2, from_level).ok())
.unwrap_or_else(Vec::new)
}
#[test]
fn should_not_cache_details_before_commit() {
use tests::helpers::*;
use std::thread;
use std::time::Duration;
use std::sync::atomic::{AtomicBool, Ordering};
fn prove_account(&self, key1: H256, from_level: u32, id: BlockID) -> Vec<Bytes> {
self.state_at(id)
.and_then(move |state| state.prove_account(key1, from_level).ok())
.unwrap_or_else(Vec::new)
}
let client = generate_dummy_client(0);
let genesis = client.chain_info().best_block_hash;
let (new_hash, new_block) = get_good_dummy_block_hash();
let go = {
// Separate thread uncommited transaction
let go = Arc::new(AtomicBool::new(false));
let go_thread = go.clone();
let another_client = client.reference().clone();
thread::spawn(move || {
let mut batch = DBTransaction::new(&*another_client.chain.read().db().clone());
another_client.chain.read().insert_block(&mut batch, &new_block, Vec::new());
go_thread.store(true, Ordering::SeqCst);
});
go
};
while !go.load(Ordering::SeqCst) { thread::park_timeout(Duration::from_millis(5)); }
assert!(client.tree_route(&genesis, &new_hash).is_none());
fn code_by_hash(&self, account_key: H256, id: BlockID) -> Bytes {
self.state_at(id)
.and_then(move |state| state.code_by_address_hash(account_key).ok())
.and_then(|x| x)
.unwrap_or_else(Vec::new)
}
}
#[cfg(test)]
mod tests {
#[test]
fn should_not_cache_details_before_commit() {
use client::BlockChainClient;
use tests::helpers::*;
use std::thread;
use std::time::Duration;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use util::kvdb::DBTransaction;
let client = generate_dummy_client(0);
let genesis = client.chain_info().best_block_hash;
let (new_hash, new_block) = get_good_dummy_block_hash();
let go = {
// Separate thread uncommited transaction
let go = Arc::new(AtomicBool::new(false));
let go_thread = go.clone();
let another_client = client.reference().clone();
thread::spawn(move || {
let mut batch = DBTransaction::new(&*another_client.chain.read().db().clone());
another_client.chain.read().insert_block(&mut batch, &new_block, Vec::new());
go_thread.store(true, Ordering::SeqCst);
});
go
};
while !go.load(Ordering::SeqCst) { thread::park_timeout(Duration::from_millis(5)); }
assert!(client.tree_route(&genesis, &new_hash).is_none());
}
}

View File

@@ -25,18 +25,21 @@ mod client;
pub use self::client::*;
pub use self::config::{Mode, ClientConfig, DatabaseCompactionProfile, BlockChainConfig, VMType};
pub use self::error::Error;
pub use types::ids::*;
pub use self::test_client::{TestBlockChainClient, EachBlockWith};
pub use self::chain_notify::ChainNotify;
pub use self::traits::{BlockChainClient, MiningBlockChainClient, ProvingBlockChainClient};
pub use types::ids::*;
pub use types::trace_filter::Filter as TraceFilter;
pub use types::pruning_info::PruningInfo;
pub use types::call_analytics::CallAnalytics;
pub use executive::{Executed, Executive, TransactOptions};
pub use env_info::{LastHashes, EnvInfo};
pub use self::chain_notify::ChainNotify;
pub use types::call_analytics::CallAnalytics;
pub use block_import_error::BlockImportError;
pub use transaction_import::TransactionImportResult;
pub use transaction_import::TransactionImportError;
pub use self::traits::{BlockChainClient, MiningBlockChainClient};
pub use verification::VerifierType;
/// IPC interfaces

View File

@@ -38,6 +38,7 @@ use evm::{Factory as EvmFactory, VMType, Schedule};
use miner::{Miner, MinerService, TransactionImportResult};
use spec::Spec;
use types::mode::Mode;
use types::pruning_info::PruningInfo;
use views::BlockView;
use verification::queue::QueueInfo;
@@ -662,9 +663,16 @@ impl BlockChainClient for TestBlockChainClient {
self.miner.pending_transactions(self.chain_info().best_block_number)
}
fn signing_network_id(&self) -> Option<u8> { None }
fn signing_network_id(&self) -> Option<u64> { None }
fn mode(&self) -> Mode { Mode::Active }
fn set_mode(&self, _: Mode) { unimplemented!(); }
fn pruning_info(&self) -> PruningInfo {
PruningInfo {
earliest_chain: 1,
earliest_state: 1,
}
}
}

View File

@@ -39,6 +39,7 @@ use types::call_analytics::CallAnalytics;
use types::blockchain_info::BlockChainInfo;
use types::block_status::BlockStatus;
use types::mode::Mode;
use types::pruning_info::PruningInfo;
#[ipc(client_ident="RemoteClient")]
/// Blockchain database client. Owns and manages a blockchain and a block queue.
@@ -237,7 +238,7 @@ pub trait BlockChainClient : Sync + Send {
}
/// Get the preferred network ID to sign on
fn signing_network_id(&self) -> Option<u8>;
fn signing_network_id(&self) -> Option<u64>;
/// Get the mode.
fn mode(&self) -> Mode;
@@ -250,10 +251,15 @@ pub trait BlockChainClient : Sync + Send {
/// Returns engine-related extra info for `UncleID`.
fn uncle_extra_info(&self, id: UncleID) -> Option<BTreeMap<String, String>>;
/// Returns information about pruning/data availability.
fn pruning_info(&self) -> PruningInfo;
}
impl IpcConfig for BlockChainClient { }
/// Extended client interface used for mining
pub trait MiningBlockChainClient : BlockChainClient {
pub trait MiningBlockChainClient: BlockChainClient {
/// Returns OpenBlock prepared for closing.
fn prepare_open_block(&self,
author: Address,
@@ -271,4 +277,23 @@ pub trait MiningBlockChainClient : BlockChainClient {
fn latest_schedule(&self) -> Schedule;
}
impl IpcConfig for BlockChainClient { }
/// Extended client interface for providing proofs of the state.
pub trait ProvingBlockChainClient: BlockChainClient {
/// Prove account storage at a specific block id.
///
/// Both provided keys assume a secure trie.
/// Returns a vector of raw trie nodes (in order from the root) proving the storage query.
/// Nodes after `from_level` may be omitted.
/// An empty vector indicates unservable query.
fn prove_storage(&self, key1: H256, key2: H256, from_level: u32, id: BlockID) -> Vec<Bytes>;
/// Prove account existence at a specific block id.
/// The key is the keccak hash of the account's address.
/// Returns a vector of raw trie nodes (in order from the root) proving the query.
/// Nodes after `from_level` may be omitted.
/// An empty vector indicates unservable query.
fn prove_account(&self, key1: H256, from_level: u32, id: BlockID) -> Vec<Bytes>;
/// Get code by address hash.
fn code_by_hash(&self, account_key: H256, id: BlockID) -> Bytes;
}

View File

@@ -21,13 +21,15 @@ use std::sync::Weak;
use std::time::{UNIX_EPOCH, Duration};
use util::*;
use ethkey::{verify_address, Signature};
use rlp::{UntrustedRlp, View, encode};
use rlp::{UntrustedRlp, Rlp, View, encode};
use account_provider::AccountProvider;
use block::*;
use spec::CommonParams;
use engines::Engine;
use header::Header;
use error::{Error, BlockError};
use blockchain::extras::BlockDetails;
use views::HeaderView;
use evm::Schedule;
use ethjson;
use io::{IoContext, IoHandler, TimerToken, IoService, IoChannel};
@@ -66,18 +68,20 @@ pub struct AuthorityRound {
params: CommonParams,
our_params: AuthorityRoundParams,
builtins: BTreeMap<Address, Builtin>,
transition_service: IoService<BlockArrived>,
transition_service: IoService<()>,
message_channel: Mutex<Option<IoChannel<ClientIoMessage>>>,
step: AtomicUsize,
proposed: AtomicBool,
account_provider: Mutex<Option<Arc<AccountProvider>>>,
password: RwLock<Option<String>>,
}
fn header_step(header: &Header) -> Result<usize, ::rlp::DecoderError> {
UntrustedRlp::new(&header.seal()[0]).as_val()
UntrustedRlp::new(&header.seal().get(0).expect("was either checked with verify_block_basic or is genesis; has 2 fields; qed (Make sure the spec file has a correct genesis seal)")).as_val()
}
fn header_signature(header: &Header) -> Result<Signature, ::rlp::DecoderError> {
UntrustedRlp::new(&header.seal()[1]).as_val::<H520>().map(Into::into)
UntrustedRlp::new(&header.seal().get(1).expect("was checked with verify_block_basic; has 2 fields; qed")).as_val::<H520>().map(Into::into)
}
trait AsMillis {
@@ -99,10 +103,12 @@ impl AuthorityRound {
params: params,
our_params: our_params,
builtins: builtins,
transition_service: try!(IoService::<BlockArrived>::start()),
transition_service: try!(IoService::<()>::start()),
message_channel: Mutex::new(None),
step: AtomicUsize::new(initial_step),
proposed: AtomicBool::new(false)
proposed: AtomicBool::new(false),
account_provider: Mutex::new(None),
password: RwLock::new(None),
});
let handler = TransitionHandler { engine: Arc::downgrade(&engine) };
try!(engine.transition_service.register_handler(Arc::new(handler)));
@@ -141,20 +147,17 @@ struct TransitionHandler {
engine: Weak<AuthorityRound>,
}
#[derive(Clone)]
struct BlockArrived;
const ENGINE_TIMEOUT_TOKEN: TimerToken = 23;
impl IoHandler<BlockArrived> for TransitionHandler {
fn initialize(&self, io: &IoContext<BlockArrived>) {
impl IoHandler<()> for TransitionHandler {
fn initialize(&self, io: &IoContext<()>) {
if let Some(engine) = self.engine.upgrade() {
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.remaining_step_duration().as_millis())
.unwrap_or_else(|e| warn!(target: "poa", "Failed to start consensus step timer: {}.", e))
}
}
fn timeout(&self, io: &IoContext<BlockArrived>, timer: TimerToken) {
fn timeout(&self, io: &IoContext<()>, timer: TimerToken) {
if timer == ENGINE_TIMEOUT_TOKEN {
if let Some(engine) = self.engine.upgrade() {
engine.step.fetch_add(1, AtomicOrdering::SeqCst);
@@ -206,10 +209,6 @@ impl Engine for AuthorityRound {
});
}
/// Apply the block reward on finalisation of the block.
/// This assumes that all uncles are valid uncles (i.e. of at least one generation before the current).
fn on_close_block(&self, _block: &mut ExecutedBlock) {}
fn is_sealer(&self, author: &Address) -> Option<bool> {
let p = &self.our_params;
Some(p.authorities.contains(author))
@@ -219,14 +218,14 @@ impl Engine for AuthorityRound {
///
/// This operation is synchronous and may (quite reasonably) not be available, in which `false` will
/// be returned.
fn generate_seal(&self, block: &ExecutedBlock, accounts: Option<&AccountProvider>) -> Option<Vec<Bytes>> {
fn generate_seal(&self, block: &ExecutedBlock) -> Option<Vec<Bytes>> {
if self.proposed.load(AtomicOrdering::SeqCst) { return None; }
let header = block.header();
let step = self.step();
if self.is_step_proposer(step, header.author()) {
if let Some(ap) = accounts {
if let Some(ref ap) = *self.account_provider.lock() {
// Account should be permanently unlocked, otherwise sealing will fail.
if let Ok(signature) = ap.sign(*header.author(), None, header.bare_hash()) {
if let Ok(signature) = ap.sign(*header.author(), self.password.read().clone(), header.bare_hash()) {
trace!(target: "poa", "generate_seal: Issuing a block for step {}.", step);
self.proposed.store(true, AtomicOrdering::SeqCst);
return Some(vec![encode(&step).to_vec(), encode(&(&*signature as &[u8])).to_vec()]);
@@ -272,7 +271,6 @@ impl Engine for AuthorityRound {
}
fn verify_block_family(&self, header: &Header, parent: &Header, _block: Option<&[u8]>) -> Result<(), Error> {
// Don't calculate difficulty for genesis blocks.
if header.number() == 0 {
return Err(From::from(BlockError::RidiculousNumber(OutOfBounds { min: Some(1), max: None, found: header.number() })));
}
@@ -284,10 +282,6 @@ impl Engine for AuthorityRound {
try!(Err(BlockError::DoubleVote(header.author().clone())));
}
// Check difficulty is correct given the two timestamps.
if header.difficulty() != parent.difficulty() {
return Err(From::from(BlockError::InvalidDifficulty(Mismatch { expected: *parent.difficulty(), found: *header.difficulty() })))
}
let gas_limit_divisor = self.our_params.gas_limit_bound_divisor;
let min_gas = parent.gas_limit().clone() - parent.gas_limit().clone() / gas_limit_divisor;
let max_gas = parent.gas_limit().clone() + parent.gas_limit().clone() / gas_limit_divisor;
@@ -306,9 +300,29 @@ impl Engine for AuthorityRound {
t.sender().map(|_|()) // Perform EC recovery and cache sender
}
fn is_new_best_block(&self, _best_total_difficulty: U256, best_header: HeaderView, _parent_details: &BlockDetails, new_header: &HeaderView) -> bool {
let new_number = new_header.number();
let best_number = best_header.number();
if new_number != best_number {
new_number > best_number
} else {
// Take the oldest step at given height.
let new_step: usize = Rlp::new(&new_header.seal()[0]).as_val();
let best_step: usize = Rlp::new(&best_header.seal()[0]).as_val();
new_step < best_step
}
}
fn register_message_channel(&self, message_channel: IoChannel<ClientIoMessage>) {
let mut guard = self.message_channel.lock();
*guard = Some(message_channel);
*self.message_channel.lock() = Some(message_channel);
}
fn set_signer(&self, _address: Address, password: String) {
*self.password.write() = Some(password);
}
fn register_account_provider(&self, account_provider: Arc<AccountProvider>) {
*self.account_provider.lock() = Some(account_provider);
}
}
@@ -377,12 +391,11 @@ mod tests {
fn generates_seal_and_does_not_double_propose() {
let tap = AccountProvider::transient_provider();
let addr1 = tap.insert_account("1".sha3(), "1").unwrap();
tap.unlock_account_permanently(addr1, "1".into()).unwrap();
let addr2 = tap.insert_account("2".sha3(), "2").unwrap();
tap.unlock_account_permanently(addr2, "2".into()).unwrap();
let spec = Spec::new_test_round();
let engine = &*spec.engine;
engine.register_account_provider(Arc::new(tap));
let genesis_header = spec.genesis_header();
let mut db1 = get_temp_state_db().take();
spec.ensure_db_good(&mut db1, &TrieFactory::new(TrieSpec::Secure)).unwrap();
@@ -394,16 +407,18 @@ mod tests {
let b2 = OpenBlock::new(engine, Default::default(), false, db2, &genesis_header, last_hashes, addr2, (3141562.into(), 31415620.into()), vec![]).unwrap();
let b2 = b2.close_and_lock();
if let Some(seal) = engine.generate_seal(b1.block(), Some(&tap)) {
engine.set_signer(addr1, "1".into());
if let Some(seal) = engine.generate_seal(b1.block()) {
assert!(b1.clone().try_seal(engine, seal).is_ok());
// Second proposal is forbidden.
assert!(engine.generate_seal(b1.block(), Some(&tap)).is_none());
assert!(engine.generate_seal(b1.block()).is_none());
}
if let Some(seal) = engine.generate_seal(b2.block(), Some(&tap)) {
engine.set_signer(addr2, "2".into());
if let Some(seal) = engine.generate_seal(b2.block()) {
assert!(b2.clone().try_seal(engine, seal).is_ok());
// Second proposal is forbidden.
assert!(engine.generate_seal(b2.block(), Some(&tap)).is_none());
assert!(engine.generate_seal(b2.block()).is_none());
}
}

View File

@@ -58,6 +58,8 @@ pub struct BasicAuthority {
params: CommonParams,
our_params: BasicAuthorityParams,
builtins: BTreeMap<Address, Builtin>,
account_provider: Mutex<Option<Arc<AccountProvider>>>,
password: RwLock<Option<String>>,
}
impl BasicAuthority {
@@ -67,6 +69,8 @@ impl BasicAuthority {
params: params,
our_params: our_params,
builtins: builtins,
account_provider: Mutex::new(None),
password: RwLock::new(None),
}
}
}
@@ -98,13 +102,8 @@ impl Engine for BasicAuthority {
max(gas_floor_target, gas_limit - gas_limit / bound_divisor + 1.into())
}
});
// info!("ethash: populate_from_parent #{}: difficulty={} and gas_limit={}", header.number, header.difficulty, header.gas_limit);
}
/// Apply the block reward on finalisation of the block.
/// This assumes that all uncles are valid uncles (i.e. of at least one generation before the current).
fn on_close_block(&self, _block: &mut ExecutedBlock) {}
fn is_sealer(&self, author: &Address) -> Option<bool> {
Some(self.our_params.authorities.contains(author))
}
@@ -113,12 +112,12 @@ impl Engine for BasicAuthority {
///
/// This operation is synchronous and may (quite reasonably) not be available, in which `false` will
/// be returned.
fn generate_seal(&self, block: &ExecutedBlock, accounts: Option<&AccountProvider>) -> Option<Vec<Bytes>> {
if let Some(ap) = accounts {
fn generate_seal(&self, block: &ExecutedBlock) -> Option<Vec<Bytes>> {
if let Some(ref ap) = *self.account_provider.lock() {
let header = block.header();
let message = header.bare_hash();
// account should be pernamently unlocked, otherwise sealing will fail
if let Ok(signature) = ap.sign(*block.header().author(), None, message) {
if let Ok(signature) = ap.sign(*block.header().author(), self.password.read().clone(), message) {
return Some(vec![::rlp::encode(&(&*signature as &[u8])).to_vec()]);
} else {
trace!(target: "basicauthority", "generate_seal: FAIL: accounts secret key unavailable");
@@ -179,6 +178,14 @@ impl Engine for BasicAuthority {
fn verify_transaction(&self, t: &SignedTransaction, _header: &Header) -> Result<(), Error> {
t.sender().map(|_|()) // Perform EC recovery and cache sender
}
fn set_signer(&self, _address: Address, password: String) {
*self.password.write() = Some(password);
}
fn register_account_provider(&self, ap: Arc<AccountProvider>) {
*self.account_provider.lock() = Some(ap);
}
}
#[cfg(test)]
@@ -250,10 +257,11 @@ mod tests {
fn can_generate_seal() {
let tap = AccountProvider::transient_provider();
let addr = tap.insert_account("".sha3(), "").unwrap();
tap.unlock_account_permanently(addr, "".into()).unwrap();
let spec = new_test_authority();
let engine = &*spec.engine;
engine.set_signer(addr, "".into());
engine.register_account_provider(Arc::new(tap));
let genesis_header = spec.genesis_header();
let mut db_result = get_temp_state_db();
let mut db = db_result.take();
@@ -261,7 +269,7 @@ mod tests {
let last_hashes = Arc::new(vec![genesis_header.hash()]);
let b = OpenBlock::new(engine, Default::default(), false, db, &genesis_header, last_hashes, addr, (3141562.into(), 31415620.into()), vec![]).unwrap();
let b = b.close_and_lock();
let seal = engine.generate_seal(b.block(), Some(&tap)).unwrap();
let seal = engine.generate_seal(b.block()).unwrap();
assert!(b.try_seal(engine, seal).is_ok());
}

View File

@@ -23,7 +23,6 @@ use spec::CommonParams;
use evm::Schedule;
use block::ExecutedBlock;
use util::Bytes;
use account_provider::AccountProvider;
/// An engine which does not provide any consensus mechanism, just seals blocks internally.
pub struct InstantSeal {
@@ -60,7 +59,7 @@ impl Engine for InstantSeal {
fn is_sealer(&self, _author: &Address) -> Option<bool> { Some(true) }
fn generate_seal(&self, _block: &ExecutedBlock, _accounts: Option<&AccountProvider>) -> Option<Vec<Bytes>> {
fn generate_seal(&self, _block: &ExecutedBlock) -> Option<Vec<Bytes>> {
Some(Vec::new())
}
}
@@ -70,16 +69,12 @@ mod tests {
use util::*;
use util::trie::TrieSpec;
use tests::helpers::*;
use account_provider::AccountProvider;
use spec::Spec;
use header::Header;
use block::*;
#[test]
fn instant_can_seal() {
let tap = AccountProvider::transient_provider();
let addr = tap.insert_account("".sha3(), "").unwrap();
let spec = Spec::new_instant();
let engine = &*spec.engine;
let genesis_header = spec.genesis_header();
@@ -87,10 +82,9 @@ mod tests {
let mut db = db_result.take();
spec.ensure_db_good(&mut db, &TrieFactory::new(TrieSpec::Secure)).unwrap();
let last_hashes = Arc::new(vec![genesis_header.hash()]);
let b = OpenBlock::new(engine, Default::default(), false, db, &genesis_header, last_hashes, addr, (3141562.into(), 31415620.into()), vec![]).unwrap();
let b = OpenBlock::new(engine, Default::default(), false, db, &genesis_header, last_hashes, Address::default(), (3141562.into(), 31415620.into()), vec![]).unwrap();
let b = b.close_and_lock();
// Seal with empty AccountProvider.
let seal = engine.generate_seal(b.block(), Some(&tap)).unwrap();
let seal = engine.generate_seal(b.block()).unwrap();
assert!(b.try_seal(engine, seal).is_ok());
}

View File

@@ -38,6 +38,9 @@ use io::IoChannel;
use service::ClientIoMessage;
use header::Header;
use transaction::SignedTransaction;
use ethereum::ethash;
use blockchain::extras::BlockDetails;
use views::HeaderView;
/// A consensus mechanism for the chain. Generally either proof-of-work or proof-of-stake-based.
/// Provides hooks into each of the major parts of block import.
@@ -91,7 +94,7 @@ pub trait Engine : Sync + Send {
///
/// This operation is synchronous and may (quite reasonably) not be available, in which None will
/// be returned.
fn generate_seal(&self, _block: &ExecutedBlock, _accounts: Option<&AccountProvider>) -> Option<Vec<Bytes>> { None }
fn generate_seal(&self, _block: &ExecutedBlock) -> Option<Vec<Bytes>> { None }
/// Phase 1 quick block verification. Only does checks that are cheap. `block` (the header's full block)
/// may be provided for additional checks. Returns either a null `Ok` or a general error detailing the problem with import.
@@ -113,7 +116,7 @@ pub trait Engine : Sync + Send {
fn verify_transaction(&self, _t: &SignedTransaction, _header: &Header) -> Result<(), Error> { Ok(()) }
/// The network ID that transactions should be signed with.
fn signing_network_id(&self, _env_info: &EnvInfo) -> Option<u8> { None }
fn signing_network_id(&self, _env_info: &EnvInfo) -> Option<u64> { None }
/// Verify the seal of a block. This is an auxilliary method that actually just calls other `verify_` methods
/// to get the job done. By default it must pass `verify_basic` and `verify_block_unordered`. If more or fewer
@@ -144,7 +147,17 @@ pub trait Engine : Sync + Send {
self.builtins().get(a).expect("attempted to execute nonexistent builtin").execute(input, output);
}
/// Check if new block should be chosen as the one in chain.
fn is_new_best_block(&self, best_total_difficulty: U256, _best_header: HeaderView, parent_details: &BlockDetails, new_header: &HeaderView) -> bool {
ethash::is_new_best_block(best_total_difficulty, parent_details, new_header)
}
/// Register an account which signs consensus messages.
fn set_signer(&self, _address: Address, _password: String) {}
/// Add a channel for communication with Client which can be used for sealing.
fn register_message_channel(&self, _message_channel: IoChannel<ClientIoMessage>) {}
// TODO: sealing stuff - though might want to leave this for later.
/// Add an account provider useful for Engines that sign stuff.
fn register_account_provider(&self, _account_provider: Arc<AccountProvider>) {}
}

View File

@@ -38,6 +38,12 @@ impl NullEngine {
}
}
impl Default for NullEngine {
fn default() -> Self {
Self::new(Default::default(), Default::default())
}
}
impl Engine for NullEngine {
fn name(&self) -> &str {
"NullEngine"

View File

@@ -21,6 +21,7 @@ use builtin::Builtin;
use env_info::EnvInfo;
use error::{BlockError, TransactionError, Error};
use header::Header;
use views::HeaderView;
use state::CleanupMode;
use spec::CommonParams;
use transaction::SignedTransaction;
@@ -28,6 +29,7 @@ use engines::Engine;
use evm::Schedule;
use ethjson;
use rlp::{self, UntrustedRlp, View};
use blockchain::extras::BlockDetails;
/// Ethash params.
#[derive(Debug, PartialEq)]
@@ -163,9 +165,9 @@ impl Engine for Ethash {
}
}
fn signing_network_id(&self, env_info: &EnvInfo) -> Option<u8> {
if env_info.number >= self.ethash_params.eip155_transition && self.params().network_id < 127 {
Some(self.params().network_id as u8)
fn signing_network_id(&self, env_info: &EnvInfo) -> Option<u64> {
if env_info.number >= self.ethash_params.eip155_transition {
Some(self.params().network_id)
} else {
None
}
@@ -314,7 +316,7 @@ impl Engine for Ethash {
}
if let Some(n) = t.network_id() {
if header.number() < self.ethash_params.eip155_transition || n as usize != self.params().network_id {
if header.number() < self.ethash_params.eip155_transition || n != self.params().network_id {
return Err(TransactionError::InvalidNetworkId.into())
}
}
@@ -325,6 +327,15 @@ impl Engine for Ethash {
fn verify_transaction(&self, t: &SignedTransaction, _header: &Header) -> Result<(), Error> {
t.sender().map(|_|()) // Perform EC recovery and cache sender
}
fn is_new_best_block(&self, best_total_difficulty: U256, _best_header: HeaderView, parent_details: &BlockDetails, new_header: &HeaderView) -> bool {
is_new_best_block(best_total_difficulty, parent_details, new_header)
}
}
/// Check if a new block should replace the best blockchain block.
pub fn is_new_best_block(best_total_difficulty: U256, parent_details: &BlockDetails, new_header: &HeaderView) -> bool {
parent_details.total_difficulty + new_header.difficulty() > best_total_difficulty
}
#[cfg_attr(feature="dev", allow(wrong_self_convention))]

View File

@@ -19,7 +19,7 @@ use std::time::{Instant, Duration};
use util::*;
use util::using_queue::{UsingQueue, GetAction};
use account_provider::AccountProvider;
use account_provider::{AccountProvider, Error as AccountError};
use views::{BlockView, HeaderView};
use header::Header;
use state::{State, CleanupMode};
@@ -464,15 +464,12 @@ impl Miner {
/// Attempts to perform internal sealing (one that does not require work) to return Ok(sealed),
/// Err(Some(block)) returns for unsuccesful sealing while Err(None) indicates misspecified engine.
fn seal_block_internally(&self, block: ClosedBlock) -> Result<SealedBlock, Option<ClosedBlock>> {
trace!(target: "miner", "seal_block_internally: block has transaction - attempting internal seal.");
let s = self.engine.generate_seal(block.block(), match self.accounts {
Some(ref x) => Some(&**x),
None => None,
});
trace!(target: "miner", "seal_block_internally: attempting internal seal.");
let s = self.engine.generate_seal(block.block());
if let Some(seal) = s {
trace!(target: "miner", "seal_block_internally: managed internal seal. importing...");
block.lock().try_seal(&*self.engine, seal).or_else(|_| {
warn!("prepare_sealing: ERROR: try_seal failed when given internally generated seal. WTF?");
block.lock().try_seal(&*self.engine, seal).or_else(|(e, _)| {
warn!("prepare_sealing: ERROR: try_seal failed when given internally generated seal: {}", e);
Err(None)
})
} else {
@@ -485,7 +482,7 @@ impl Miner {
fn seal_and_import_block_internally(&self, chain: &MiningBlockChainClient, block: ClosedBlock) -> bool {
if !block.transactions().is_empty() || self.forced_sealing() {
if let Ok(sealed) = self.seal_block_internally(block) {
if chain.import_block(sealed.rlp_bytes()).is_ok() {
if chain.import_sealed_block(sealed).is_ok() {
trace!(target: "miner", "import_block_internally: imported internally sealed block");
return true
}
@@ -740,6 +737,19 @@ impl MinerService for Miner {
*self.author.write() = author;
}
fn set_engine_signer(&self, address: Address, password: String) -> Result<(), AccountError> {
if self.seals_internally {
if let Some(ref ap) = self.accounts {
try!(ap.sign(address.clone(), Some(password.clone()), Default::default()));
}
let mut sealing_work = self.sealing_work.lock();
sealing_work.enabled = self.engine.is_sealer(&address).unwrap_or(false);
*self.author.write() = address;
self.engine.set_signer(address, password);
}
Ok(())
}
fn set_extra_data(&self, extra_data: Bytes) {
*self.extra_data.write() = extra_data;
}
@@ -1042,7 +1052,7 @@ impl MinerService for Miner {
ret.map(f)
}
fn submit_seal(&self, chain: &MiningBlockChainClient, pow_hash: H256, seal: Vec<Bytes>) -> Result<(), Error> {
fn submit_seal(&self, chain: &MiningBlockChainClient, block_hash: H256, seal: Vec<Bytes>) -> Result<(), Error> {
let result =
if let Some(b) = self.sealing_work.lock().queue.get_used_if(
if self.options.enable_resubmission {
@@ -1050,22 +1060,22 @@ impl MinerService for Miner {
} else {
GetAction::Take
},
|b| &b.hash() == &pow_hash
|b| &b.hash() == &block_hash
) {
trace!(target: "miner", "Sealing block {}={}={} with seal {:?}", pow_hash, b.hash(), b.header().bare_hash(), seal);
trace!(target: "miner", "Submitted block {}={}={} with seal {:?}", block_hash, b.hash(), b.header().bare_hash(), seal);
b.lock().try_seal(&*self.engine, seal).or_else(|(e, _)| {
warn!(target: "miner", "Mined solution rejected: {}", e);
Err(Error::PowInvalid)
})
} else {
warn!(target: "miner", "Mined solution rejected: Block unknown or out of date.");
warn!(target: "miner", "Submitted solution rejected: Block unknown or out of date.");
Err(Error::PowHashInvalid)
};
result.and_then(|sealed| {
let n = sealed.header().number();
let h = sealed.header().hash();
try!(chain.import_sealed_block(sealed));
info!(target: "miner", "Mined block imported OK. #{}: {}", Colour::White.bold().paint(format!("{}", n)), Colour::White.bold().paint(h.hex()));
info!(target: "miner", "Submitted block imported OK. #{}: {}", Colour::White.bold().paint(format!("{}", n)), Colour::White.bold().paint(h.hex()));
Ok(())
})
}

View File

@@ -76,6 +76,9 @@ pub trait MinerService : Send + Sync {
/// Set the author that we will seal blocks as.
fn set_author(&self, author: Address);
/// Set info necessary to sign consensus messages.
fn set_engine_signer(&self, address: Address, password: String) -> Result<(), ::account_provider::Error>;
/// Get the extra_data that we will seal blocks with.
fn extra_data(&self) -> Bytes;

View File

@@ -81,6 +81,7 @@ struct Restoration {
struct RestorationParams<'a> {
manifest: ManifestData, // manifest to base restoration on.
pruning: Algorithm, // pruning algorithm for the database.
engine: Arc<Engine>, // consensus engine of the chain.
db_path: PathBuf, // database path
db_config: &'a DatabaseConfig, // configuration for the database.
writer: Option<LooseWriter>, // writer for recovered snapshot.
@@ -99,7 +100,7 @@ impl Restoration {
let raw_db = Arc::new(try!(Database::open(params.db_config, &*params.db_path.to_string_lossy())
.map_err(UtilError::SimpleString)));
let chain = BlockChain::new(Default::default(), params.genesis, raw_db.clone());
let chain = BlockChain::new(Default::default(), params.genesis, raw_db.clone(), params.engine);
let blocks = try!(BlockRebuilder::new(chain, raw_db.clone(), &manifest));
let root = manifest.state_root.clone();
@@ -420,6 +421,7 @@ impl Service {
let params = RestorationParams {
manifest: manifest,
pruning: self.pruning,
engine: self.engine.clone(),
db_path: self.restoration_db(),
db_config: &self.db_config,
writer: writer,

View File

@@ -37,13 +37,14 @@ fn chunk_and_restore(amount: u64) {
let genesis = canon_chain.generate(&mut finalizer).unwrap();
let db_cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let engine = Arc::new(::engines::NullEngine::default());
let orig_path = RandomTempPath::create_dir();
let new_path = RandomTempPath::create_dir();
let mut snapshot_path = new_path.as_path().to_owned();
snapshot_path.push("SNAP");
let old_db = Arc::new(Database::open(&db_cfg, orig_path.as_str()).unwrap());
let bc = BlockChain::new(Default::default(), &genesis, old_db.clone());
let bc = BlockChain::new(Default::default(), &genesis, old_db.clone(), engine.clone());
// build the blockchain.
let mut batch = old_db.transaction();
@@ -73,21 +74,20 @@ fn chunk_and_restore(amount: u64) {
// restore it.
let new_db = Arc::new(Database::open(&db_cfg, new_path.as_str()).unwrap());
let new_chain = BlockChain::new(Default::default(), &genesis, new_db.clone());
let new_chain = BlockChain::new(Default::default(), &genesis, new_db.clone(), engine.clone());
let mut rebuilder = BlockRebuilder::new(new_chain, new_db.clone(), &manifest).unwrap();
let reader = PackedReader::new(&snapshot_path).unwrap().unwrap();
let engine = ::engines::NullEngine::new(Default::default(), Default::default());
let flag = AtomicBool::new(true);
for chunk_hash in &reader.manifest().block_hashes {
let compressed = reader.chunk(*chunk_hash).unwrap();
let chunk = snappy::decompress(&compressed).unwrap();
rebuilder.feed(&chunk, &engine, &flag).unwrap();
rebuilder.feed(&chunk, engine.as_ref(), &flag).unwrap();
}
rebuilder.finalize(HashMap::new()).unwrap();
// and test it.
let new_chain = BlockChain::new(Default::default(), &genesis, new_db);
let new_chain = BlockChain::new(Default::default(), &genesis, new_db, engine);
assert_eq!(new_chain.best_block_hash(), best_hash);
}
@@ -121,8 +121,8 @@ fn checks_flag() {
let db_cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
let db = Arc::new(Database::open(&db_cfg, path.as_str()).unwrap());
let chain = BlockChain::new(Default::default(), &genesis, db.clone());
let engine = ::engines::NullEngine::new(Default::default(), Default::default());
let engine = Arc::new(::engines::NullEngine::default());
let chain = BlockChain::new(Default::default(), &genesis, db.clone(), engine.clone());
let manifest = ::snapshot::ManifestData {
state_hashes: Vec::new(),
@@ -134,8 +134,8 @@ fn checks_flag() {
let mut rebuilder = BlockRebuilder::new(chain, db.clone(), &manifest).unwrap();
match rebuilder.feed(&chunk, &engine, &AtomicBool::new(false)) {
match rebuilder.feed(&chunk, engine.as_ref(), &AtomicBool::new(false)) {
Err(Error::Snapshot(SnapshotError::RestorationAborted)) => {}
_ => panic!("Wrong result on abort flag set")
}
}
}

View File

@@ -30,11 +30,9 @@ pub struct Ethereum {
impl Into<Generic> for Ethereum {
fn into(self) -> Generic {
let mut s = RlpStream::new();
s.append(&self.mix_hash);
s.append(&self.nonce);
let mut s = RlpStream::new_list(2);
s.append(&self.mix_hash).append(&self.nonce);
Generic {
fields: 2,
rlp: s.out()
}
}
@@ -42,8 +40,6 @@ impl Into<Generic> for Ethereum {
/// Generic seal.
pub struct Generic {
/// Number of seal fields.
pub fields: usize,
/// Seal rlp.
pub rlp: Vec<u8>,
}
@@ -64,7 +60,6 @@ impl From<ethjson::spec::Seal> for Seal {
mix_hash: eth.mix_hash.into()
}),
ethjson::spec::Seal::Generic(g) => Seal::Generic(Generic {
fields: g.fields,
rlp: g.rlp.into()
})
}

View File

@@ -30,15 +30,14 @@ use ethjson;
use rlp::{Rlp, RlpStream, View, Stream};
/// Parameters common to all engines.
#[derive(Debug, PartialEq, Clone)]
#[cfg_attr(test, derive(Default))]
#[derive(Debug, PartialEq, Clone, Default)]
pub struct CommonParams {
/// Account start nonce.
pub account_start_nonce: U256,
/// Maximum size of extra data.
pub maximum_extra_data_size: usize,
/// Network id.
pub network_id: usize,
pub network_id: u64,
/// Main subprotocol name.
pub subprotocol_name: String,
/// Minimum gas limit.
@@ -94,8 +93,6 @@ pub struct Spec {
pub receipts_root: H256,
/// The genesis block's extra data field.
pub extra_data: Bytes,
/// The number of seal fields in the genesis block.
pub seal_fields: usize,
/// Each seal field, expressed as RLP, concatenated.
pub seal_rlp: Bytes,
@@ -127,7 +124,6 @@ impl From<ethjson::spec::Spec> for Spec {
gas_used: g.gas_used,
timestamp: g.timestamp,
extra_data: g.extra_data,
seal_fields: seal.fields,
seal_rlp: seal.rlp,
state_root_memo: RwLock::new(g.state_root),
genesis_state: From::from(s.accounts),
@@ -167,7 +163,7 @@ impl Spec {
pub fn nodes(&self) -> &[String] { &self.nodes }
/// Get the configured Network ID.
pub fn network_id(&self) -> usize { self.params.network_id }
pub fn network_id(&self) -> u64 { self.params.network_id }
/// Get the configured subprotocol name.
pub fn subprotocol_name(&self) -> String { self.params.subprotocol_name.clone() }
@@ -192,13 +188,8 @@ impl Spec {
header.set_gas_limit(self.gas_limit.clone());
header.set_difficulty(self.difficulty.clone());
header.set_seal({
let seal = {
let mut s = RlpStream::new_list(self.seal_fields);
s.append_raw(&self.seal_rlp, self.seal_fields);
s.out()
};
let r = Rlp::new(&seal);
(0..self.seal_fields).map(|i| r.at(i).as_raw().to_vec()).collect()
let r = Rlp::new(&self.seal_rlp);
r.iter().map(|f| f.as_raw().to_vec()).collect()
});
trace!(target: "spec", "Header hash is {}", header.hash());
header
@@ -227,7 +218,6 @@ impl Spec {
self.gas_used = g.gas_used;
self.timestamp = g.timestamp;
self.extra_data = g.extra_data;
self.seal_fields = seal.fields;
self.seal_rlp = seal.rlp;
self.state_root_memo = RwLock::new(g.state_root);
}

View File

@@ -436,6 +436,27 @@ impl Account {
}
}
// light client storage proof.
impl Account {
/// Prove a storage key's existence or nonexistence in the account's storage
/// trie.
/// `storage_key` is the hash of the desired storage key, meaning
/// this will only work correctly under a secure trie.
/// Returns a merkle proof of the storage trie node with all nodes before `from_level`
/// omitted.
pub fn prove_storage(&self, db: &HashDB, storage_key: H256, from_level: u32) -> Result<Vec<Bytes>, Box<TrieError>> {
use util::trie::{Trie, TrieDB};
use util::trie::recorder::{Recorder, BasicRecorder as TrieRecorder};
let mut recorder = TrieRecorder::with_depth(from_level);
let trie = try!(TrieDB::new(db, &self.storage_root));
let _ = try!(trie.get_recorded(&storage_key, &mut recorder));
Ok(recorder.drain().into_iter().map(|r| r.data).collect())
}
}
impl fmt::Debug for Account {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", PodAccount::from_account(self))

File diff suppressed because it is too large Load Diff

View File

@@ -286,7 +286,7 @@ fn new_db(path: &str) -> Arc<Database> {
pub fn generate_dummy_blockchain(block_number: u32) -> GuardedTempResult<BlockChain> {
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let bc = BlockChain::new(BlockChainConfig::default(), &create_unverifiable_block(0, H256::zero()), db.clone());
let bc = BlockChain::new(BlockChainConfig::default(), &create_unverifiable_block(0, H256::zero()), db.clone(), Spec::new_null().engine);
let mut batch = db.transaction();
for block_order in 1..block_number {
@@ -304,7 +304,7 @@ pub fn generate_dummy_blockchain(block_number: u32) -> GuardedTempResult<BlockCh
pub fn generate_dummy_blockchain_with_extra(block_number: u32) -> GuardedTempResult<BlockChain> {
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let bc = BlockChain::new(BlockChainConfig::default(), &create_unverifiable_block(0, H256::zero()), db.clone());
let bc = BlockChain::new(BlockChainConfig::default(), &create_unverifiable_block(0, H256::zero()), db.clone(), Spec::new_null().engine);
let mut batch = db.transaction();
@@ -323,7 +323,7 @@ pub fn generate_dummy_blockchain_with_extra(block_number: u32) -> GuardedTempRes
pub fn generate_dummy_empty_blockchain() -> GuardedTempResult<BlockChain> {
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let bc = BlockChain::new(BlockChainConfig::default(), &create_unverifiable_block(0, H256::zero()), db.clone());
let bc = BlockChain::new(BlockChainConfig::default(), &create_unverifiable_block(0, H256::zero()), db.clone(), Spec::new_null().engine);
GuardedTempResult::<BlockChain> {
_temp: temp,

View File

@@ -34,3 +34,4 @@ pub mod block_import_error;
pub mod restoration_status;
pub mod snapshot_manifest;
pub mod mode;
pub mod pruning_info;

View File

@@ -0,0 +1,30 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Information about portions of the state and chain which the client may serve.
//!
//! Currently assumes that a client will store everything past a certain point
//! or everything. Will be extended in the future to support a definition
//! of which portions of the ancient chain and current state trie are stored as well.
/// Client pruning info. See module-level docs for more details.
#[derive(Debug, Clone, Binary)]
pub struct PruningInfo {
/// The first block which everything can be served after.
pub earliest_chain: u64,
/// The first block where state requests may be served.
pub earliest_state: u64,
}

View File

@@ -72,7 +72,7 @@ pub struct Transaction {
impl Transaction {
/// Append object with a without signature into RLP stream
pub fn rlp_append_unsigned_transaction(&self, s: &mut RlpStream, network_id: Option<u8>) {
pub fn rlp_append_unsigned_transaction(&self, s: &mut RlpStream, network_id: Option<u64>) {
s.begin_list(if network_id.is_none() { 6 } else { 9 });
s.append(&self.nonce);
s.append(&self.gas_price);
@@ -140,26 +140,26 @@ impl From<ethjson::transaction::Transaction> for SignedTransaction {
impl Transaction {
/// The message hash of the transaction.
pub fn hash(&self, network_id: Option<u8>) -> H256 {
pub fn hash(&self, network_id: Option<u64>) -> H256 {
let mut stream = RlpStream::new();
self.rlp_append_unsigned_transaction(&mut stream, network_id);
stream.out().sha3()
}
/// Signs the transaction as coming from `sender`.
pub fn sign(self, secret: &Secret, network_id: Option<u8>) -> SignedTransaction {
pub fn sign(self, secret: &Secret, network_id: Option<u64>) -> SignedTransaction {
let sig = ::ethkey::sign(secret, &self.hash(network_id))
.expect("data is valid and context has signing capabilities; qed");
self.with_signature(sig, network_id)
}
/// Signs the transaction with signature.
pub fn with_signature(self, sig: Signature, network_id: Option<u8>) -> SignedTransaction {
pub fn with_signature(self, sig: Signature, network_id: Option<u64>) -> SignedTransaction {
SignedTransaction {
unsigned: self,
r: sig.r().into(),
s: sig.s().into(),
v: sig.v() + if let Some(n) = network_id { 35 + n * 2 } else { 27 },
v: sig.v() as u64 + if let Some(n) = network_id { 35 + n * 2 } else { 27 },
hash: Cell::new(None),
sender: Cell::new(None),
}
@@ -211,7 +211,7 @@ pub struct SignedTransaction {
unsigned: Transaction,
/// The V field of the signature; the LS bit described which half of the curve our point falls
/// in. The MS bits describe which network this transaction is for. If 27/28, its for all networks.
v: u8,
v: u64,
/// The R field of the signature; helps describe the point on the curve.
r: U256,
/// The S field of the signature; helps describe the point on the curve.
@@ -302,10 +302,13 @@ impl SignedTransaction {
}
/// 0 if `v` would have been 27 under "Electrum" notation, 1 if 28 or 4 if invalid.
pub fn standard_v(&self) -> u8 { match self.v { v if v == 27 || v == 28 || v > 36 => (v - 1) % 2, _ => 4 } }
pub fn standard_v(&self) -> u8 { match self.v { v if v == 27 || v == 28 || v > 36 => ((v - 1) % 2) as u8, _ => 4 } }
/// The `v` value that appears in the RLP.
pub fn original_v(&self) -> u64 { self.v }
/// The network ID, or `None` if this is a global transaction.
pub fn network_id(&self) -> Option<u8> {
pub fn network_id(&self) -> Option<u64> {
match self.v {
v if v > 36 => Some((v - 35) / 2),
_ => None,

View File

@@ -17,7 +17,7 @@
//! A queue of blocks. Sits between network or other I/O and the `BlockChain`.
//! Sorts them ready for blockchain insertion.
use std::thread::{JoinHandle, self};
use std::thread::{self, JoinHandle};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
use std::sync::{Condvar as SCondvar, Mutex as SMutex};
use util::*;
@@ -53,6 +53,8 @@ pub struct Config {
/// Maximum heap memory to use.
/// When the limit is reached, is_full returns true.
pub max_mem_use: usize,
/// Settings for the number of verifiers and adaptation strategy.
pub verifier_settings: VerifierSettings,
}
impl Default for Config {
@@ -60,39 +62,35 @@ impl Default for Config {
Config {
max_queue_size: 30000,
max_mem_use: 50 * 1024 * 1024,
verifier_settings: VerifierSettings::default(),
}
}
}
struct VerifierHandle {
deleting: Arc<AtomicBool>,
sleep: Arc<AtomicBool>,
thread: JoinHandle<()>,
/// Verifier settings.
#[derive(Debug, PartialEq, Clone)]
pub struct VerifierSettings {
/// Whether to scale amount of verifiers according to load.
// Todo: replace w/ strategy enum?
pub scale_verifiers: bool,
/// Beginning amount of verifiers.
pub num_verifiers: usize,
}
impl VerifierHandle {
// signal to the verifier thread that it should sleep.
fn sleep(&self) {
self.sleep.store(true, AtomicOrdering::SeqCst);
impl Default for VerifierSettings {
fn default() -> Self {
VerifierSettings {
scale_verifiers: false,
num_verifiers: MAX_VERIFIERS,
}
}
}
// signal to the verifier thread that it should wake up.
fn wake_up(&self) {
self.sleep.store(false, AtomicOrdering::SeqCst);
self.thread.thread().unpark();
}
// signal to the verifier thread that it should conclude its
// operations.
fn conclude(&self) {
self.wake_up();
self.deleting.store(true, AtomicOrdering::Release);
}
// join the verifier thread.
fn join(self) {
self.thread.join().expect("Verifier thread panicked");
}
// pool states
enum State {
// all threads with id < inner value are to work.
Work(usize),
Exit,
}
/// An item which is in the process of being verified.
@@ -131,7 +129,6 @@ pub struct VerificationQueue<K: Kind> {
engine: Arc<Engine>,
more_to_verify: Arc<SCondvar>,
verification: Arc<Verification<K>>,
verifiers: Mutex<(Vec<VerifierHandle>, usize)>,
deleting: Arc<AtomicBool>,
ready_signal: Arc<QueueSignal>,
empty: Arc<SCondvar>,
@@ -139,6 +136,9 @@ pub struct VerificationQueue<K: Kind> {
ticks_since_adjustment: AtomicUsize,
max_queue_size: usize,
max_mem_use: usize,
scale_verifiers: bool,
verifier_handles: Vec<JoinHandle<()>>,
state: Arc<(Mutex<State>, Condvar)>,
}
struct QueueSignal {
@@ -221,43 +221,45 @@ impl<K: Kind> VerificationQueue<K> {
});
let empty = Arc::new(SCondvar::new());
let panic_handler = PanicHandler::new_in_arc();
let scale_verifiers = config.verifier_settings.scale_verifiers;
let max_verifiers = min(::num_cpus::get(), MAX_VERIFIERS);
let default_amount = max(::num_cpus::get(), 3) - 2;
let mut verifiers = Vec::with_capacity(max_verifiers);
let num_cpus = ::num_cpus::get();
let max_verifiers = min(num_cpus, MAX_VERIFIERS);
let default_amount = max(1, min(max_verifiers, config.verifier_settings.num_verifiers));
let state = Arc::new((Mutex::new(State::Work(default_amount)), Condvar::new()));
let mut verifier_handles = Vec::with_capacity(max_verifiers);
debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount);
debug!(target: "verification", "Verifier auto-scaling {}", if scale_verifiers { "enabled" } else { "disabled" });
for i in 0..max_verifiers {
debug!(target: "verification", "Adding verification thread #{}", i);
let deleting = deleting.clone();
let panic_handler = panic_handler.clone();
let verification = verification.clone();
let engine = engine.clone();
let wait = more_to_verify.clone();
let ready = ready_signal.clone();
let empty = empty.clone();
let state = state.clone();
// enable only the first few verifiers.
let sleep = if i < default_amount {
Arc::new(AtomicBool::new(false))
} else {
Arc::new(AtomicBool::new(true))
};
verifiers.push(VerifierHandle {
deleting: deleting.clone(),
sleep: sleep.clone(),
thread: thread::Builder::new()
.name(format!("Verifier #{}", i))
.spawn(move || {
panic_handler.catch_panic(move || {
VerificationQueue::verify(verification, engine, wait, ready, deleting, empty, sleep)
}).unwrap()
})
.expect("Failed to create verifier thread.")
});
let handle = thread::Builder::new()
.name(format!("Verifier #{}", i))
.spawn(move || {
panic_handler.catch_panic(move || {
VerificationQueue::verify(
verification,
engine,
wait,
ready,
empty,
state,
i,
)
}).unwrap()
})
.expect("Failed to create verifier thread.");
verifier_handles.push(handle);
}
VerificationQueue {
@@ -266,13 +268,15 @@ impl<K: Kind> VerificationQueue<K> {
ready_signal: ready_signal,
more_to_verify: more_to_verify,
verification: verification,
verifiers: Mutex::new((verifiers, default_amount)),
deleting: deleting,
processing: RwLock::new(HashSet::new()),
empty: empty,
ticks_since_adjustment: AtomicUsize::new(0),
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
scale_verifiers: scale_verifiers,
verifier_handles: verifier_handles,
state: state,
}
}
@@ -281,23 +285,30 @@ impl<K: Kind> VerificationQueue<K> {
engine: Arc<Engine>,
wait: Arc<SCondvar>,
ready: Arc<QueueSignal>,
deleting: Arc<AtomicBool>,
empty: Arc<SCondvar>,
sleep: Arc<AtomicBool>,
state: Arc<(Mutex<State>, Condvar)>,
id: usize,
) {
while !deleting.load(AtomicOrdering::Acquire) {
loop {
// check current state.
{
while sleep.load(AtomicOrdering::SeqCst) {
trace!(target: "verification", "Verifier sleeping");
::std::thread::park();
trace!(target: "verification", "Verifier waking up");
let mut cur_state = state.0.lock();
while let State::Work(x) = *cur_state {
// sleep until this thread is required.
if id < x { break }
if deleting.load(AtomicOrdering::Acquire) {
return;
}
debug!(target: "verification", "verifier {} sleeping", id);
state.1.wait(&mut cur_state);
debug!(target: "verification", "verifier {} waking up", id);
}
if let State::Exit = *cur_state {
debug!(target: "verification", "verifier {} exiting", id);
break;
}
}
// wait for work if empty.
{
let mut more_to_verify = verification.more_to_verify.lock().unwrap();
@@ -305,15 +316,22 @@ impl<K: Kind> VerificationQueue<K> {
empty.notify_all();
}
while verification.unverified.lock().is_empty() && !deleting.load(AtomicOrdering::Acquire) {
while verification.unverified.lock().is_empty() {
if let State::Exit = *state.0.lock() {
debug!(target: "verification", "verifier {} exiting", id);
return;
}
more_to_verify = wait.wait(more_to_verify).unwrap();
}
if deleting.load(AtomicOrdering::Acquire) {
if let State::Exit = *state.0.lock() {
debug!(target: "verification", "verifier {} exiting", id);
return;
}
}
// do work.
let item = {
// acquire these locks before getting the item to verify.
let mut unverified = verification.unverified.lock();
@@ -568,6 +586,14 @@ impl<K: Kind> VerificationQueue<K> {
}
}
/// Get the current number of working verifiers.
pub fn num_verifiers(&self) -> usize {
match *self.state.0.lock() {
State::Work(x) => x,
State::Exit => panic!("state only set to exit on drop; queue live now; qed"),
}
}
/// Optimise memory footprint of the heap fields, and adjust the number of threads
/// to better suit the workload.
pub fn collect_garbage(&self) {
@@ -598,13 +624,15 @@ impl<K: Kind> VerificationQueue<K> {
self.processing.write().shrink_to_fit();
if !self.scale_verifiers { return }
if self.ticks_since_adjustment.fetch_add(1, AtomicOrdering::SeqCst) + 1 >= READJUSTMENT_PERIOD {
self.ticks_since_adjustment.store(0, AtomicOrdering::SeqCst);
} else {
return;
}
let current = self.verifiers.lock().1;
let current = self.num_verifiers();
let diff = (v_len - u_len).abs();
let total = v_len + u_len;
@@ -626,27 +654,14 @@ impl<K: Kind> VerificationQueue<K> {
// possible, never going over the amount of initially allocated threads
// or below 1.
fn scale_verifiers(&self, target: usize) {
let mut verifiers = self.verifiers.lock();
let &mut (ref mut verifiers, ref mut verifier_count) = &mut *verifiers;
let target = min(verifiers.len(), target);
let current = self.num_verifiers();
let target = min(self.verifier_handles.len(), target);
let target = max(1, target);
debug!(target: "verification", "Scaling from {} to {} verifiers", verifier_count, target);
debug!(target: "verification", "Scaling from {} to {} verifiers", current, target);
// scaling up
for i in *verifier_count..target {
debug!(target: "verification", "Waking up verifier {}", i);
verifiers[i].wake_up();
}
// scaling down.
for i in target..*verifier_count {
debug!(target: "verification", "Putting verifier {} to sleep", i);
verifiers[i].sleep();
}
*verifier_count = target;
*self.state.0.lock() = State::Work(target);
self.state.1.notify_all();
}
}
@@ -660,22 +675,18 @@ impl<K: Kind> Drop for VerificationQueue<K> {
fn drop(&mut self) {
trace!(target: "shutdown", "[VerificationQueue] Closing...");
self.clear();
self.deleting.store(true, AtomicOrdering::Release);
self.deleting.store(true, AtomicOrdering::SeqCst);
let mut verifiers = self.verifiers.get_mut();
let mut verifiers = &mut verifiers.0;
// first pass to signal conclusion. must be done before
// notify or deadlock possible.
for handle in verifiers.iter() {
handle.conclude();
}
// set exit state; should be done before `more_to_verify` notification.
*self.state.0.lock() = State::Exit;
self.state.1.notify_all();
// wake up all threads waiting for more work.
self.more_to_verify.notify_all();
// second pass to join.
for handle in verifiers.drain(..) {
handle.join();
// wait for all verifier threads to join.
for thread in self.verifier_handles.drain(..) {
thread.join().expect("Propagating verifier thread panic on shutdown");
}
trace!(target: "shutdown", "[VerificationQueue] Closed.");
@@ -687,16 +698,21 @@ mod tests {
use util::*;
use io::*;
use spec::*;
use super::{BlockQueue, Config};
use super::{BlockQueue, Config, State};
use super::kind::blocks::Unverified;
use tests::helpers::*;
use error::*;
use views::*;
fn get_test_queue() -> BlockQueue {
// create a test block queue.
// auto_scaling enables verifier adjustment.
fn get_test_queue(auto_scale: bool) -> BlockQueue {
let spec = get_test_spec();
let engine = spec.engine;
BlockQueue::new(Config::default(), engine, IoChannel::disconnected(), true)
let mut config = Config::default();
config.verifier_settings.scale_verifiers = auto_scale;
BlockQueue::new(config, engine, IoChannel::disconnected(), true)
}
#[test]
@@ -709,7 +725,7 @@ mod tests {
#[test]
fn can_import_blocks() {
let queue = get_test_queue();
let queue = get_test_queue(false);
if let Err(e) = queue.import(Unverified::new(get_good_dummy_block())) {
panic!("error importing block that is valid by definition({:?})", e);
}
@@ -717,7 +733,7 @@ mod tests {
#[test]
fn returns_error_for_duplicates() {
let queue = get_test_queue();
let queue = get_test_queue(false);
if let Err(e) = queue.import(Unverified::new(get_good_dummy_block())) {
panic!("error importing block that is valid by definition({:?})", e);
}
@@ -736,7 +752,7 @@ mod tests {
#[test]
fn returns_ok_for_drained_duplicates() {
let queue = get_test_queue();
let queue = get_test_queue(false);
let block = get_good_dummy_block();
let hash = BlockView::new(&block).header().hash().clone();
if let Err(e) = queue.import(Unverified::new(block)) {
@@ -753,7 +769,7 @@ mod tests {
#[test]
fn returns_empty_once_finished() {
let queue = get_test_queue();
let queue = get_test_queue(false);
queue.import(Unverified::new(get_good_dummy_block()))
.expect("error importing block that is valid by definition");
queue.flush();
@@ -781,30 +797,23 @@ mod tests {
fn scaling_limits() {
use super::MAX_VERIFIERS;
let queue = get_test_queue();
let queue = get_test_queue(true);
queue.scale_verifiers(MAX_VERIFIERS + 1);
assert!(queue.verifiers.lock().1 < MAX_VERIFIERS + 1);
assert!(queue.num_verifiers() < MAX_VERIFIERS + 1);
queue.scale_verifiers(0);
assert!(queue.verifiers.lock().1 == 1);
assert!(queue.num_verifiers() == 1);
}
#[test]
fn readjust_verifiers() {
let queue = get_test_queue();
let queue = get_test_queue(true);
// put all the verifiers to sleep to ensure
// the test isn't timing sensitive.
let num_verifiers = {
let verifiers = queue.verifiers.lock();
for i in 0..verifiers.1 {
verifiers.0[i].sleep();
}
verifiers.1
};
*queue.state.0.lock() = State::Work(0);
for block in get_good_dummy_block_seq(5000) {
queue.import(Unverified::new(block)).expect("Block good by definition; qed");
@@ -812,20 +821,12 @@ mod tests {
// almost all unverified == bump verifier count.
queue.collect_garbage();
assert_eq!(queue.verifiers.lock().1, num_verifiers + 1);
// wake them up again and verify everything.
{
let verifiers = queue.verifiers.lock();
for i in 0..verifiers.1 {
verifiers.0[i].wake_up();
}
}
assert_eq!(queue.num_verifiers(), 1);
queue.flush();
// nothing to verify == use minimum number of verifiers.
queue.collect_garbage();
assert_eq!(queue.verifiers.lock().1, 1);
assert_eq!(queue.num_verifiers(), 1);
}
}