Snapshot sync and block gap info in eth_syncing (#2948)

* provide snapshot sync info in eth_syncing

* specify block gap in eth_syncing

* Extend eth_syncing with warp, format the output properly

* adjust serialization tests for sync info

* whitespace
This commit is contained in:
Robert Habermeier 2016-10-31 17:32:53 +01:00 committed by Gav Wood
parent ff90fac125
commit 8599a11a0b
13 changed files with 220 additions and 23 deletions

View File

@ -84,6 +84,10 @@ pub struct TestBlockChainClient {
pub vm_factory: EvmFactory,
/// Timestamp assigned to latest sealed block
pub latest_block_timestamp: RwLock<u64>,
/// Ancient block info.
pub ancient_block: RwLock<Option<(H256, u64)>>,
/// First block info.
pub first_block: RwLock<Option<(H256, u64)>>,
}
#[derive(Clone)]
@ -133,6 +137,8 @@ impl TestBlockChainClient {
spec: spec,
vm_factory: EvmFactory::new(VMType::Interpreter, 1024 * 1024),
latest_block_timestamp: RwLock::new(10_000_000),
ancient_block: RwLock::new(None),
first_block: RwLock::new(None),
};
client.add_blocks(1, EachBlockWith::Nothing); // add genesis block
client.genesis_hash = client.last_hash.read().clone();
@ -594,10 +600,10 @@ impl BlockChainClient for TestBlockChainClient {
genesis_hash: self.genesis_hash.clone(),
best_block_hash: self.last_hash.read().clone(),
best_block_number: self.blocks.read().len() as BlockNumber - 1,
first_block_hash: None,
first_block_number: None,
ancient_block_hash: None,
ancient_block_number: None,
first_block_hash: self.first_block.read().as_ref().map(|x| x.0),
first_block_number: self.first_block.read().as_ref().map(|x| x.1),
ancient_block_hash: self.ancient_block.read().as_ref().map(|x| x.0),
ancient_block_number: self.ancient_block.read().as_ref().map(|x| x.1)
}
}

View File

