Merge remote-tracking branch 'parity/split-internal-seal' into auth-round
This commit is contained in:
@@ -673,6 +673,8 @@ impl Client {
|
||||
impl snapshot::DatabaseRestore for Client {
|
||||
/// Restart the client with a new backend
|
||||
fn restore_db(&self, new_db: &str) -> Result<(), EthcoreError> {
|
||||
trace!(target: "snapshot", "Replacing client database with {:?}", new_db);
|
||||
|
||||
let _import_lock = self.import_lock.lock();
|
||||
let mut state_db = self.state_db.write();
|
||||
let mut chain = self.chain.write();
|
||||
|
||||
@@ -99,6 +99,8 @@ impl Engine for BasicAuthority {
|
||||
/// This assumes that all uncles are valid uncles (i.e. of at least one generation before the current).
|
||||
fn on_close_block(&self, _block: &mut ExecutedBlock) {}
|
||||
|
||||
fn seals_internally(&self) -> bool { true }
|
||||
|
||||
/// Attempt to seal the block internally.
|
||||
///
|
||||
/// This operation is synchronous and may (quite reasonably) not be available, in which `false` will
|
||||
|
||||
@@ -58,6 +58,8 @@ impl Engine for InstantSeal {
|
||||
Schedule::new_homestead()
|
||||
}
|
||||
|
||||
fn seals_internally(&self) -> bool { true }
|
||||
|
||||
fn generate_seal(&self, _block: &ExecutedBlock, _accounts: Option<&AccountProvider>) -> Option<Vec<Bytes>> {
|
||||
Some(Vec::new())
|
||||
}
|
||||
|
||||
@@ -73,6 +73,8 @@ pub trait Engine : Sync + Send {
|
||||
/// Block transformation functions, after the transactions.
|
||||
fn on_close_block(&self, _block: &mut ExecutedBlock) {}
|
||||
|
||||
/// If true, generate_seal has to be implemented.
|
||||
fn seals_internally(&self) -> bool { false }
|
||||
/// Attempt to seal the block internally.
|
||||
///
|
||||
/// If `Some` is returned, then you get a valid seal.
|
||||
|
||||
@@ -24,7 +24,7 @@ use views::{BlockView, HeaderView};
|
||||
use state::State;
|
||||
use client::{MiningBlockChainClient, Executive, Executed, EnvInfo, TransactOptions, BlockID, CallAnalytics};
|
||||
use executive::contract_address;
|
||||
use block::{ClosedBlock, IsBlock, Block};
|
||||
use block::{ClosedBlock, SealedBlock, IsBlock, Block};
|
||||
use error::*;
|
||||
use transaction::{Action, SignedTransaction};
|
||||
use receipt::{Receipt, RichReceipt};
|
||||
@@ -243,19 +243,15 @@ impl Miner {
|
||||
}
|
||||
|
||||
/// Prepares new block for sealing including top transactions from queue.
|
||||
#[cfg_attr(feature="dev", allow(match_same_arms))]
|
||||
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
|
||||
fn prepare_sealing(&self, chain: &MiningBlockChainClient) {
|
||||
trace!(target: "miner", "prepare_sealing: entering");
|
||||
|
||||
fn prepare_block(&self, chain: &MiningBlockChainClient) -> (ClosedBlock, Option<H256>) {
|
||||
{
|
||||
trace!(target: "miner", "recalibrating...");
|
||||
trace!(target: "miner", "prepare_block: recalibrating...");
|
||||
let txq = self.transaction_queue.clone();
|
||||
self.gas_pricer.lock().recalibrate(move |price| {
|
||||
trace!(target: "miner", "Got gas price! {}", price);
|
||||
trace!(target: "miner", "prepare_block: Got gas price! {}", price);
|
||||
txq.lock().set_minimal_gas_price(price);
|
||||
});
|
||||
trace!(target: "miner", "done recalibration.");
|
||||
trace!(target: "miner", "prepare_block: done recalibration.");
|
||||
}
|
||||
|
||||
let (transactions, mut open_block, original_work_hash) = {
|
||||
@@ -273,13 +269,13 @@ impl Miner {
|
||||
*/
|
||||
let open_block = match sealing_work.queue.pop_if(|b| b.block().fields().header.parent_hash() == &best_hash) {
|
||||
Some(old_block) => {
|
||||
trace!(target: "miner", "Already have previous work; updating and returning");
|
||||
trace!(target: "miner", "prepare_block: Already have previous work; updating and returning");
|
||||
// add transactions to old_block
|
||||
old_block.reopen(&*self.engine)
|
||||
}
|
||||
None => {
|
||||
// block not found - create it.
|
||||
trace!(target: "miner", "No existing work - making new block");
|
||||
trace!(target: "miner", "prepare_block: No existing work - making new block");
|
||||
chain.prepare_open_block(
|
||||
self.author(),
|
||||
(self.gas_floor_target(), self.gas_ceil_target()),
|
||||
@@ -334,37 +330,49 @@ impl Miner {
|
||||
queue.remove_invalid(&hash, &fetch_account);
|
||||
}
|
||||
}
|
||||
(block, original_work_hash)
|
||||
}
|
||||
|
||||
/// Attempts to perform internal sealing to return Ok(sealed),
|
||||
/// Err(Some(block)) returns for unsuccesful sealing while Err(None) indicates misspecified engine.
|
||||
fn seal_block_internally(&self, block: ClosedBlock) -> Result<SealedBlock, Option<ClosedBlock>> {
|
||||
trace!(target: "miner", "seal_block_internally: block has transaction - attempting internal seal.");
|
||||
let s = self.engine.generate_seal(block.block(), match self.accounts {
|
||||
Some(ref x) => Some(&**x),
|
||||
None => None,
|
||||
});
|
||||
if let Some(seal) = s {
|
||||
trace!(target: "miner", "seal_block_internally: managed internal seal. importing...");
|
||||
block.lock().try_seal(&*self.engine, seal).or_else(|_| {
|
||||
warn!("prepare_sealing: ERROR: try_seal failed when given internally generated seal. WTF?");
|
||||
Err(None)
|
||||
})
|
||||
} else {
|
||||
trace!(target: "miner", "seal_block_internally: unable to generate seal internally");
|
||||
Err(Some(block))
|
||||
}
|
||||
}
|
||||
|
||||
/// Uses engine to seal the block and then imports it to chain.
|
||||
fn seal_and_import_block_internally(&self, chain: &MiningBlockChainClient, block: ClosedBlock) -> bool {
|
||||
if !block.transactions().is_empty() {
|
||||
trace!(target: "miner", "prepare_sealing: block has transaction - attempting internal seal.");
|
||||
// block with transactions - see if we can seal immediately.
|
||||
let s = self.engine.generate_seal(block.block(), match self.accounts {
|
||||
Some(ref x) => Some(&**x),
|
||||
None => None,
|
||||
});
|
||||
if let Some(seal) = s {
|
||||
trace!(target: "miner", "prepare_sealing: managed internal seal. importing...");
|
||||
if let Ok(sealed) = block.lock().try_seal(&*self.engine, seal) {
|
||||
if let Ok(_) = chain.import_block(sealed.rlp_bytes()) {
|
||||
trace!(target: "miner", "prepare_sealing: sealed internally and imported. leaving.");
|
||||
} else {
|
||||
warn!("prepare_sealing: ERROR: could not import internally sealed block. WTF?");
|
||||
}
|
||||
} else {
|
||||
warn!("prepare_sealing: ERROR: try_seal failed when given internally generated seal. WTF?");
|
||||
if let Ok(sealed) = self.seal_block_internally(block) {
|
||||
if chain.import_block(sealed.rlp_bytes()).is_ok() {
|
||||
return true
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
trace!(target: "miner", "prepare_sealing: unable to generate seal internally");
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Prepares work which has to be done to seal.
|
||||
fn prepare_work(&self, block: ClosedBlock, original_work_hash: Option<H256>) {
|
||||
let (work, is_new) = {
|
||||
let mut sealing_work = self.sealing_work.lock();
|
||||
let last_work_hash = sealing_work.queue.peek_last_ref().map(|pb| pb.block().fields().header.hash());
|
||||
trace!(target: "miner", "Checking whether we need to reseal: orig={:?} last={:?}, this={:?}", original_work_hash, last_work_hash, block.block().fields().header.hash());
|
||||
trace!(target: "miner", "prepare_work: Checking whether we need to reseal: orig={:?} last={:?}, this={:?}", original_work_hash, last_work_hash, block.block().fields().header.hash());
|
||||
let (work, is_new) = if last_work_hash.map_or(true, |h| h != block.block().fields().header.hash()) {
|
||||
trace!(target: "miner", "Pushing a new, refreshed or borrowed pending {}...", block.block().fields().header.hash());
|
||||
trace!(target: "miner", "prepare_work: Pushing a new, refreshed or borrowed pending {}...", block.block().fields().header.hash());
|
||||
let pow_hash = block.block().fields().header.hash();
|
||||
let number = block.block().fields().header.number();
|
||||
let difficulty = *block.block().fields().header.difficulty();
|
||||
@@ -378,7 +386,7 @@ impl Miner {
|
||||
} else {
|
||||
(None, false)
|
||||
};
|
||||
trace!(target: "miner", "prepare_sealing: leaving (last={:?})", sealing_work.queue.peek_last_ref().map(|b| b.block().fields().header.hash()));
|
||||
trace!(target: "miner", "prepare_work: leaving (last={:?})", sealing_work.queue.peek_last_ref().map(|b| b.block().fields().header.hash()));
|
||||
(work, is_new)
|
||||
};
|
||||
if is_new {
|
||||
@@ -393,12 +401,12 @@ impl Miner {
|
||||
}
|
||||
|
||||
/// Returns true if we had to prepare new pending block
|
||||
fn enable_and_prepare_sealing(&self, chain: &MiningBlockChainClient) -> bool {
|
||||
trace!(target: "miner", "enable_and_prepare_sealing: entering");
|
||||
fn prepare_work_sealing(&self, chain: &MiningBlockChainClient) -> bool {
|
||||
trace!(target: "miner", "prepare_work_sealing: entering");
|
||||
let prepare_new = {
|
||||
let mut sealing_work = self.sealing_work.lock();
|
||||
let have_work = sealing_work.queue.peek_last_ref().is_some();
|
||||
trace!(target: "miner", "enable_and_prepare_sealing: have_work={}", have_work);
|
||||
trace!(target: "miner", "prepare_work_sealing: have_work={}", have_work);
|
||||
if !have_work {
|
||||
sealing_work.enabled = true;
|
||||
true
|
||||
@@ -411,12 +419,13 @@ impl Miner {
|
||||
// | NOTE Code below requires transaction_queue and sealing_work locks. |
|
||||
// | Make sure to release the locks before calling that method. |
|
||||
// --------------------------------------------------------------------------
|
||||
self.prepare_sealing(chain);
|
||||
let (block, original_work_hash) = self.prepare_block(chain);
|
||||
self.prepare_work(block, original_work_hash);
|
||||
}
|
||||
let mut sealing_block_last_request = self.sealing_block_last_request.lock();
|
||||
let best_number = chain.chain_info().best_block_number;
|
||||
if *sealing_block_last_request != best_number {
|
||||
trace!(target: "miner", "enable_and_prepare_sealing: Miner received request (was {}, now {}) - waking up.", *sealing_block_last_request, best_number);
|
||||
trace!(target: "miner", "prepare_work_sealing: Miner received request (was {}, now {}) - waking up.", *sealing_block_last_request, best_number);
|
||||
*sealing_block_last_request = best_number;
|
||||
}
|
||||
|
||||
@@ -635,7 +644,7 @@ impl MinerService for Miner {
|
||||
trace!(target: "own_tx", "Importing transaction: {:?}", transaction);
|
||||
|
||||
let imported = {
|
||||
// Be sure to release the lock before we call enable_and_prepare_sealing
|
||||
// Be sure to release the lock before we call prepare_work_sealing
|
||||
let mut transaction_queue = self.transaction_queue.lock();
|
||||
let import = self.add_transactions_to_queue(
|
||||
chain, vec![transaction], TransactionOrigin::Local, &mut transaction_queue
|
||||
@@ -662,7 +671,7 @@ impl MinerService for Miner {
|
||||
if imported.is_ok() && self.options.reseal_on_own_tx && self.tx_reseal_allowed() {
|
||||
// Make sure to do it after transaction is imported and lock is droped.
|
||||
// We need to create pending block and enable sealing
|
||||
let prepared = self.enable_and_prepare_sealing(chain);
|
||||
let prepared = self.prepare_work_sealing(chain);
|
||||
// If new block has not been prepared (means we already had one)
|
||||
// we need to update sealing
|
||||
if !prepared {
|
||||
@@ -804,7 +813,15 @@ impl MinerService for Miner {
|
||||
// | NOTE Code below requires transaction_queue and sealing_work locks. |
|
||||
// | Make sure to release the locks before calling that method. |
|
||||
// --------------------------------------------------------------------------
|
||||
self.prepare_sealing(chain);
|
||||
trace!(target: "miner", "update_sealing: preparing a block");
|
||||
let (block, original_work_hash) = self.prepare_block(chain);
|
||||
if self.engine.seals_internally() {
|
||||
trace!(target: "miner", "update_sealing: engine indicates internal sealing");
|
||||
self.seal_and_import_block_internally(chain, block);
|
||||
} else {
|
||||
trace!(target: "miner", "update_sealing: engine does not seal internally, preparing work");
|
||||
self.prepare_work(block, original_work_hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -814,7 +831,7 @@ impl MinerService for Miner {
|
||||
|
||||
fn map_sealing_work<F, T>(&self, chain: &MiningBlockChainClient, f: F) -> Option<T> where F: FnOnce(&ClosedBlock) -> T {
|
||||
trace!(target: "miner", "map_sealing_work: entering");
|
||||
self.enable_and_prepare_sealing(chain);
|
||||
self.prepare_work_sealing(chain);
|
||||
trace!(target: "miner", "map_sealing_work: sealing prepared");
|
||||
let mut sealing_work = self.sealing_work.lock();
|
||||
let ret = sealing_work.queue.use_last_ref();
|
||||
@@ -1002,7 +1019,7 @@ mod tests {
|
||||
assert_eq!(miner.pending_transactions_hashes().len(), 1);
|
||||
assert_eq!(miner.pending_receipts().len(), 1);
|
||||
// This method will let us know if pending block was created (before calling that method)
|
||||
assert_eq!(miner.enable_and_prepare_sealing(&client), false);
|
||||
assert_eq!(miner.prepare_work_sealing(&client), false);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1032,6 +1049,6 @@ mod tests {
|
||||
assert_eq!(miner.pending_transactions().len(), 0);
|
||||
assert_eq!(miner.pending_receipts().len(), 0);
|
||||
// This method will let us know if pending block was created (before calling that method)
|
||||
assert_eq!(miner.enable_and_prepare_sealing(&client), true);
|
||||
assert_eq!(miner.prepare_work_sealing(&client), true);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,7 +94,6 @@ impl ClientService {
|
||||
pruning: pruning,
|
||||
channel: io_service.channel(),
|
||||
snapshot_root: snapshot_path.into(),
|
||||
client_db: client_path.into(),
|
||||
db_restore: client.clone(),
|
||||
};
|
||||
let snapshot = Arc::new(try!(SnapshotService::new(snapshot_params)));
|
||||
@@ -187,7 +186,7 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
|
||||
ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); }
|
||||
ClientIoMessage::NewTransactions(ref transactions) => { self.client.import_queued_transactions(transactions); }
|
||||
ClientIoMessage::BeginRestoration(ref manifest) => {
|
||||
if let Err(e) = self.snapshot.init_restore(manifest.clone()) {
|
||||
if let Err(e) = self.snapshot.init_restore(manifest.clone(), true) {
|
||||
warn!("Failed to initialize snapshot restoration: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ use service::ClientIoMessage;
|
||||
|
||||
use io::IoChannel;
|
||||
|
||||
use util::{Bytes, H256, Mutex, RwLock, UtilError};
|
||||
use util::{Bytes, H256, Mutex, RwLock, RwLockReadGuard, UtilError};
|
||||
use util::journaldb::Algorithm;
|
||||
use util::kvdb::{Database, DatabaseConfig};
|
||||
use util::snappy;
|
||||
@@ -70,7 +70,7 @@ struct Restoration {
|
||||
block_chunks_left: HashSet<H256>,
|
||||
state: StateRebuilder,
|
||||
blocks: BlockRebuilder,
|
||||
writer: LooseWriter,
|
||||
writer: Option<LooseWriter>,
|
||||
snappy_buffer: Bytes,
|
||||
final_state_root: H256,
|
||||
guard: Guard,
|
||||
@@ -80,8 +80,8 @@ struct RestorationParams<'a> {
|
||||
manifest: ManifestData, // manifest to base restoration on.
|
||||
pruning: Algorithm, // pruning algorithm for the database.
|
||||
db_path: PathBuf, // database path
|
||||
db_config: &'a DatabaseConfig,
|
||||
writer: LooseWriter, // writer for recovered snapshot.
|
||||
db_config: &'a DatabaseConfig, // configuration for the database.
|
||||
writer: Option<LooseWriter>, // writer for recovered snapshot.
|
||||
genesis: &'a [u8], // genesis block of the chain.
|
||||
guard: Guard, // guard for the restoration directory.
|
||||
}
|
||||
@@ -120,7 +120,10 @@ impl Restoration {
|
||||
let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer));
|
||||
|
||||
try!(self.state.feed(&self.snappy_buffer[..len]));
|
||||
try!(self.writer.write_state_chunk(hash, chunk));
|
||||
|
||||
if let Some(ref mut writer) = self.writer.as_mut() {
|
||||
try!(writer.write_state_chunk(hash, chunk));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -132,7 +135,9 @@ impl Restoration {
|
||||
let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer));
|
||||
|
||||
try!(self.blocks.feed(&self.snappy_buffer[..len], engine));
|
||||
try!(self.writer.write_block_chunk(hash, chunk));
|
||||
if let Some(ref mut writer) = self.writer.as_mut() {
|
||||
try!(writer.write_block_chunk(hash, chunk));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -157,7 +162,9 @@ impl Restoration {
|
||||
// connect out-of-order chunks.
|
||||
self.blocks.glue_chunks();
|
||||
|
||||
try!(self.writer.finish(self.manifest));
|
||||
if let Some(writer) = self.writer {
|
||||
try!(writer.finish(self.manifest));
|
||||
}
|
||||
|
||||
self.guard.disarm();
|
||||
Ok(())
|
||||
@@ -187,9 +194,6 @@ pub struct ServiceParams {
|
||||
/// The directory to put snapshots in.
|
||||
/// Usually "<chain hash>/snapshot"
|
||||
pub snapshot_root: PathBuf,
|
||||
/// The client's database directory.
|
||||
/// Usually "<chain hash>/<pruning>/db".
|
||||
pub client_db: PathBuf,
|
||||
/// A handle for database restoration.
|
||||
pub db_restore: Arc<DatabaseRestore>,
|
||||
}
|
||||
@@ -198,7 +202,6 @@ pub struct ServiceParams {
|
||||
/// This controls taking snapshots and restoring from them.
|
||||
pub struct Service {
|
||||
restoration: Mutex<Option<Restoration>>,
|
||||
client_db: PathBuf,
|
||||
snapshot_root: PathBuf,
|
||||
db_config: DatabaseConfig,
|
||||
io_channel: Channel,
|
||||
@@ -219,7 +222,6 @@ impl Service {
|
||||
pub fn new(params: ServiceParams) -> Result<Self, Error> {
|
||||
let mut service = Service {
|
||||
restoration: Mutex::new(None),
|
||||
client_db: params.client_db,
|
||||
snapshot_root: params.snapshot_root,
|
||||
db_config: params.db_config,
|
||||
io_channel: params.channel,
|
||||
@@ -301,11 +303,15 @@ impl Service {
|
||||
fn replace_client_db(&self) -> Result<(), Error> {
|
||||
let our_db = self.restoration_db();
|
||||
|
||||
trace!(target: "snapshot", "replacing {:?} with {:?}", self.client_db, our_db);
|
||||
try!(self.db_restore.restore_db(our_db.to_str().unwrap()));
|
||||
try!(self.db_restore.restore_db(&*our_db.to_string_lossy()));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get a reference to the snapshot reader.
|
||||
pub fn reader(&self) -> RwLockReadGuard<Option<LooseReader>> {
|
||||
self.reader.read()
|
||||
}
|
||||
|
||||
/// Tick the snapshot service. This will log any active snapshot
|
||||
/// being taken.
|
||||
pub fn tick(&self) {
|
||||
@@ -357,11 +363,15 @@ impl Service {
|
||||
}
|
||||
|
||||
/// Initialize the restoration synchronously.
|
||||
pub fn init_restore(&self, manifest: ManifestData) -> Result<(), Error> {
|
||||
/// The recover flag indicates whether to recover the restored snapshot.
|
||||
pub fn init_restore(&self, manifest: ManifestData, recover: bool) -> Result<(), Error> {
|
||||
let rest_dir = self.restoration_dir();
|
||||
|
||||
let mut res = self.restoration.lock();
|
||||
|
||||
self.state_chunks.store(0, Ordering::SeqCst);
|
||||
self.block_chunks.store(0, Ordering::SeqCst);
|
||||
|
||||
// tear down existing restoration.
|
||||
*res = None;
|
||||
|
||||
@@ -376,7 +386,10 @@ impl Service {
|
||||
try!(fs::create_dir_all(&rest_dir));
|
||||
|
||||
// make new restoration.
|
||||
let writer = try!(LooseWriter::new(self.temp_recovery_dir()));
|
||||
let writer = match recover {
|
||||
true => Some(try!(LooseWriter::new(self.temp_recovery_dir()))),
|
||||
false => None
|
||||
};
|
||||
|
||||
let params = RestorationParams {
|
||||
manifest: manifest,
|
||||
@@ -391,8 +404,8 @@ impl Service {
|
||||
*res = Some(try!(Restoration::new(params)));
|
||||
|
||||
*self.status.lock() = RestorationStatus::Ongoing {
|
||||
state_chunks_done: self.state_chunks.load(Ordering::Relaxed) as u32,
|
||||
block_chunks_done: self.block_chunks.load(Ordering::Relaxed) as u32,
|
||||
state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32,
|
||||
block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32,
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
@@ -403,35 +416,35 @@ impl Service {
|
||||
fn finalize_restoration(&self, rest: &mut Option<Restoration>) -> Result<(), Error> {
|
||||
trace!(target: "snapshot", "finalizing restoration");
|
||||
|
||||
self.state_chunks.store(0, Ordering::SeqCst);
|
||||
self.block_chunks.store(0, Ordering::SeqCst);
|
||||
let recover = rest.as_ref().map_or(false, |rest| rest.writer.is_some());
|
||||
|
||||
// destroy the restoration before replacing databases and snapshot.
|
||||
try!(rest.take().map(Restoration::finalize).unwrap_or(Ok(())));
|
||||
try!(self.replace_client_db());
|
||||
|
||||
let mut reader = self.reader.write();
|
||||
*reader = None; // destroy the old reader if it existed.
|
||||
if recover {
|
||||
let mut reader = self.reader.write();
|
||||
*reader = None; // destroy the old reader if it existed.
|
||||
|
||||
let snapshot_dir = self.snapshot_dir();
|
||||
let snapshot_dir = self.snapshot_dir();
|
||||
|
||||
trace!(target: "snapshot", "removing old snapshot dir at {}", snapshot_dir.to_string_lossy());
|
||||
if let Err(e) = fs::remove_dir_all(&snapshot_dir) {
|
||||
match e.kind() {
|
||||
ErrorKind::NotFound => {}
|
||||
_ => return Err(e.into()),
|
||||
trace!(target: "snapshot", "removing old snapshot dir at {}", snapshot_dir.to_string_lossy());
|
||||
if let Err(e) = fs::remove_dir_all(&snapshot_dir) {
|
||||
match e.kind() {
|
||||
ErrorKind::NotFound => {}
|
||||
_ => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
try!(fs::create_dir(&snapshot_dir));
|
||||
|
||||
trace!(target: "snapshot", "copying restored snapshot files over");
|
||||
try!(fs::rename(self.temp_recovery_dir(), &snapshot_dir));
|
||||
|
||||
*reader = Some(try!(LooseReader::new(snapshot_dir)));
|
||||
}
|
||||
|
||||
try!(fs::create_dir(&snapshot_dir));
|
||||
|
||||
trace!(target: "snapshot", "copying restored snapshot files over");
|
||||
try!(fs::rename(self.temp_recovery_dir(), &snapshot_dir));
|
||||
|
||||
let _ = fs::remove_dir_all(self.restoration_dir());
|
||||
|
||||
*reader = Some(try!(LooseReader::new(snapshot_dir)));
|
||||
|
||||
*self.status.lock() = RestorationStatus::Inactive;
|
||||
|
||||
Ok(())
|
||||
@@ -512,7 +525,13 @@ impl SnapshotService for Service {
|
||||
}
|
||||
|
||||
fn status(&self) -> RestorationStatus {
|
||||
*self.status.lock()
|
||||
let mut cur_status = self.status.lock();
|
||||
if let RestorationStatus::Ongoing { ref mut state_chunks_done, ref mut block_chunks_done } = *cur_status {
|
||||
*state_chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32;
|
||||
*block_chunks_done = self.block_chunks.load(Ordering::SeqCst) as u32;
|
||||
}
|
||||
|
||||
cur_status.clone()
|
||||
}
|
||||
|
||||
fn begin_restore(&self, manifest: ManifestData) {
|
||||
@@ -523,12 +542,6 @@ impl SnapshotService for Service {
|
||||
fn abort_restore(&self) {
|
||||
*self.restoration.lock() = None;
|
||||
*self.status.lock() = RestorationStatus::Inactive;
|
||||
if let Err(e) = fs::remove_dir_all(&self.restoration_dir()) {
|
||||
match e.kind() {
|
||||
ErrorKind::NotFound => {},
|
||||
_ => warn!("encountered error {} while deleting snapshot restoration dir.", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn restore_state_chunk(&self, hash: H256, chunk: Bytes) {
|
||||
@@ -585,7 +598,6 @@ mod tests {
|
||||
pruning: Algorithm::Archive,
|
||||
channel: service.channel(),
|
||||
snapshot_root: dir,
|
||||
client_db: client_db,
|
||||
db_restore: Arc::new(NoopDBRestore),
|
||||
};
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
mod blocks;
|
||||
mod state;
|
||||
mod service;
|
||||
|
||||
pub mod helpers;
|
||||
|
||||
|
||||
143
ethcore/src/snapshot/tests/service.rs
Normal file
143
ethcore/src/snapshot/tests/service.rs
Normal file
@@ -0,0 +1,143 @@
|
||||
// Copyright 2015, 2016 Ethcore (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Tests for the snapshot service.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use client::{BlockChainClient, Client};
|
||||
use ids::BlockID;
|
||||
use snapshot::service::{Service, ServiceParams};
|
||||
use snapshot::{self, ManifestData, SnapshotService};
|
||||
use spec::Spec;
|
||||
use tests::helpers::generate_dummy_client_with_spec_and_data;
|
||||
|
||||
use devtools::RandomTempPath;
|
||||
use io::IoChannel;
|
||||
use util::kvdb::DatabaseConfig;
|
||||
|
||||
struct NoopDBRestore;
|
||||
|
||||
impl snapshot::DatabaseRestore for NoopDBRestore {
|
||||
fn restore_db(&self, _new_db: &str) -> Result<(), ::error::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn restored_is_equivalent() {
|
||||
const NUM_BLOCKS: u32 = 400;
|
||||
const TX_PER: usize = 5;
|
||||
|
||||
let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()];
|
||||
|
||||
let client = generate_dummy_client_with_spec_and_data(Spec::new_null, NUM_BLOCKS, TX_PER, &gas_prices);
|
||||
|
||||
let path = RandomTempPath::create_dir();
|
||||
let mut path = path.as_path().clone();
|
||||
let mut client_db = path.clone();
|
||||
|
||||
client_db.push("client_db");
|
||||
path.push("snapshot");
|
||||
|
||||
let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
|
||||
|
||||
let spec = Spec::new_null();
|
||||
let client2 = Client::new(
|
||||
Default::default(),
|
||||
&spec,
|
||||
&client_db,
|
||||
Arc::new(::miner::Miner::with_spec(&spec)),
|
||||
IoChannel::disconnected(),
|
||||
&db_config,
|
||||
).unwrap();
|
||||
|
||||
let service_params = ServiceParams {
|
||||
engine: spec.engine.clone(),
|
||||
genesis_block: spec.genesis_block(),
|
||||
db_config: db_config,
|
||||
pruning: ::util::journaldb::Algorithm::Archive,
|
||||
channel: IoChannel::disconnected(),
|
||||
snapshot_root: path,
|
||||
db_restore: client2.clone(),
|
||||
};
|
||||
|
||||
let service = Service::new(service_params).unwrap();
|
||||
service.take_snapshot(&client, NUM_BLOCKS as u64).unwrap();
|
||||
|
||||
let manifest = service.manifest().unwrap();
|
||||
|
||||
service.init_restore(manifest.clone(), true).unwrap();
|
||||
assert!(service.init_restore(manifest.clone(), true).is_ok());
|
||||
|
||||
for hash in manifest.state_hashes {
|
||||
let chunk = service.chunk(hash).unwrap();
|
||||
service.feed_state_chunk(hash, &chunk);
|
||||
}
|
||||
|
||||
for hash in manifest.block_hashes {
|
||||
let chunk = service.chunk(hash).unwrap();
|
||||
service.feed_block_chunk(hash, &chunk);
|
||||
}
|
||||
|
||||
assert_eq!(service.status(), ::snapshot::RestorationStatus::Inactive);
|
||||
|
||||
for x in 0..NUM_BLOCKS {
|
||||
let block1 = client.block(BlockID::Number(x as u64)).unwrap();
|
||||
let block2 = client2.block(BlockID::Number(x as u64)).unwrap();
|
||||
|
||||
assert_eq!(block1, block2);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn guards_delete_folders() {
|
||||
let spec = Spec::new_null();
|
||||
let path = RandomTempPath::create_dir();
|
||||
let mut path = path.as_path().clone();
|
||||
let service_params = ServiceParams {
|
||||
engine: spec.engine.clone(),
|
||||
genesis_block: spec.genesis_block(),
|
||||
db_config: DatabaseConfig::with_columns(::db::NUM_COLUMNS),
|
||||
pruning: ::util::journaldb::Algorithm::Archive,
|
||||
channel: IoChannel::disconnected(),
|
||||
snapshot_root: path.clone(),
|
||||
db_restore: Arc::new(NoopDBRestore),
|
||||
};
|
||||
|
||||
let service = Service::new(service_params).unwrap();
|
||||
path.push("restoration");
|
||||
|
||||
let manifest = ManifestData {
|
||||
state_hashes: vec![],
|
||||
block_hashes: vec![],
|
||||
block_number: 0,
|
||||
block_hash: Default::default(),
|
||||
state_root: Default::default(),
|
||||
};
|
||||
|
||||
service.init_restore(manifest.clone(), true).unwrap();
|
||||
assert!(path.exists());
|
||||
|
||||
service.abort_restore();
|
||||
assert!(!path.exists());
|
||||
|
||||
service.init_restore(manifest.clone(), true).unwrap();
|
||||
assert!(path.exists());
|
||||
|
||||
drop(service);
|
||||
assert!(!path.exists());
|
||||
}
|
||||
@@ -56,7 +56,7 @@ fn can_handshake() {
|
||||
let stop_guard = StopGuard::new();
|
||||
let socket_path = "ipc:///tmp/parity-client-rpc-10.ipc";
|
||||
run_test_worker(scope, stop_guard.share(), socket_path);
|
||||
let remote_client = nanoipc::init_client::<RemoteClient<_>>(socket_path).unwrap();
|
||||
let remote_client = nanoipc::generic_client::<RemoteClient<_>>(socket_path).unwrap();
|
||||
|
||||
assert!(remote_client.handshake().is_ok());
|
||||
})
|
||||
@@ -68,7 +68,7 @@ fn can_query_block() {
|
||||
let stop_guard = StopGuard::new();
|
||||
let socket_path = "ipc:///tmp/parity-client-rpc-20.ipc";
|
||||
run_test_worker(scope, stop_guard.share(), socket_path);
|
||||
let remote_client = nanoipc::init_client::<RemoteClient<_>>(socket_path).unwrap();
|
||||
let remote_client = nanoipc::generic_client::<RemoteClient<_>>(socket_path).unwrap();
|
||||
|
||||
let non_existant_block = remote_client.block_header(BlockID::Number(999));
|
||||
|
||||
|
||||
Reference in New Issue
Block a user