From 3357cfb3e53d1fc52a8e5c16624b26a39029b5ca Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Tue, 11 Feb 2020 22:02:25 +0100 Subject: [PATCH] Switch from the internal runtime lib to external one from crates.io (#11480) * Parity runtime switched to the version from crates * Tests fixed --- Cargo.lock | 4 +- Cargo.toml | 2 +- ethcore/Cargo.toml | 2 +- ethcore/sync/Cargo.toml | 2 +- miner/Cargo.toml | 2 +- miner/price-info/Cargo.toml | 3 +- rpc/Cargo.toml | 2 +- secret-store/Cargo.toml | 3 +- updater/hash-fetch/Cargo.toml | 2 +- util/runtime/Cargo.toml | 11 -- util/runtime/src/lib.rs | 274 ---------------------------------- 11 files changed, 13 insertions(+), 294 deletions(-) delete mode 100644 util/runtime/Cargo.toml delete mode 100644 util/runtime/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 04201f0d1..004562f93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3513,7 +3513,9 @@ dependencies = [ [[package]] name = "parity-runtime" -version = "0.1.0" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "710e8d8e9769827952aa83a44d33bc993658cccd97e15e3b5eb070d1a70d1a3a" dependencies = [ "futures", "tokio", diff --git a/Cargo.toml b/Cargo.toml index be7f0b924..7bc7d6d68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,7 +57,7 @@ parity-ipfs-api = { path = "ipfs" } parity-local-store = { path = "miner/local-store" } parity-path = "0.1" parity-rpc = { path = "rpc" } -parity-runtime = { path = "util/runtime" } +parity-runtime = "0.1.1" parity-updater = { path = "updater" } parity-util-mem = { version = "0.5.1", features = ["jemalloc-global"] } parity-version = { path = "util/version" } diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml index c640a0191..747d822ed 100644 --- a/ethcore/Cargo.toml +++ b/ethcore/Cargo.toml @@ -83,7 +83,7 @@ kvdb-rocksdb = "0.5.0" lazy_static = "1.3" machine = { path = "./machine", features = ["test-helpers"] } macros = { path = "../util/macros" } -parity-runtime = { path = "../util/runtime" } +parity-runtime = "0.1.1" serde_json = "1.0" stats = { path = "../util/stats" } pod = { path = "pod" } diff --git a/ethcore/sync/Cargo.toml b/ethcore/sync/Cargo.toml index ef26b8bb6..69e661ab5 100644 --- a/ethcore/sync/Cargo.toml +++ b/ethcore/sync/Cargo.toml @@ -25,7 +25,7 @@ light = { package = "ethcore-light", path = "../light" } log = "0.4" macros = { path = "../../util/macros" } network = { package = "ethcore-network", path = "../../util/network" } -parity-runtime = { path = "../../util/runtime" } +parity-runtime = "0.1.1" parity-crypto = { version = "0.5.0", features = ["publickey"] } parity-util-mem = "0.5.1" rand = "0.7" diff --git a/miner/Cargo.toml b/miner/Cargo.toml index e551e9e40..8e24869b1 100644 --- a/miner/Cargo.toml +++ b/miner/Cargo.toml @@ -26,7 +26,7 @@ parity-util-mem = "0.5.1" keccak-hash = "0.4.0" linked-hash-map = "0.5" log = "0.4" -parity-runtime = { path = "../util/runtime" } +parity-runtime = "0.1.1" parking_lot = "0.10.0" price-info = { path = "./price-info", optional = true } registrar = { path = "../util/registrar" } diff --git a/miner/price-info/Cargo.toml b/miner/price-info/Cargo.toml index 23f69e37c..fa3cd8aee 100644 --- a/miner/price-info/Cargo.toml +++ b/miner/price-info/Cargo.toml @@ -11,8 +11,9 @@ edition = "2018" fetch = { path = "../../util/fetch" } futures = "0.1" log = "0.4" -parity-runtime = { path = "../../util/runtime" } +parity-runtime = "0.1.1" serde_json = "1.0" [dev-dependencies] fake-fetch = { path = "../../util/fake-fetch" } +parity-runtime = { version = "0.1.1", features = ["test-helpers"] } diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index aebc982ff..76037c1e0 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -59,7 +59,7 @@ ethkey = { path = "../accounts/ethkey" } ethstore = { path = "../accounts/ethstore" } fetch = { path = "../util/fetch" } keccak-hash = "0.4.0" -parity-runtime = { path = "../util/runtime" } +parity-runtime = { version = "0.1.1", features = ["test-helpers"] } parity-updater = { path = "../updater" } parity-version = { path = "../util/version" } rlp = "0.4.0" diff --git a/secret-store/Cargo.toml b/secret-store/Cargo.toml index adb6d5d72..42322c804 100644 --- a/secret-store/Cargo.toml +++ b/secret-store/Cargo.toml @@ -22,7 +22,7 @@ libsecp256k1 = { version = "0.3.5", default-features = false } log = "0.4" parity-bytes = "0.1" parity-crypto = { version = "0.5.0", features = ["publickey"] } -parity-runtime = { path = "../util/runtime" } +parity-runtime = "0.1.1" parking_lot = "0.10.0" percent-encoding = "2.1.0" rustc-hex = "1.0" @@ -40,3 +40,4 @@ jsonrpc-server-utils = "14.0.3" env_logger = "0.5" tempdir = "0.3" kvdb-rocksdb = "0.5.0" +parity-runtime = { version = "0.1.1", features = ["test-helpers"] } diff --git a/updater/hash-fetch/Cargo.toml b/updater/hash-fetch/Cargo.toml index 0f39d61c9..3fadd61a1 100644 --- a/updater/hash-fetch/Cargo.toml +++ b/updater/hash-fetch/Cargo.toml @@ -17,7 +17,7 @@ rustc-hex = "1.0" fetch = { path = "../../util/fetch" } parity-bytes = "0.1" ethereum-types = "0.8.0" -parity-runtime = { path = "../../util/runtime" } +parity-runtime = "0.1.1" keccak-hash = "0.4.0" registrar = { path = "../../util/registrar" } types = { path = "../../ethcore/types", package = "common-types" } diff --git a/util/runtime/Cargo.toml b/util/runtime/Cargo.toml deleted file mode 100644 index d22106c49..000000000 --- a/util/runtime/Cargo.toml +++ /dev/null @@ -1,11 +0,0 @@ -[package] -description = "Parity Runtime" -homepage = "http://parity.io" -license = "GPL-3.0" -name = "parity-runtime" -version = "0.1.0" -authors = ["Parity Technologies "] - -[dependencies] -futures = "0.1" -tokio = "0.1.22" diff --git a/util/runtime/src/lib.rs b/util/runtime/src/lib.rs deleted file mode 100644 index 568b2771f..000000000 --- a/util/runtime/src/lib.rs +++ /dev/null @@ -1,274 +0,0 @@ -// Copyright 2015-2020 Parity Technologies (UK) Ltd. -// This file is part of Parity Ethereum. - -// Parity Ethereum is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Ethereum is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Ethereum. If not, see . - -//! Tokio Runtime wrapper. - -pub extern crate futures; -pub extern crate tokio; - -use std::{fmt, thread}; -use std::sync::mpsc; -use std::time::{Duration, Instant}; -use futures::{future, Future, IntoFuture}; -pub use tokio::timer::Delay; -pub use tokio::runtime::{Runtime as TokioRuntime, Builder as TokioRuntimeBuilder, TaskExecutor}; - -/// Runtime for futures. -/// -/// Runs in a separate thread. -pub struct Runtime { - executor: Executor, - handle: RuntimeHandle, -} - -impl Runtime { - fn new(runtime_bldr: &mut TokioRuntimeBuilder) -> Self { - let mut runtime = runtime_bldr - .build() - .expect("Building a Tokio runtime will only fail when mio components \ - cannot be initialized (catastrophic)"); - let (stop, stopped) = futures::oneshot(); - let (tx, rx) = mpsc::channel(); - let handle = thread::spawn(move || { - tx.send(runtime.executor()).expect("Rx is blocking upper thread."); - runtime.block_on(futures::empty().select(stopped).map(|_| ()).map_err(|_| ())) - .expect("Tokio runtime should not have unhandled errors."); - }); - let executor = rx.recv().expect("tx is transfered to a newly spawned thread."); - - Runtime { - executor: Executor { - inner: Mode::Tokio(executor), - }, - handle: RuntimeHandle { - close: Some(stop), - handle: Some(handle), - }, - } - } - - /// Spawns a new tokio runtime with a default thread count on a background - /// thread and returns a `Runtime` which can be used to spawn tasks via - /// its executor. - pub fn with_default_thread_count() -> Self { - let mut runtime_bldr = TokioRuntimeBuilder::new(); - Self::new(&mut runtime_bldr) - } - - /// Spawns a new tokio runtime with a the specified thread count on a - /// background thread and returns a `Runtime` which can be used to spawn - /// tasks via its executor. - pub fn with_thread_count(thread_count: usize) -> Self { - let mut runtime_bldr = TokioRuntimeBuilder::new(); - runtime_bldr.core_threads(thread_count); - - Self::new(&mut runtime_bldr) - } - - /// Returns this runtime raw executor. - /// - /// Deprecated: Exists only to connect with current JSONRPC implementation. - pub fn raw_executor(&self) -> TaskExecutor { - if let Mode::Tokio(ref executor) = self.executor.inner { - executor.clone() - } else { - panic!("Runtime is not initialized in Tokio mode.") - } - } - - /// Returns runtime executor. - pub fn executor(&self) -> Executor { - self.executor.clone() - } -} - -#[derive(Clone)] -enum Mode { - Tokio(TaskExecutor), - Sync, - ThreadPerFuture, -} - -impl fmt::Debug for Mode { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - use self::Mode::*; - - match *self { - Tokio(_) => write!(fmt, "tokio"), - Sync => write!(fmt, "synchronous"), - ThreadPerFuture => write!(fmt, "thread per future"), - } - } -} - -/// Returns a future which runs `f` until `duration` has elapsed, at which -/// time `on_timeout` is run and the future resolves. -fn timeout(f: F, duration: Duration, on_timeout: T) - -> impl Future + Send + 'static -where - T: FnOnce() -> () + Send + 'static, - F: FnOnce() -> R + Send + 'static, - R: IntoFuture + Send + 'static, - R::Future: Send + 'static, -{ - let future = future::lazy(f); - let timeout = Delay::new(Instant::now() + duration) - .then(move |_| { - on_timeout(); - Ok(()) - }); - future.select(timeout).then(|_| Ok(())) -} - -#[derive(Debug, Clone)] -pub struct Executor { - inner: Mode, -} - -impl Executor { - /// Executor for existing runtime. - /// - /// Deprecated: Exists only to connect with current JSONRPC implementation. - pub fn new(executor: TaskExecutor) -> Self { - Executor { - inner: Mode::Tokio(executor), - } - } - - /// Synchronous executor, used mostly for tests. - pub fn new_sync() -> Self { - Executor { - inner: Mode::Sync, - } - } - - /// Spawns a new thread for each future (use only for tests). - pub fn new_thread_per_future() -> Self { - Executor { - inner: Mode::ThreadPerFuture, - } - } - - /// Spawn a future to this runtime - pub fn spawn(&self, r: R) where - R: IntoFuture + Send + 'static, - R::Future: Send + 'static, - { - match self.inner { - Mode::Tokio(ref executor) => executor.spawn(r.into_future()), - Mode::Sync => { - let _= r.into_future().wait(); - }, - Mode::ThreadPerFuture => { - thread::spawn(move || { - let _= r.into_future().wait(); - }); - }, - } - } - - /// Spawn a new future returned by given closure. - pub fn spawn_fn(&self, f: F) where - F: FnOnce() -> R + Send + 'static, - R: IntoFuture + Send + 'static, - R::Future: Send + 'static, - { - match self.inner { - Mode::Tokio(ref executor) => executor.spawn(future::lazy(f)), - Mode::Sync => { - let _ = future::lazy(f).wait(); - }, - Mode::ThreadPerFuture => { - thread::spawn(move || { - let _= f().into_future().wait(); - }); - }, - } - } - - /// Spawn a new future and wait for it or for a timeout to occur. - pub fn spawn_with_timeout(&self, f: F, duration: Duration, on_timeout: T) where - T: FnOnce() -> () + Send + 'static, - F: FnOnce() -> R + Send + 'static, - R: IntoFuture + Send + 'static, - R::Future: Send + 'static, - { - match self.inner { - Mode::Tokio(ref executor) => { - executor.spawn(timeout(f, duration, on_timeout)) - }, - Mode::Sync => { - let _ = timeout(f, duration, on_timeout).wait(); - }, - Mode::ThreadPerFuture => { - thread::spawn(move || { - let _ = timeout(f, duration, on_timeout).wait(); - }); - }, - } - } -} - -impl + Send + 'static> future::Executor for Executor { - fn execute(&self, future: F) -> Result<(), future::ExecuteError> { - match self.inner { - Mode::Tokio(ref executor) => executor.execute(future), - Mode::Sync => { - let _= future.wait(); - Ok(()) - }, - Mode::ThreadPerFuture => { - thread::spawn(move || { - let _= future.wait(); - }); - Ok(()) - }, - } - } -} - -/// A handle to a runtime. Dropping the handle will cause runtime to shutdown. -pub struct RuntimeHandle { - close: Option>, - handle: Option> -} - -impl From for RuntimeHandle { - fn from(el: Runtime) -> Self { - el.handle - } -} - -impl Drop for RuntimeHandle { - fn drop(&mut self) { - self.close.take().map(|v| v.send(())); - } -} - -impl RuntimeHandle { - /// Blocks current thread and waits until the runtime is finished. - pub fn wait(mut self) -> thread::Result<()> { - self.handle.take() - .expect("Handle is taken only in `wait`, `wait` is consuming; qed").join() - } - - /// Finishes this runtime. - pub fn close(mut self) { - let _ = self.close.take() - .expect("Close is taken only in `close` and `drop`. `close` is consuming; qed") - .send(()); - } -}