@ -153,6 +153,28 @@ export function outSignerRequest (request) {
return request;
}
export function outSyncing (syncing) {
if (syncing && syncing !== 'false') {
Object.keys(syncing).forEach((key) => {
switch (key) {
case 'currentBlock':
case 'highestBlock':
case 'startingBlock':
case 'warpChunksAmount':
case 'warpChunksProcessed':
syncing[key] = outNumber(syncing[key]);
break;
case 'blockGap':
syncing[key] = syncing[key].map(outNumber);
break;
}
});
}
return syncing;
}
export function outTransaction (tx) {
if (tx) {
Object.keys(tx).forEach((key) => {

View File

@ -16,7 +16,7 @@
import BigNumber from 'bignumber.js';
import { outBlock, outAccountInfo, outAddress, outDate, outHistogram, outNumber, outPeers, outReceipt, outTransaction, outTrace } from './output';
import { outBlock, outAccountInfo, outAddress, outDate, outHistogram, outNumber, outPeers, outReceipt, outSyncing, outTransaction, outTrace } from './output';
import { isAddress, isBigNumber, isInstanceOf } from '../../../test/types';
describe('api/format/output', () => {
@ -203,6 +203,22 @@ describe('api/format/output', () => {
});
});
describe('outSyncing', () => {
['currentBlock', 'highestBlock', 'startingBlock', 'warpChunksAmount', 'warpChunksProcessed'].forEach((input) => {
it(`formats ${input} numbers as a number`, () => {
expect(outSyncing({ [input]: '0x123' })).to.deep.equal({
[input]: new BigNumber('0x123')
});
});
});
it('formats blockGap properly', () => {
expect(outSyncing({ blockGap: [0x123, 0x456] })).to.deep.equal({
blockGap: [new BigNumber(0x123), new BigNumber(0x456)]
});
});
});
describe('outTransaction', () => {
['from', 'to'].forEach((input) => {
it(`formats ${input} address as address`, () => {

View File

@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
import { inAddress, inBlockNumber, inData, inFilter, inHash, inHex, inNumber16, inOptions } from '../../format/input';
import { outAddress, outBlock, outLog, outNumber, outReceipt, outTransaction } from '../../format/output';
import { outAddress, outBlock, outLog, outNumber, outReceipt, outSyncing, outTransaction } from '../../format/output';
export default class Eth {
constructor (transport) {
@ -314,7 +314,8 @@ export default class Eth {
syncing () {
return this._transport
.execute('eth_syncing');
.execute('eth_syncing')
.then(outSyncing);
}
uninstallFilter (filterId) {

View File

@ -1003,7 +1003,7 @@ export default {
details: {
startingBlock: {
type: Quantity,
desc: 'The block at which the import started (will only be reset, after the sync reached his head)'
desc: 'The block at which the import started (will only be reset, after the sync reached this head)'
},
currentBlock: {
type: Quantity,
@ -1012,6 +1012,18 @@ export default {
highestBlock: {
type: Quantity,
desc: 'The estimated highest block'
},
blockGap: {
type: Array,
desc: 'Array of "first", "last", such that [first, last) are all missing from the chain'
},
warpChunksAmount: {
type: Quantity,
desc: 'Total amount of snapshot chunks'
},
warpChunksProcessed: {
type: Quantity,
desc: 'Total amount of snapshot chunks processed'
}
}
}

View File

@ -23,6 +23,7 @@ use util::RotatingLogger;
use ethcore::miner::{Miner, ExternalMiner};
use ethcore::client::Client;
use ethcore::account_provider::AccountProvider;
use ethcore::snapshot::SnapshotService;
use ethsync::{ManageNetwork, SyncProvider};
use ethcore_rpc::{Extendable, NetworkSettings};
pub use ethcore_rpc::SignerService;
@ -97,6 +98,7 @@ impl FromStr for ApiSet {
pub struct Dependencies {
pub signer_service: Arc<SignerService>,
pub client: Arc<Client>,
pub snapshot: Arc<SnapshotService>,
pub sync: Arc<SyncProvider>,
pub net: Arc<ManageNetwork>,
pub secret_store: Arc<AccountProvider>,
@ -161,6 +163,7 @@ pub fn setup_rpc<T: Extendable>(server: T, deps: Arc<Dependencies>, apis: ApiSet
Api::Eth => {
let client = EthClient::new(
&deps.client,
&deps.snapshot,
&deps.sync,
&deps.secret_store,
&deps.miner,

View File

@ -251,6 +251,7 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
signer_service: Arc::new(rpc_apis::SignerService::new(move || {
signer::generate_new_token(signer_path.clone()).map_err(|e| format!("{:?}", e))
}, cmd.signer_port)),
snapshot: snapshot_service.clone(),
client: client.clone(),
sync: sync_provider.clone(),
net: manage_network.clone(),

View File

@ -40,6 +40,7 @@ use ethcore::ethereum::Ethash;
use ethcore::transaction::{Transaction as EthTransaction, SignedTransaction, Action};
use ethcore::log_entry::LogEntry;
use ethcore::filter::Filter as EthcoreFilter;
use ethcore::snapshot::SnapshotService;
use self::ethash::SeedHashCompute;
use v1::traits::Eth;
use v1::types::{
@ -70,13 +71,15 @@ impl Default for EthClientOptions {
}
/// Eth rpc implementation.
pub struct EthClient<C, S: ?Sized, M, EM> where
pub struct EthClient<C, SN: ?Sized, S: ?Sized, M, EM> where
C: MiningBlockChainClient,
SN: SnapshotService,
S: SyncProvider,
M: MinerService,
EM: ExternalMinerService {
client: Weak<C>,
snapshot: Weak<SN>,
sync: Weak<S>,
accounts: Weak<AccountProvider>,
miner: Weak<M>,
@ -85,17 +88,26 @@ pub struct EthClient<C, S: ?Sized, M, EM> where
options: EthClientOptions,
}
impl<C, S: ?Sized, M, EM> EthClient<C, S, M, EM> where
impl<C, SN: ?Sized, S: ?Sized, M, EM> EthClient<C, SN, S, M, EM> where
C: MiningBlockChainClient,
SN: SnapshotService,
S: SyncProvider,
M: MinerService,
EM: ExternalMinerService {
/// Creates new EthClient.
pub fn new(client: &Arc<C>, sync: &Arc<S>, accounts: &Arc<AccountProvider>, miner: &Arc<M>, em: &Arc<EM>, options: EthClientOptions)
-> EthClient<C, S, M, EM> {
pub fn new(
client: &Arc<C>,
snapshot: &Arc<SN>,
sync: &Arc<S>,
accounts: &Arc<AccountProvider>,
miner: &Arc<M>,
em: &Arc<EM>,
options: EthClientOptions
) -> Self {
EthClient {
client: Arc::downgrade(client),
snapshot: Arc::downgrade(snapshot),
sync: Arc::downgrade(sync),
miner: Arc::downgrade(miner),
accounts: Arc::downgrade(accounts),
@ -220,8 +232,9 @@ pub fn pending_logs<M>(miner: &M, best_block: EthBlockNumber, filter: &EthcoreFi
const MAX_QUEUE_SIZE_TO_MINE_ON: usize = 4; // because uncles go back 6.
impl<C, S: ?Sized, M, EM> EthClient<C, S, M, EM> where
impl<C, SN: ?Sized, S: ?Sized, M, EM> EthClient<C, SN, S, M, EM> where
C: MiningBlockChainClient + 'static,
SN: SnapshotService + 'static,
S: SyncProvider + 'static,
M: MinerService + 'static,
EM: ExternalMinerService + 'static {
@ -239,8 +252,9 @@ static SOLC: &'static str = "solc.exe";
#[cfg(not(windows))]
static SOLC: &'static str = "solc";
impl<C, S: ?Sized, M, EM> Eth for EthClient<C, S, M, EM> where
impl<C, SN: ?Sized, S: ?Sized, M, EM> Eth for EthClient<C, SN, S, M, EM> where
C: MiningBlockChainClient + 'static,
SN: SnapshotService + 'static,
S: SyncProvider + 'static,
M: MinerService + 'static,
EM: ExternalMinerService + 'static {
@ -253,16 +267,33 @@ impl<C, S: ?Sized, M, EM> Eth for EthClient<C, S, M, EM> where
}
fn syncing(&self) -> Result<SyncStatus, Error> {
use ethcore::snapshot::RestorationStatus;
try!(self.active());
let status = take_weak!(self.sync).status();
let client = take_weak!(self.client);
if is_major_importing(Some(status.state), client.queue_info()) {
let current_block = U256::from(client.chain_info().best_block_number);
let snapshot_status = take_weak!(self.snapshot).status();
let (warping, warp_chunks_amount, warp_chunks_processed) = match snapshot_status {
RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } =>
(true, Some(block_chunks + state_chunks), Some(block_chunks_done + state_chunks_done)),
_ => (false, None, None),
};
if warping || is_major_importing(Some(status.state), client.queue_info()) {
let chain_info = client.chain_info();
let current_block = U256::from(chain_info.best_block_number);
let highest_block = U256::from(status.highest_block_number.unwrap_or(status.start_block_number));
let gap = chain_info.ancient_block_number.map(|x| U256::from(x + 1))
.and_then(|first| chain_info.first_block_number.map(|last| (first, U256::from(last))));
let info = SyncInfo {
starting_block: status.start_block_number.into(),
current_block: current_block.into(),
highest_block: highest_block.into(),
warp_chunks_amount: warp_chunks_amount.map(|x| U256::from(x as u64)).map(Into::into),
warp_chunks_processed: warp_chunks_processed.map(|x| U256::from(x as u64)).map(Into::into),
block_gap: gap.map(|(x, y)| (x.into(), y.into())),
};
Ok(SyncStatus::Info(info))
} else {

View File

@ -37,7 +37,7 @@ use v1::impls::{EthClient, EthSigningUnsafeClient};
use v1::types::U256 as NU256;
use v1::traits::eth::Eth;
use v1::traits::eth_signing::EthSigning;
use v1::tests::helpers::{TestSyncProvider, Config};
use v1::tests::helpers::{TestSnapshotService, TestSyncProvider, Config};
fn account_provider() -> Arc<AccountProvider> {
Arc::new(AccountProvider::transient_provider())
@ -73,6 +73,10 @@ fn miner_service(spec: &Spec, accounts: Arc<AccountProvider>) -> Arc<Miner> {
)
}
fn snapshot_service() -> Arc<TestSnapshotService> {
Arc::new(TestSnapshotService::new())
}
fn make_spec(chain: &BlockChain) -> Spec {
let genesis = Genesis::from(chain.genesis());
let mut spec = ethereum::new_frontier_test();
@ -86,6 +90,7 @@ fn make_spec(chain: &BlockChain) -> Spec {
struct EthTester {
client: Arc<Client>,
_miner: Arc<MinerService>,
_snapshot: Arc<TestSnapshotService>,
accounts: Arc<AccountProvider>,
handler: IoHandler,
}
@ -112,6 +117,7 @@ impl EthTester {
let dir = RandomTempPath::new();
let account_provider = account_provider();
let miner_service = miner_service(&spec, account_provider.clone());
let snapshot_service = snapshot_service();
let db_config = ::util::kvdb::DatabaseConfig::with_columns(::ethcore::db::NUM_COLUMNS);
let client = Client::new(
@ -127,6 +133,7 @@ impl EthTester {
let eth_client = EthClient::new(
&client,
&snapshot_service,
&sync_provider,
&account_provider,
&miner_service,
@ -145,6 +152,7 @@ impl EthTester {
EthTester {
_miner: miner_service,
_snapshot: snapshot_service,
client: client,
accounts: account_provider,
handler: handler,

View File

@ -19,7 +19,9 @@
mod sync_provider;
mod miner_service;
mod fetch;
mod snapshot_service;
pub use self::sync_provider::{Config, TestSyncProvider};
pub use self::miner_service::TestMinerService;
pub use self::fetch::TestFetch;
pub use self::snapshot_service::TestSnapshotService;

View File

@ -0,0 +1,51 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use ethcore::snapshot::{ManifestData, RestorationStatus, SnapshotService};
use util::{Bytes, Mutex};
use util::hash::H256;
/// Mocked snapshot service (used for sync info extensions).
pub struct TestSnapshotService {
status: Mutex<RestorationStatus>,
}
impl TestSnapshotService {
/// Create a test snapshot service. Only the `status` function matters -- it'll
/// return `Inactive` by default.
pub fn new() -> Self {
TestSnapshotService {
status: Mutex::new(RestorationStatus::Inactive),
}
}
/// Set the restoration status.
pub fn set_status(&self, status: RestorationStatus) {
*self.status.lock() = status;
}
}
impl SnapshotService for TestSnapshotService {
fn manifest(&self) -> Option<ManifestData> { None }
fn chunk(&self, _hash: H256) -> Option<Bytes> { None }
fn status(&self) -> RestorationStatus { self.status.lock().clone() }
fn begin_restore(&self, _manifest: ManifestData) { }
fn abort_restore(&self) { }
fn restore_state_chunk(&self, _hash: H256, _chunk: Bytes) { }
fn restore_block_chunk(&self, _hash: H256, _chunk: Bytes) { }
fn provide_canon_hashes(&self, _hashes: &[(u64, H256)]) { }
}

View File

@ -28,7 +28,7 @@ use ethcore::transaction::{Transaction, Action};
use ethcore::miner::{ExternalMiner, MinerService};
use ethsync::SyncState;
use v1::{Eth, EthClient, EthClientOptions, EthFilter, EthFilterClient, EthSigning, EthSigningUnsafeClient};
use v1::tests::helpers::{TestSyncProvider, Config, TestMinerService};
use v1::tests::helpers::{TestSyncProvider, Config, TestMinerService, TestSnapshotService};
use rustc_serialize::hex::ToHex;
use time::get_time;
@ -52,11 +52,16 @@ fn miner_service() -> Arc<TestMinerService> {
Arc::new(TestMinerService::default())
}
fn snapshot_service() -> Arc<TestSnapshotService> {
Arc::new(TestSnapshotService::new())
}
struct EthTester {
pub client: Arc<TestBlockChainClient>,
pub sync: Arc<TestSyncProvider>,
pub accounts_provider: Arc<AccountProvider>,
pub miner: Arc<TestMinerService>,
pub snapshot: Arc<TestSnapshotService>,
hashrates: Arc<Mutex<HashMap<H256, (Instant, U256)>>>,
pub io: IoHandler,
}
@ -73,9 +78,10 @@ impl EthTester {
let sync = sync_provider();
let ap = accounts_provider();
let miner = miner_service();
let snapshot = snapshot_service();
let hashrates = Arc::new(Mutex::new(HashMap::new()));
let external_miner = Arc::new(ExternalMiner::new(hashrates.clone()));
let eth = EthClient::new(&client, &sync, &ap, &miner, &external_miner, options).to_delegate();
let eth = EthClient::new(&client, &snapshot, &sync, &ap, &miner, &external_miner, options).to_delegate();
let filter = EthFilterClient::new(&client, &miner).to_delegate();
let sign = EthSigningUnsafeClient::new(&client, &ap, &miner).to_delegate();
let io = IoHandler::new();
@ -88,6 +94,7 @@ impl EthTester {
sync: sync,
accounts_provider: ap,
miner: miner,
snapshot: snapshot,
io: io,
hashrates: hashrates,
}
@ -109,6 +116,8 @@ fn rpc_eth_protocol_version() {
#[test]
fn rpc_eth_syncing() {
use ethcore::snapshot::RestorationStatus;
let request = r#"{"jsonrpc": "2.0", "method": "eth_syncing", "params": [], "id": 1}"#;
let tester = EthTester::default();
@ -125,10 +134,27 @@ fn rpc_eth_syncing() {
// "sync" to 1000 blocks.
// causes TestBlockChainClient to return 1000 for its best block number.
tester.add_blocks(1000, EachBlockWith::Nothing);
*tester.client.ancient_block.write() = Some((H256::new(), 5));
*tester.client.first_block.write() = Some((H256::from(U256::from(1234)), 3333));
let true_res = r#"{"jsonrpc":"2.0","result":{"currentBlock":"0x3e8","highestBlock":"0x9c4","startingBlock":"0x0"},"id":1}"#;
let true_res = r#"{"jsonrpc":"2.0","result":{"blockGap":["0x6","0xd05"],"currentBlock":"0x3e8","highestBlock":"0x9c4","startingBlock":"0x0","warpChunksAmount":null,"warpChunksProcessed":null},"id":1}"#;
assert_eq!(tester.io.handle_request_sync(request), Some(true_res.to_owned()));
*tester.client.ancient_block.write() = None;
*tester.client.first_block.write() = None;
let snap_res = r#"{"jsonrpc":"2.0","result":{"blockGap":null,"currentBlock":"0x3e8","highestBlock":"0x9c4","startingBlock":"0x0","warpChunksAmount":"0x32","warpChunksProcessed":"0x18"},"id":1}"#;
tester.snapshot.set_status(RestorationStatus::Ongoing {
state_chunks: 40,
block_chunks: 10,
state_chunks_done: 18,
block_chunks_done: 6,
});
assert_eq!(tester.io.handle_request_sync(request), Some(snap_res.to_owned()));
tester.snapshot.set_status(RestorationStatus::Inactive);
// finish "syncing"
tester.add_blocks(1500, EachBlockWith::Nothing);

View File

@ -30,6 +30,15 @@ pub struct SyncInfo {
/// Highest block seen so far
#[serde(rename="highestBlock")]
pub highest_block: U256,
/// Warp sync snapshot chunks total.
#[serde(rename="warpChunksAmount")]
pub warp_chunks_amount: Option<U256>,
/// Warp sync snpashot chunks processed.
#[serde(rename="warpChunksProcessed")]
pub warp_chunks_processed: Option<U256>,
/// Describes the gap in the blockchain, if there is one: (first, last)
#[serde(rename="blockGap")]
pub block_gap: Option<(U256, U256)>,
}
/// Peers info
@ -53,7 +62,7 @@ pub struct PeerInfo {
/// Node client ID
pub name: String,
/// Capabilities
pub caps: Vec<String>,
pub caps: Vec<String>,
/// Network information
pub network: PeerNetworkInfo,
/// Protocols information
@ -138,7 +147,7 @@ mod tests {
fn test_serialize_sync_info() {
let t = SyncInfo::default();
let serialized = serde_json::to_string(&t).unwrap();
assert_eq!(serialized, r#"{"startingBlock":"0x0","currentBlock":"0x0","highestBlock":"0x0"}"#);
assert_eq!(serialized, r#"{"startingBlock":"0x0","currentBlock":"0x0","highestBlock":"0x0","warpChunksAmount":null,"warpChunksProcessed":null,"blockGap":null}"#);
}
#[test]
@ -156,6 +165,15 @@ mod tests {
let t = SyncStatus::Info(SyncInfo::default());
let serialized = serde_json::to_string(&t).unwrap();
assert_eq!(serialized, r#"{"startingBlock":"0x0","currentBlock":"0x0","highestBlock":"0x0"}"#);
assert_eq!(serialized, r#"{"startingBlock":"0x0","currentBlock":"0x0","highestBlock":"0x0","warpChunksAmount":null,"warpChunksProcessed":null,"blockGap":null}"#);
}
#[test]
fn test_serialize_block_gap() {
let mut t = SyncInfo::default();
t.block_gap = Some((1.into(), 5.into()));
let serialized = serde_json::to_string(&t).unwrap();
assert_eq!(serialized, r#"{"startingBlock":"0x0","currentBlock":"0x0","highestBlock":"0x0","warpChunksAmount":null,"warpChunksProcessed":null,"blockGap":["0x1","0x5"]}"#)
}
}