// Copyright 2015-2020 Parity Technologies (UK) Ltd. // This file is part of Parity. // Parity is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity. If not, see . //! Private transactions logs. use ethereum_types::{H256, Address}; use std::collections::HashMap; use std::fs::File; use std::path::PathBuf; use std::sync::Arc; use std::time::{SystemTime, Duration, Instant}; use parking_lot::RwLock; use serde::ser::{Serializer, SerializeSeq}; use error::Error; #[cfg(not(time_checked_add))] use time_utils::CheckedSystemTime; /// Maximum amount of stored private transaction logs. const MAX_JOURNAL_LEN: usize = 1000; /// Maximum period for storing private transaction logs. /// Logs older than 20 days will not be processed const MAX_STORING_TIME: Duration = Duration::from_secs(60 * 60 * 24 * 20); /// Source of monotonic time for log timestamps struct MonoTime { start_time: SystemTime, start_inst: Instant } impl MonoTime { fn new(start: SystemTime) -> Self { Self { start_time: start, start_inst: Instant::now() } } fn elapsed(&self) -> Duration { self.start_inst.elapsed() } fn to_system_time(&self) -> SystemTime { self.start_time + self.elapsed() } } impl Default for MonoTime { fn default() -> Self { MonoTime::new(SystemTime::now()) } } /// Current status of the private transaction #[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] pub enum PrivateTxStatus { /// Private tx was created but no validation received yet Created, /// Private state not found locally and being retrived from peers PrivateStateSync, /// Retrieval of the private state failed, transaction not created PrivateStateSyncFailed, /// Several validators (but not all) validated the transaction Validating, /// All validators has validated the private tx /// Corresponding public tx was created and added into the pool Deployed, } /// Information about private tx validation #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ValidatorLog { /// Account of the validator pub account: Address, /// Validation timestamp, None if the transaction is not validated pub validation_timestamp: Option, } #[cfg(test)] impl PartialEq for ValidatorLog { fn eq(&self, other: &Self) -> bool { self.account == other.account } } /// Information about the private transaction #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TransactionLog { /// Original signed transaction hash (used as a source for private tx) pub tx_hash: H256, /// Current status of the private transaction pub status: PrivateTxStatus, /// Creation timestamp pub creation_timestamp: SystemTime, /// List of validations pub validators: Vec, /// Timestamp of the resulting public tx deployment pub deployment_timestamp: Option, /// Hash of the resulting public tx pub public_tx_hash: Option, } #[cfg(test)] impl PartialEq for TransactionLog { fn eq(&self, other: &Self) -> bool { self.tx_hash == other.tx_hash && self.status == other.status && self.validators == other.validators && self.public_tx_hash == other.public_tx_hash } } /// Wrapper other JSON serializer pub trait LogsSerializer: Send + Sync + 'static { /// Read logs from the source fn read_logs(&self) -> Result, Error>; /// Write all logs to the source fn flush_logs(&self, logs: &HashMap) -> Result<(), Error>; } /// Logs serializer to the json file pub struct FileLogsSerializer { logs_dir: PathBuf, } impl FileLogsSerializer { pub fn with_path>(logs_dir: P) -> Self { FileLogsSerializer { logs_dir: logs_dir.into(), } } fn open_file(&self, to_create: bool) -> Result { let file_path = self.logs_dir.with_file_name("private_tx.log"); if to_create { File::create(&file_path).map_err(From::from) } else { File::open(&file_path).map_err(From::from) } } } impl LogsSerializer for FileLogsSerializer { fn read_logs(&self) -> Result, Error> { let log_file = self.open_file(false)?; match serde_json::from_reader(log_file) { Ok(logs) => Ok(logs), Err(err) => { error!(target: "privatetx", "Cannot deserialize logs from file: {}", err); return Err(format!("Cannot deserialize logs from file: {:?}", err).into()); } } } fn flush_logs(&self, logs: &HashMap) -> Result<(), Error> { if logs.is_empty() { // Do not create empty file return Ok(()); } let log_file = self.open_file(true)?; let mut json = serde_json::Serializer::new(log_file); let mut json_array = json.serialize_seq(Some(logs.len()))?; for v in logs.values() { json_array.serialize_element(v)?; } json_array.end()?; Ok(()) } } /// Private transactions logging pub struct Logging { logs: RwLock>, logs_serializer: Arc, mono_time: MonoTime, } impl Logging { /// Creates the logging object pub fn new(logs_serializer: Arc) -> Self { let mut logging = Logging { logs: RwLock::new(HashMap::new()), logs_serializer, mono_time: MonoTime::default(), }; match logging.read_logs() { // Initialize time source by max from current system time and max creation time from already saved logs Ok(initial_time) => logging.mono_time = MonoTime::new(initial_time), Err(err) => warn!(target: "privatetx", "Cannot read logs: {:?}", err), } logging } /// Retrieves log for the corresponding tx hash pub fn tx_log(&self, tx_hash: &H256) -> Option { self.logs.read().get(&tx_hash).cloned() } /// Logs the creation of the private transaction pub fn private_tx_created(&self, tx_hash: &H256, validators: &[Address]) { let mut logs = self.logs.write(); if let Some(transaction_log) = logs.get_mut(&tx_hash) { if transaction_log.status == PrivateTxStatus::PrivateStateSync { // Transaction was already created before, its private state was being retrieved transaction_log.status = PrivateTxStatus::Created; return; } else { error!(target: "privatetx", "Attempt to create a duplicate transaction"); return; } } let mut validator_logs = Vec::new(); for account in validators { validator_logs.push(ValidatorLog { account: *account, validation_timestamp: None, }); } if logs.len() > MAX_JOURNAL_LEN { // Remove the oldest log if let Some(tx_hash) = logs.values() .min_by(|x, y| x.creation_timestamp.cmp(&y.creation_timestamp)) .map(|oldest| oldest.tx_hash) { logs.remove(&tx_hash); } } logs.insert(*tx_hash, TransactionLog { tx_hash: *tx_hash, status: PrivateTxStatus::Created, creation_timestamp: self.mono_time.to_system_time(), validators: validator_logs, deployment_timestamp: None, public_tx_hash: None, }); } /// Private state retrieval started pub fn private_state_request(&self, tx_hash: &H256) { let mut logs = self.logs.write(); if let Some(transaction_log) = logs.get_mut(&tx_hash) { transaction_log.status = PrivateTxStatus::PrivateStateSync; } } /// Private state retrieval failed pub fn private_state_sync_failed(&self, tx_hash: &H256) { let mut logs = self.logs.write(); if let Some(transaction_log) = logs.get_mut(&tx_hash) { transaction_log.status = PrivateTxStatus::PrivateStateSyncFailed; } } /// Logs the validation of the private transaction by one of its validators pub fn signature_added(&self, tx_hash: &H256, validator: &Address) { let mut logs = self.logs.write(); if let Some(transaction_log) = logs.get_mut(&tx_hash) { if let Some(ref mut validator_log) = transaction_log.validators.iter_mut().find(|log| log.account == *validator) { transaction_log.status = PrivateTxStatus::Validating; validator_log.validation_timestamp = Some(self.mono_time.to_system_time()); } } } /// Logs the final deployment of the resulting public transaction pub fn tx_deployed(&self, tx_hash: &H256, public_tx_hash: &H256) { let mut logs = self.logs.write(); if let Some(log) = logs.get_mut(&tx_hash) { log.status = PrivateTxStatus::Deployed; log.deployment_timestamp = Some(self.mono_time.to_system_time()); log.public_tx_hash = Some(*public_tx_hash); } } fn read_logs(&self) -> Result { let mut transaction_logs = self.logs_serializer.read_logs()?; // Drop old logs let earliest_possible = SystemTime::now().checked_sub(MAX_STORING_TIME).ok_or(Error::TimestampOverflow)?; transaction_logs.retain(|tx_log| tx_log.creation_timestamp > earliest_possible); // Sort logs by their creation time in order to find the most recent transaction_logs.sort_by(|a, b| b.creation_timestamp.cmp(&a.creation_timestamp)); let initial_timestamp = transaction_logs.first() .map_or(SystemTime::now(), |l| std::cmp::max(SystemTime::now(), l.creation_timestamp)); let mut logs = self.logs.write(); for log in transaction_logs { logs.insert(log.tx_hash, log); } Ok(initial_timestamp) } fn flush_logs(&self) -> Result<(), Error> { let logs = self.logs.read(); self.logs_serializer.flush_logs(&logs) } } // Flush all logs on drop impl Drop for Logging { fn drop(&mut self) { if let Err(err) = self.flush_logs() { warn!(target: "privatetx", "Cannot write logs: {:?}", err); } } } #[cfg(test)] mod tests { use serde_json; use error::Error; use ethereum_types::{H256, Address}; use std::collections::{HashMap, BTreeMap}; use std::sync::Arc; use std::time::{SystemTime, Duration}; use std::str::FromStr; use types::transaction::Transaction; use parking_lot::RwLock; use super::{TransactionLog, Logging, PrivateTxStatus, LogsSerializer, ValidatorLog}; #[cfg(not(time_checked_add))] use time_utils::CheckedSystemTime; struct StringLogSerializer { string_log: RwLock, } impl StringLogSerializer { fn new(source: String) -> Self { StringLogSerializer { string_log: RwLock::new(source), } } fn log(&self) -> String { let log = self.string_log.read(); log.clone() } } impl LogsSerializer for StringLogSerializer { fn read_logs(&self) -> Result, Error> { let source = self.string_log.read(); if source.is_empty() { return Ok(Vec::new()) } let logs = serde_json::from_str(&source).unwrap(); Ok(logs) } fn flush_logs(&self, logs: &HashMap) -> Result<(), Error> { // Sort logs in order to have the same order let sorted_logs: BTreeMap<&H256, &TransactionLog> = logs.iter().collect(); *self.string_log.write() = serde_json::to_string(&sorted_logs.values().collect::>())?; Ok(()) } } #[test] fn private_log_format() { let s = r#"{ "tx_hash":"0x64f648ca7ae7f4138014f860ae56164d8d5732969b1cea54d8be9d144d8aa6f6", "status":"Deployed", "creation_timestamp":{"secs_since_epoch":1557220355,"nanos_since_epoch":196382053}, "validators":[{ "account":"0x82a978b3f5962a5b0957d9ee9eef472ee55b42f1", "validation_timestamp":{"secs_since_epoch":1557220355,"nanos_since_epoch":196382053} }], "deployment_timestamp":{"secs_since_epoch":1557220355,"nanos_since_epoch":196382053}, "public_tx_hash":"0x69b9c691ede7993effbcc88911c309af1c82be67b04b3882dd446b808ae146da" }"#; let _deserialized: TransactionLog = serde_json::from_str(s).unwrap(); } #[test] fn private_log_status() { let logger = Logging::new(Arc::new(StringLogSerializer::new("".into()))); let private_tx = Transaction::default(); let hash = private_tx.hash(None); logger.private_tx_created(&hash, &vec![Address::from_str("82a978b3f5962a5b0957d9ee9eef472ee55b42f1").unwrap()]); logger.signature_added(&hash, &Address::from_str("82a978b3f5962a5b0957d9ee9eef472ee55b42f1").unwrap()); logger.tx_deployed(&hash, &hash); let tx_log = logger.tx_log(&hash).unwrap(); assert_eq!(tx_log.status, PrivateTxStatus::Deployed); } #[test] fn serialization() { let current_timestamp = SystemTime::now(); let initial_validator_log = ValidatorLog { account: Address::from_str("82a978b3f5962a5b0957d9ee9eef472ee55b42f1").unwrap(), validation_timestamp: Some(current_timestamp.checked_add(Duration::from_secs(1)).unwrap()), }; let initial_log = TransactionLog { tx_hash: H256::from_str("64f648ca7ae7f4138014f860ae56164d8d5732969b1cea54d8be9d144d8aa6f6").unwrap(), status: PrivateTxStatus::Deployed, creation_timestamp: current_timestamp, validators: vec![initial_validator_log], deployment_timestamp: Some(current_timestamp.checked_add(Duration::from_secs(2)).unwrap()), public_tx_hash: Some(H256::from_str("69b9c691ede7993effbcc88911c309af1c82be67b04b3882dd446b808ae146da").unwrap()), }; let serializer = Arc::new(StringLogSerializer::new(serde_json::to_string(&vec![initial_log.clone()]).unwrap())); let logger = Logging::new(serializer.clone()); let hash = H256::from_str("63c715e88f7291e66069302f6fcbb4f28a19ef5d7cbd1832d0c01e221c0061c6").unwrap(); logger.private_tx_created(&hash, &vec![Address::from_str("7ffbe3512782069be388f41be4d8eb350672d3a5").unwrap()]); logger.signature_added(&hash, &Address::from_str("7ffbe3512782069be388f41be4d8eb350672d3a5").unwrap()); logger.tx_deployed(&hash, &H256::from_str("de2209a8635b9cab9eceb67928b217c70ab53f6498e5144492ec01e6f43547d7").unwrap()); drop(logger); let added_validator_log = ValidatorLog { account: Address::from_str("7ffbe3512782069be388f41be4d8eb350672d3a5").unwrap(), validation_timestamp: Some(current_timestamp.checked_add(Duration::from_secs(7)).unwrap()), }; let added_log = TransactionLog { tx_hash: H256::from_str("63c715e88f7291e66069302f6fcbb4f28a19ef5d7cbd1832d0c01e221c0061c6").unwrap(), status: PrivateTxStatus::Deployed, creation_timestamp: current_timestamp.checked_add(Duration::from_secs(6)).unwrap(), validators: vec![added_validator_log], deployment_timestamp: Some(current_timestamp.checked_add(Duration::from_secs(8)).unwrap()), public_tx_hash: Some(H256::from_str("de2209a8635b9cab9eceb67928b217c70ab53f6498e5144492ec01e6f43547d7").unwrap()), }; let should_be_final = vec![added_log, initial_log]; let deserialized_logs: Vec = serde_json::from_str(&serializer.log()).unwrap(); assert_eq!(deserialized_logs, should_be_final); } }