From b4f8bba8430b0f9e050cdac425b632b30f0f331c Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 2 Jan 2019 16:49:01 +0100 Subject: [PATCH] parity-clib: `async C bindings to RPC requests` + `subscribe/unsubscribe to websocket events` (#9920) * feat(parity-clib asynchronous rpc queries) * feat(seperate bindings for ws and rpc) * Subscribing to websockets for the full-client works * feat(c binding unsubscribe_from_websocket) * fix(tests): tweak CMake build config * Enforce C+11 * refactor(parity-cpp-example) : `cpp:ify` * fix(typedefs) : revert typedefs parity-clib * docs(nits) * fix(simplify websocket_unsubscribe) * refactor(cpp example) : more subscriptions * fix(callback type) : address grumbles on callback * Use it the example to avoid using global variables * docs(nits) - don't mention `arc` * fix(jni bindings): fix compile errors * feat(java example and updated java bindings) * fix(java example) : run both full and light client * fix(Java shutdown) : unsubscribe to sessions Forgot to pass the JNIEnv environment since it is an instance method * feat(return valid JString) * Remove Java dependency by constructing a valid Java String in the callback * fix(logger) : remove `rpc` trace log * fix(format) * fix(parity-clib): remove needless callback `type` * fix(parity-clib-examples) : update examples * `cpp` example pass in a struct instead to determines `callback kind` * `java` add a instance variable the class `Callback` to determine `callback kind` * fix(review comments): docs and format * Update parity-clib/src/java.rs Co-Authored-By: niklasad1 * fix(bad merge + spelling) * fix(move examples to parity-clib/examples) --- Cargo.lock | 3 + parity-clib/Cargo.toml | 5 +- parity-clib/Parity.java | 93 +++++--- parity-clib/examples/cpp/CMakeLists.txt | 5 +- parity-clib/examples/cpp/main.cpp | 187 +++++++++++++--- parity-clib/examples/java/Main.java | 109 +++++++++ parity-clib/examples/java/README.md | 9 + parity-clib/examples/java/run.sh | 15 ++ parity-clib/parity.h | 47 +++- parity-clib/src/java.rs | 211 ++++++++++++++++++ parity-clib/src/lib.rs | 284 +++++++++++++----------- parity/lib.rs | 1 + parity/run.rs | 21 +- rpc/src/lib.rs | 1 + 14 files changed, 763 insertions(+), 228 deletions(-) create mode 100644 parity-clib/examples/java/Main.java create mode 100644 parity-clib/examples/java/README.md create mode 100755 parity-clib/examples/java/run.sh create mode 100644 parity-clib/src/java.rs diff --git a/Cargo.lock b/Cargo.lock index 47763e751..0f5c445e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2328,9 +2328,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" name = "parity-clib" version = "1.12.0" dependencies = [ + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "jni 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)", "panic_hook 0.1.0", "parity-ethereum 2.3.0", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-current-thread 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/parity-clib/Cargo.toml b/parity-clib/Cargo.toml index 5397225ea..b3635c6e0 100644 --- a/parity-clib/Cargo.toml +++ b/parity-clib/Cargo.toml @@ -10,9 +10,12 @@ name = "parity" crate-type = ["cdylib", "staticlib"] [dependencies] +futures = "0.1.6" +jni = { version = "0.10.1", optional = true } panic_hook = { path = "../util/panic-hook" } parity-ethereum = { path = "../", default-features = false } -jni = { version = "0.10.1", optional = true } +tokio = "0.1.11" +tokio-current-thread = "0.1.3" [features] default = [] diff --git a/parity-clib/Parity.java b/parity-clib/Parity.java index 7e70917ae..37a6722b7 100644 --- a/parity-clib/Parity.java +++ b/parity-clib/Parity.java @@ -20,44 +20,67 @@ package io.parity.ethereum; * Interface to the Parity client. */ public class Parity { - /** - * Starts the Parity client with the CLI options passed as an array of strings. - * - * Each space-delimited option corresponds to an array entry. - * For example: `["--port", "12345"]` - * - * @param options The CLI options to start Parity with - */ - public Parity(String[] options) { - long config = configFromCli(options); - inner = build(config); - } + /** + * Starts the Parity client with the CLI options passed as an array of strings. + * + * Each space-delimited option corresponds to an array entry. + * For example: `["--port", "12345"]` + * + * @param options The CLI options to start Parity with + */ + public Parity(String[] options) { + long config = configFromCli(options); + inner = build(config); + } - /** Performs a synchronous RPC query. - * - * Note that this will block the current thread until the query is finished. You are - * encouraged to create a background thread if you don't want to block. - * - * @param query The JSON-encoded RPC query to perform - * @return A JSON-encoded result - */ - public String rpcQuery(String query) { - return rpcQueryNative(inner, query); - } + /** Performs an asynchronous RPC query by spawning a background thread that is executed until + * either a response is received or the timeout has been expired. + * + * @param query The JSON-encoded RPC query to perform + * @param timeoutMillis The maximum time in milliseconds that the query will run + * @param callback An instance of class which must have a instance method named `callback` that will be + * invoke when the result is ready + */ + public void rpcQuery(String query, long timeoutMillis, Object callback) { + rpcQueryNative(inner, query, timeoutMillis, callback); + } - @Override - protected void finalize​() { - destroy(inner); - } + /** Subscribes to a specific WebSocket event that will run in a background thread until it is canceled. + * + * @param query The JSON-encoded RPC query to perform + * @param callback An instance of class which must have a instance method named `callback` that will be invoked + * when the result is ready + * + * @return A pointer to the current sessions which can be used to terminate the session later + */ + public long subscribeWebSocket(String query, Object callback) { + return subscribeWebSocketNative(inner, query, callback); + } - static { - System.loadLibrary("parity"); - } + /** Unsubscribes to a specific WebSocket event + * + * @param session Pointer the the session to terminate + */ + public void unsubscribeWebSocket(long session) { + unsubscribeWebSocketNative(session); + } - private static native long configFromCli(String[] cliOptions); - private static native long build(long config); - private static native void destroy(long inner); - private static native String rpcQueryNative(long inner, String rpc); + // FIXME: `finalize` is deprecated - https://github.com/paritytech/parity-ethereum/issues/10066 + @Override + protected void finalize​() { + destroy(inner); + } - private long inner; + static { + System.loadLibrary("parity"); + } + + private static native long configFromCli(String[] cliOptions); + private static native long build(long config); + private static native void destroy(long inner); + private static native void rpcQueryNative(long inner, String rpc, long timeoutMillis, Object callback); + private static native long subscribeWebSocketNative(long inner, String rpc, Object callback); + private static native void unsubscribeWebSocketNative(long session); + + private long inner; } diff --git a/parity-clib/examples/cpp/CMakeLists.txt b/parity-clib/examples/cpp/CMakeLists.txt index 69b58c211..d3aaf457b 100644 --- a/parity-clib/examples/cpp/CMakeLists.txt +++ b/parity-clib/examples/cpp/CMakeLists.txt @@ -1,8 +1,7 @@ cmake_minimum_required(VERSION 3.5) include(ExternalProject) - -include_directories("${CMAKE_SOURCE_DIR}/../../../parity-clib") - +include_directories("${CMAKE_SOURCE_DIR}/../..") +set (CMAKE_CXX_STANDARD 11) # Enfore C++11 add_executable(parity-example main.cpp) ExternalProject_Add( diff --git a/parity-clib/examples/cpp/main.cpp b/parity-clib/examples/cpp/main.cpp index c5e83d064..aab05c906 100644 --- a/parity-clib/examples/cpp/main.cpp +++ b/parity-clib/examples/cpp/main.cpp @@ -14,44 +14,169 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -#include -#include -#include -#include -#include +#include #include +#include +#include +#include -void on_restart(void*, const char*, size_t) {} +void* parity_run(std::vector); +int parity_subscribe_to_websocket(void*); +int parity_rpc_queries(void*); + +const int SUBSCRIPTION_ID_LEN = 18; +const size_t TIMEOUT_ONE_MIN_AS_MILLIS = 60 * 1000; +const unsigned int CALLBACK_RPC = 1; +const unsigned int CALLBACK_WS = 2; + +struct Callback { + unsigned int type; + long unsigned int counter; +}; + +// list of rpc queries +const std::vector rpc_queries { + "{\"method\":\"parity_versionInfo\",\"params\":[],\"id\":1,\"jsonrpc\":\"2.0\"}", + "{\"method\":\"eth_getTransactionReceipt\",\"params\":[\"0x444172bef57ad978655171a8af2cfd89baa02a97fcb773067aef7794d6913fff\"],\"id\":1,\"jsonrpc\":\"2.0\"}", + "{\"method\":\"eth_estimateGas\",\"params\":[{\"from\":\"0x0066Dc48bb833d2B59f730F33952B3c29fE926F5\"}],\"id\":1,\"jsonrpc\":\"2.0\"}", + "{\"method\":\"eth_getBalance\",\"params\":[\"0x0066Dc48bb833d2B59f730F33952B3c29fE926F5\"],\"id\":1,\"jsonrpc\":\"2.0\"}" +}; + +// list of subscriptions +const std::vector ws_subscriptions { + "{\"method\":\"parity_subscribe\",\"params\":[\"eth_getBalance\",[\"0xcd2a3d9f938e13cd947ec05abc7fe734df8dd826\",\"latest\"]],\"id\":1,\"jsonrpc\":\"2.0\"}", + "{\"method\":\"parity_subscribe\",\"params\":[\"parity_netPeers\"],\"id\":1,\"jsonrpc\":\"2.0\"}", + "{\"method\":\"eth_subscribe\",\"params\":[\"newHeads\"],\"id\":1,\"jsonrpc\":\"2.0\"}" +}; + +// callback that gets invoked upon an event +void callback(void* user_data, const char* response, size_t _len) { + Callback* cb = static_cast(user_data); + if (cb->type == CALLBACK_RPC) { + printf("rpc response: %s\r\n", response); + cb->counter -= 1; + } else if (cb->type == CALLBACK_WS) { + printf("websocket response: %s\r\n", response); + std::regex is_subscription ("\\{\"jsonrpc\":\"2.0\",\"result\":\"0[xX][a-fA-F0-9]{16}\",\"id\":1\\}"); + if (std::regex_match(response, is_subscription) == true) { + cb->counter -= 1; + } + } +} int main() { - ParityParams cfg = { 0 }; - cfg.on_client_restart_cb = on_restart; + // run full-client + { + std::vector config = {"--no-ipc" , "--jsonrpc-apis=all", "--chain", "kovan"}; + void* parity = parity_run(config); + if (parity_rpc_queries(parity)) { + printf("rpc_queries failed\r\n"); + return 1; + } - const char* args[] = {"--no-ipc"}; - size_t str_lens[] = {8}; - if (parity_config_from_cli(args, str_lens, 1, &cfg.configuration) != 0) { - return 1; - } + if (parity_subscribe_to_websocket(parity)) { + printf("ws_queries failed\r\n"); + return 1; + } - void* parity; - if (parity_start(&cfg, &parity) != 0) { - return 1; - } + if (parity != nullptr) { + parity_destroy(parity); + } + } - const char* rpc = "{\"method\":\"parity_versionInfo\",\"params\":[],\"id\":1,\"jsonrpc\":\"2.0\"}"; - size_t out_len = 256; - char* out = (char*)malloc(out_len + 1); - if (parity_rpc(parity, rpc, strlen(rpc), out, &out_len)) { - return 1; - } - out[out_len] = '\0'; - printf("RPC output: %s", out); - free(out); + // run light-client + { + std::vector light_config = {"--no-ipc", "--light", "--jsonrpc-apis=all", "--chain", "kovan"}; + void* parity = parity_run(light_config); - sleep(5); - if (parity != NULL) { - parity_destroy(parity); - } + if (parity_rpc_queries(parity)) { + printf("rpc_queries failed\r\n"); + return 1; + } - return 0; + if (parity_subscribe_to_websocket(parity)) { + printf("ws_queries failed\r\n"); + return 1; + } + + if (parity != nullptr) { + parity_destroy(parity); + } + } + return 0; +} + +int parity_rpc_queries(void* parity) { + if (!parity) { + return 1; + } + + Callback cb { .type = CALLBACK_RPC, .counter = rpc_queries.size() }; + + for (auto query : rpc_queries) { + if (parity_rpc(parity, query.c_str(), query.length(), TIMEOUT_ONE_MIN_AS_MILLIS, callback, &cb) != 0) { + return 1; + } + } + + while(cb.counter != 0); + return 0; +} + + +int parity_subscribe_to_websocket(void* parity) { + if (!parity) { + return 1; + } + + std::vector sessions; + + Callback cb { .type = CALLBACK_WS, .counter = ws_subscriptions.size() }; + + for (auto sub : ws_subscriptions) { + void *const session = parity_subscribe_ws(parity, sub.c_str(), sub.length(), callback, &cb); + if (!session) { + return 1; + } + sessions.push_back(session); + } + + while(cb.counter != 0); + std::this_thread::sleep_for(std::chrono::seconds(60)); + for (auto session : sessions) { + parity_unsubscribe_ws(session); + } + return 0; +} + +void* parity_run(std::vector args) { + ParityParams cfg = { + .configuration = nullptr, + .on_client_restart_cb = callback, + .on_client_restart_cb_custom = nullptr + }; + + std::vector str_lens; + + for (auto arg: args) { + str_lens.push_back(std::strlen(arg)); + } + + // make sure no out-of-range access happens here + if (args.empty()) { + if (parity_config_from_cli(nullptr, nullptr, 0, &cfg.configuration) != 0) { + return nullptr; + } + } else { + if (parity_config_from_cli(&args[0], &str_lens[0], args.size(), &cfg.configuration) != 0) { + return nullptr; + } + } + + void *parity = nullptr; + if (parity_start(&cfg, &parity) != 0) { + return nullptr; + } + + return parity; } diff --git a/parity-clib/examples/java/Main.java b/parity-clib/examples/java/Main.java new file mode 100644 index 000000000..88189af1c --- /dev/null +++ b/parity-clib/examples/java/Main.java @@ -0,0 +1,109 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +import java.util.Vector; +import java.util.concurrent.atomic.AtomicInteger; +import io.parity.ethereum.Parity; + +class Main { + public static final int ONE_MINUTE_AS_MILLIS = 60 * 1000; + + public static final String[] rpc_queries = { + "{\"method\":\"parity_versionInfo\",\"params\":[],\"id\":1,\"jsonrpc\":\"2.0\"}", + "{\"method\":\"eth_getTransactionReceipt\",\"params\":[\"0x444172bef57ad978655171a8af2cfd89baa02a97fcb773067aef7794d6913fff\"],\"id\":1,\"jsonrpc\":\"2.0\"}", + "{\"method\":\"eth_estimateGas\",\"params\":[{\"from\":\"0x0066Dc48bb833d2B59f730F33952B3c29fE926F5\"}],\"id\":1,\"jsonrpc\":\"2.0\"}", + "{\"method\":\"eth_getBalance\",\"params\":[\"0x0066Dc48bb833d2B59f730F33952B3c29fE926F5\"],\"id\":1,\"jsonrpc\":\"2.0\"}" + }; + + public static final String[] ws_queries = { + "{\"method\":\"parity_subscribe\",\"params\":[\"eth_getBalance\",[\"0xcd2a3d9f938e13cd947ec05abc7fe734df8dd826\",\"latest\"]],\"id\":1,\"jsonrpc\":\"2.0\"}", + "{\"method\":\"parity_subscribe\",\"params\":[\"parity_netPeers\"],\"id\":1,\"jsonrpc\":\"2.0\"}", + "{\"method\":\"eth_subscribe\",\"params\":[\"newHeads\"],\"id\":1,\"jsonrpc\":\"2.0\"}" + }; + + public static void runParity(String[] config) { + Parity parity = new Parity(config); + + Callback rpcCallback = new Callback(1); + Callback webSocketCallback = new Callback(2); + + for (String query : rpc_queries) { + parity.rpcQuery(query, ONE_MINUTE_AS_MILLIS, rpcCallback); + } + + while (rpcCallback.getNumCallbacks() != 4); + + Vector sessions = new Vector(); + + for (String ws : ws_queries) { + long session = parity.subscribeWebSocket(ws, webSocketCallback); + sessions.add(session); + } + + try { + Thread.sleep(ONE_MINUTE_AS_MILLIS); + } catch (Exception e) { + System.out.println(e); + } + + for (long session : sessions) { + parity.unsubscribeWebSocket(session); + } + + // Force GC to destroy parity + parity = null; + System.gc(); + } + + public static void main(String[] args) { + String[] full = {"--no-ipc" , "--jsonrpc-apis=all", "--chain", "kovan"}; + String[] light = {"--no-ipc", "--light", "--jsonrpc-apis=all", "--chain", "kovan"}; + + runParity(full); + + try { + Thread.sleep(ONE_MINUTE_AS_MILLIS); + } catch (Exception e) { + System.out.println(e); + } + + runParity(light); + } +} + +class Callback { + private AtomicInteger counter; + private final int callbackType; + + public Callback(int type) { + counter = new AtomicInteger(); + callbackType = type; + } + + public void callback(Object response) { + response = (String) response; + if (callbackType == 1) { + System.out.println("rpc: " + response); + } else if (callbackType == 2) { + System.out.println("ws: " + response); + } + counter.getAndIncrement(); + } + + public int getNumCallbacks() { + return counter.intValue(); + } +} diff --git a/parity-clib/examples/java/README.md b/parity-clib/examples/java/README.md new file mode 100644 index 000000000..ec83905bf --- /dev/null +++ b/parity-clib/examples/java/README.md @@ -0,0 +1,9 @@ +parity-clib: Java example +=================================== + +An example Java application to demonstrate how to use `jni` bindings to parity-ethereum. Note, that the example is built in debug-mode to reduce the build time. If you want to use it in real project use release-mode instead to facilitate all compiler optimizations. + +## How to compile and run + +1. Make sure you have installed [JDK](https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) +2. Run `run.sh` \ No newline at end of file diff --git a/parity-clib/examples/java/run.sh b/parity-clib/examples/java/run.sh new file mode 100755 index 000000000..428a7dc75 --- /dev/null +++ b/parity-clib/examples/java/run.sh @@ -0,0 +1,15 @@ +#!/usr/bin/env bash + +FLAGS="-Xlint:deprecation" +PARITY_JAVA="../../Parity.java" +# parity-clib must be built with feature `jni` in debug-mode to work +PARITY_LIB=".:../../../target/debug/" + +# build +cd .. +cargo build --features jni +cd - +javac $FLAGS -d $PWD $PARITY_JAVA +javac $FLAGS *.java +# Setup the path `libparity.so` and run +java -Djava.library.path=$PARITY_LIB Main diff --git a/parity-clib/parity.h b/parity-clib/parity.h index 9be077b4d..71d6ca775 100644 --- a/parity-clib/parity.h +++ b/parity-clib/parity.h @@ -57,7 +57,7 @@ extern "C" { /// const char *args[] = {"--light", "--can-restart"}; /// size_t str_lens[] = {7, 13}; /// if (parity_config_from_cli(args, str_lens, 2, &cfg) != 0) { -/// return 1; +/// return 1; /// } /// ``` /// @@ -86,21 +86,44 @@ int parity_start(const ParityParams* params, void** out); /// must not call this function. void parity_destroy(void* parity); -/// Performs an RPC request. +/// Performs an asynchronous RPC request running in a background thread for at most X milliseconds /// -/// Blocks the current thread until the request is finished. You are therefore encouraged to spawn -/// a new thread for each RPC request that requires accessing the blockchain. +/// - parity : Reference to the running parity client +/// - rpc_query : JSON encoded string representing the RPC request. +/// - len : Length of the RPC query +/// - timeout_ms : Maximum time that request is waiting for a response +/// - response : Callback to invoke when the query gets answered. It will respond with a JSON encoded the string +/// with the result both on success and error. +/// - ud : Specific user defined data that can used in the callback /// -/// - `rpc` and `len` must contain the JSON string representing the RPC request. -/// - `out_str` and `out_len` point to a buffer where the output JSON result will be stored. If the -/// buffer is not large enough, the function fails. -/// - `out_len` will receive the final length of the string. -/// - On success, the function returns 0. On failure, it returns 1. +/// - On success : The function returns 0 +/// - On error : The function returns 1 /// -/// **Important**: Keep in mind that this function doesn't write any null terminator on the output -/// string. +int parity_rpc(const void *const parity, const char* rpc_query, size_t rpc_len, size_t timeout_ms, + void (*subscribe)(void* ud, const char* response, size_t len), void* ud); + + +/// Subscribes to a specific websocket event that will run until it is canceled /// -int parity_rpc(void* parity, const char* rpc, size_t len, char* out_str, size_t* out_len); +/// - parity : Reference to the running parity client +/// - ws_query : JSON encoded string representing the websocket event to subscribe to +/// - len : Length of the query +/// - response : Callback to invoke when a websocket event occurs +/// - ud : Specific user defined data that can used in the callback +/// +/// - On success : The function returns an object to the current session +/// which can be used cancel the subscription +/// - On error : The function returns a null pointer +/// +void* parity_subscribe_ws(const void *const parity, const char* ws_query, size_t len, + void (*subscribe)(void* ud, const char* response, size_t len), void* ud); + +/// Unsubscribes from a websocket subscription. Caution this function consumes the session object and must only be +/// used exactly once per session. +/// +/// - session : Pointer to the session to unsubscribe from +/// +int parity_unsubscribe_ws(const void *const session); /// Sets a callback to call when a panic happens in the Rust code. /// diff --git a/parity-clib/src/java.rs b/parity-clib/src/java.rs new file mode 100644 index 000000000..30e63e601 --- /dev/null +++ b/parity-clib/src/java.rs @@ -0,0 +1,211 @@ +use std::{mem, ptr}; +use std::sync::Arc; +use std::time::Duration; +use std::thread; +use std::os::raw::c_void; + +use {parity_config_from_cli, parity_destroy, parity_start, parity_unsubscribe_ws, ParityParams, error}; + +use futures::{Future, Stream}; +use futures::sync::mpsc; +use jni::{JavaVM, JNIEnv}; +use jni::objects::{JClass, JString, JObject, JValue, GlobalRef}; +use jni::sys::{jlong, jobjectArray, va_list}; +use tokio_current_thread::CurrentThread; +use parity_ethereum::{RunningClient, PubSubSession}; + +type CheckedQuery<'a> = (&'a RunningClient, String, JavaVM, GlobalRef); + +// Creates a Java callback to a static method named `void callback(Object)` +struct Callback<'a> { + jvm: JavaVM, + callback: GlobalRef, + method_name: &'a str, + method_descriptor: &'a str, +} + +unsafe impl<'a> Send for Callback<'a> {} +unsafe impl<'a> Sync for Callback<'a> {} +impl<'a> Callback<'a> { + fn new(jvm: JavaVM, callback: GlobalRef) -> Self { + Self { + jvm, + callback, + method_name: "callback", + method_descriptor: "(Ljava/lang/Object;)V", + } + } + + fn call(&self, msg: &str) { + let env = self.jvm.attach_current_thread().expect("JavaVM should have an environment; qed"); + let java_str = env.new_string(msg.to_string()).expect("Rust String is valid JString; qed"); + let val = &[JValue::Object(JObject::from(java_str))]; + env.call_method(self.callback.as_obj(), self.method_name, self.method_descriptor, val).expect( + "The callback must be an instance method and be named \"void callback(Object)\"; qed)"); + } +} + +#[no_mangle] +pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env: JNIEnv, _: JClass, cli: jobjectArray) -> jlong { + let cli_len = env.get_array_length(cli).expect("invalid Java bindings"); + + let mut jni_strings = Vec::with_capacity(cli_len as usize); + let mut opts = Vec::with_capacity(cli_len as usize); + let mut opts_lens = Vec::with_capacity(cli_len as usize); + + for n in 0..cli_len { + let elem = env.get_object_array_element(cli, n).expect("invalid Java bindings"); + let elem_str: JString = elem.into(); + match env.get_string(elem_str) { + Ok(s) => { + opts.push(s.as_ptr()); + opts_lens.push(s.to_bytes().len()); + jni_strings.push(s); + }, + Err(err) => { + let _ = env.throw_new("java/lang/Exception", err.to_string()); + return 0 + } + }; + } + + let mut out = ptr::null_mut(); + match parity_config_from_cli(opts.as_ptr(), opts_lens.as_ptr(), cli_len as usize, &mut out) { + 0 => out as jlong, + _ => { + let _ = env.throw_new("java/lang/Exception", "failed to create config object"); + 0 + }, + } +} + +#[no_mangle] +pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_build(env: JNIEnv, _: JClass, config: va_list) -> jlong { + let params = ParityParams { + configuration: config, + .. mem::zeroed() + }; + + let mut out = ptr::null_mut(); + match parity_start(¶ms, &mut out) { + 0 => out as jlong, + _ => { + let _ = env.throw_new("java/lang/Exception", "failed to start Parity"); + 0 + }, + } +} + +#[no_mangle] +pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_destroy(_env: JNIEnv, _: JClass, parity: va_list) { + parity_destroy(parity); +} + +unsafe fn async_checker<'a>(client: va_list, rpc: JString, callback: JObject, env: &JNIEnv<'a>) +-> Result, String> { + let query: String = env.get_string(rpc) + .map(Into::into) + .map_err(|e| e.to_string())?; + + let client: &RunningClient = &*(client as *const RunningClient); + let jvm = env.get_java_vm().map_err(|e| e.to_string())?; + let global_ref = env.new_global_ref(callback).map_err(|e| e.to_string())?; + Ok((client, query, jvm, global_ref)) +} + +#[no_mangle] +pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_rpcQueryNative( + env: JNIEnv, + _: JClass, + parity: va_list, + rpc: JString, + timeout_ms: jlong, + callback: JObject, + ) +{ + let _ = async_checker(parity, rpc, callback, &env) + .map(|(client, query, jvm, global_ref)| { + let callback = Arc::new(Callback::new(jvm, global_ref)); + let cb = callback.clone(); + let future = client.rpc_query(&query, None).map(move |response| { + let response = response.unwrap_or_else(|| error::EMPTY.to_string()); + callback.call(&response); + }); + + let _handle = thread::Builder::new() + .name("rpc_query".to_string()) + .spawn(move || { + let mut current_thread = CurrentThread::new(); + current_thread.spawn(future); + let _ = current_thread.run_timeout(Duration::from_millis(timeout_ms as u64)) + .map_err(|_e| { + cb.call(error::TIMEOUT); + }); + }) + .expect("rpc-query thread shouldn't fail; qed"); + }) + .map_err(|e| { + let _ = env.throw_new("java/lang/Exception", e); + }); +} + +#[no_mangle] +pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_subscribeWebSocketNative( + env: JNIEnv, + _: JClass, + parity: va_list, + rpc: JString, + callback: JObject, + ) -> va_list { + + async_checker(parity, rpc, callback, &env) + .map(move |(client, query, jvm, global_ref)| { + let callback = Arc::new(Callback::new(jvm, global_ref)); + let (tx, mut rx) = mpsc::channel(1); + let session = Arc::new(PubSubSession::new(tx)); + let weak_session = Arc::downgrade(&session); + let query_future = client.rpc_query(&query, Some(session.clone()));; + + let _handle = thread::Builder::new() + .name("ws-subscriber".into()) + .spawn(move || { + // Wait for subscription ID + // Note this may block forever and can't be destroyed using the session object + // However, this will likely timeout or be catched the RPC layer + if let Ok(Some(response)) = query_future.wait() { + callback.call(&response); + } else { + callback.call(error::SUBSCRIBE); + return; + }; + + loop { + for response in rx.by_ref().wait() { + if let Ok(r) = response { + callback.call(&r); + } + } + + let rc = weak_session.upgrade().map_or(0,|session| Arc::strong_count(&session)); + // No subscription left, then terminate + if rc <= 1 { + break; + } + } + }) + .expect("rpc-subscriber thread shouldn't fail; qed"); + Arc::into_raw(session) as va_list + }) + .unwrap_or_else(|e| { + let _ = env.throw_new("java/lang/Exception", e); + ptr::null_mut() + }) +} + +#[no_mangle] +pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_unsubscribeWebSocketNative( + _: JNIEnv, + _: JClass, + session: va_list) { + parity_unsubscribe_ws(session as *const c_void); +} diff --git a/parity-clib/src/lib.rs b/parity-clib/src/lib.rs index 7791f97bd..b4f776c89 100644 --- a/parity-clib/src/lib.rs +++ b/parity-clib/src/lib.rs @@ -17,31 +17,52 @@ //! Note that all the structs and functions here are documented in `parity.h`, to avoid //! duplicating documentation. +extern crate futures; +extern crate panic_hook; +extern crate parity_ethereum; +extern crate tokio; +extern crate tokio_current_thread; + #[cfg(feature = "jni")] extern crate jni; -extern crate parity_ethereum; -extern crate panic_hook; +#[cfg(feature = "jni")] +mod java; + +use std::ffi::CString; use std::os::raw::{c_char, c_void, c_int}; -use std::panic; -use std::ptr; -use std::slice; -use std::str; +use std::{panic, ptr, slice, str, thread}; +use std::sync::Arc; +use std::time::Duration; -#[cfg(feature = "jni")] -use std::mem; -#[cfg(feature = "jni")] -use jni::{JNIEnv, objects::JClass, objects::JString, sys::jlong, sys::jobjectArray}; +use futures::{Future, Stream}; +use futures::sync::mpsc; +use parity_ethereum::{PubSubSession, RunningClient}; +use tokio_current_thread::CurrentThread; + +type Callback = Option; +type CheckedQuery<'a> = (&'a RunningClient, &'static str); + +pub mod error { + pub const EMPTY: &str = r#"{"jsonrpc":"2.0","result":"null","id":1}"#; + pub const TIMEOUT: &str = r#"{"jsonrpc":"2.0","result":"timeout","id":1}"#; + pub const SUBSCRIBE: &str = r#"{"jsonrpc":"2.0","result":"subcribe_fail","id":1}"#; +} #[repr(C)] pub struct ParityParams { pub configuration: *mut c_void, - pub on_client_restart_cb: Option, + pub on_client_restart_cb: Callback, pub on_client_restart_cb_custom: *mut c_void, } #[no_mangle] -pub unsafe extern fn parity_config_from_cli(args: *const *const c_char, args_lens: *const usize, len: usize, output: *mut *mut c_void) -> c_int { +pub unsafe extern fn parity_config_from_cli( + args: *const *const c_char, + args_lens: *const usize, + len: usize, + output: *mut *mut c_void +) -> c_int { panic::catch_unwind(|| { *output = ptr::null_mut(); @@ -59,7 +80,6 @@ pub unsafe extern fn parity_config_from_cli(args: *const *const c_char, args_len Err(_) => return 1, }; } - args }; @@ -95,8 +115,11 @@ pub unsafe extern fn parity_start(cfg: *const ParityParams, output: *mut *mut c_ let config = Box::from_raw(cfg.configuration as *mut parity_ethereum::Configuration); let on_client_restart_cb = { - let cb = CallbackStr(cfg.on_client_restart_cb, cfg.on_client_restart_cb_custom); - move |new_chain: String| { cb.call(&new_chain); } + let cb = CallbackStr { + user_data: cfg.on_client_restart_cb_custom, + function: cfg.on_client_restart_cb, + }; + move |new_chain: String| { cb.call(new_chain.as_bytes()); } }; let action = match parity_ethereum::start(*config, on_client_restart_cb, || {}) { @@ -118,32 +141,53 @@ pub unsafe extern fn parity_start(cfg: *const ParityParams, output: *mut *mut c_ #[no_mangle] pub unsafe extern fn parity_destroy(client: *mut c_void) { let _ = panic::catch_unwind(|| { - let client = Box::from_raw(client as *mut parity_ethereum::RunningClient); + let client = Box::from_raw(client as *mut RunningClient); client.shutdown(); }); } -#[no_mangle] -pub unsafe extern fn parity_rpc(client: *mut c_void, query: *const c_char, len: usize, out_str: *mut c_char, out_len: *mut usize) -> c_int { - panic::catch_unwind(|| { - let client: &mut parity_ethereum::RunningClient = &mut *(client as *mut parity_ethereum::RunningClient); - let query_str = { +unsafe fn parity_rpc_query_checker<'a>(client: *const c_void, query: *const c_char, len: usize) + -> Option> +{ + let query_str = { let string = slice::from_raw_parts(query as *const u8, len); - match str::from_utf8(string) { - Ok(a) => a, - Err(_) => return 1, - } - }; + str::from_utf8(string).ok()? + }; + let client: &RunningClient = &*(client as *const RunningClient); + Some((client, query_str)) +} - if let Some(output) = client.rpc_query_sync(query_str) { - let q_out_len = output.as_bytes().len(); - if *out_len < q_out_len { - return 1; - } +#[no_mangle] +pub unsafe extern fn parity_rpc( + client: *const c_void, + query: *const c_char, + len: usize, + timeout_ms: usize, + callback: Callback, + user_data: *mut c_void, +) -> c_int { + panic::catch_unwind(|| { + if let Some((client, query)) = parity_rpc_query_checker(client, query, len) { + let client = client as &RunningClient; + let callback = Arc::new(CallbackStr {user_data, function: callback} ); + let cb = callback.clone(); + let query = client.rpc_query(query, None).map(move |response| { + let response = response.unwrap_or_else(|| error::EMPTY.to_string()); + callback.call(response.as_bytes()); + }); - ptr::copy_nonoverlapping(output.as_bytes().as_ptr(), out_str as *mut u8, q_out_len); - *out_len = q_out_len; + let _handle = thread::Builder::new() + .name("rpc_query".to_string()) + .spawn(move || { + let mut current_thread = CurrentThread::new(); + current_thread.spawn(query); + let _ = current_thread.run_timeout(Duration::from_millis(timeout_ms as u64)) + .map_err(|_e| { + cb.call(error::TIMEOUT.as_bytes()); + }); + }) + .expect("rpc-query thread shouldn't fail; qed"); 0 } else { 1 @@ -152,116 +196,86 @@ pub unsafe extern fn parity_rpc(client: *mut c_void, query: *const c_char, len: } #[no_mangle] -pub unsafe extern fn parity_set_panic_hook(callback: extern "C" fn(*mut c_void, *const c_char, usize), param: *mut c_void) { - let cb = CallbackStr(Some(callback), param); +pub unsafe extern fn parity_subscribe_ws( + client: *const c_void, + query: *const c_char, + len: usize, + callback: Callback, + user_data: *mut c_void, +) -> *const c_void { + + panic::catch_unwind(|| { + if let Some((client, query)) = parity_rpc_query_checker(client, query, len) { + let (tx, mut rx) = mpsc::channel(1); + let session = Arc::new(PubSubSession::new(tx)); + let query_future = client.rpc_query(query, Some(session.clone())); + let weak_session = Arc::downgrade(&session); + let cb = CallbackStr { user_data, function: callback}; + + let _handle = thread::Builder::new() + .name("ws-subscriber".into()) + .spawn(move || { + // Wait for subscription ID + // Note this may block forever and be can't destroyed using the session object + // However, this will likely timeout or be catched the RPC layer + if let Ok(Some(response)) = query_future.wait() { + cb.call(response.as_bytes()); + } else { + cb.call(error::SUBSCRIBE.as_bytes()); + return; + } + + loop { + for response in rx.by_ref().wait() { + if let Ok(r) = response { + cb.call(r.as_bytes()); + } + } + + let rc = weak_session.upgrade().map_or(0,|session| Arc::strong_count(&session)); + // No subscription left, then terminate + if rc <= 1 { + break; + } + } + }) + .expect("rpc-subscriber thread shouldn't fail; qed"); + Arc::into_raw(session) as *const c_void + } else { + ptr::null() + } + }) + .unwrap_or(ptr::null()) +} + +#[no_mangle] +pub unsafe extern fn parity_unsubscribe_ws(session: *const c_void) { + let _ = panic::catch_unwind(|| { + let _session = Arc::from_raw(session as *const PubSubSession); + }); +} + +#[no_mangle] +pub unsafe extern fn parity_set_panic_hook(callback: Callback, param: *mut c_void) { + let cb = CallbackStr {user_data: param, function: callback}; panic_hook::set_with(move |panic_msg| { - cb.call(panic_msg); + cb.call(panic_msg.as_bytes()); }); } // Internal structure for handling callbacks that get passed a string. -struct CallbackStr(Option, *mut c_void); +struct CallbackStr { + user_data: *mut c_void, + function: Callback, +} + unsafe impl Send for CallbackStr {} unsafe impl Sync for CallbackStr {} impl CallbackStr { - fn call(&self, new_chain: &str) { - if let Some(ref cb) = self.0 { - cb(self.1, new_chain.as_bytes().as_ptr() as *const _, new_chain.len()) - } - } -} - -#[cfg(feature = "jni")] -#[no_mangle] -pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env: JNIEnv, _: JClass, cli: jobjectArray) -> jlong { - let cli_len = env.get_array_length(cli).expect("invalid Java bindings"); - - let mut jni_strings = Vec::with_capacity(cli_len as usize); - let mut opts = Vec::with_capacity(cli_len as usize); - let mut opts_lens = Vec::with_capacity(cli_len as usize); - - for n in 0 .. cli_len { - let elem = env.get_object_array_element(cli, n).expect("invalid Java bindings"); - let elem_str: JString = elem.into(); - match env.get_string(elem_str) { - Ok(s) => { - opts.push(s.as_ptr()); - opts_lens.push(s.to_bytes().len()); - jni_strings.push(s); - }, - Err(err) => { - let _ = env.throw_new("java/lang/Exception", err.to_string()); - return 0 - } - }; - } - - let mut out = ptr::null_mut(); - match parity_config_from_cli(opts.as_ptr(), opts_lens.as_ptr(), cli_len as usize, &mut out) { - 0 => out as usize as jlong, - _ => { - let _ = env.throw_new("java/lang/Exception", "failed to create config object"); - 0 - }, - } -} - -#[cfg(feature = "jni")] -#[no_mangle] -pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_build(env: JNIEnv, _: JClass, config: jlong) -> jlong { - let params = ParityParams { - configuration: config as usize as *mut c_void, - .. mem::zeroed() - }; - - let mut out = ptr::null_mut(); - match parity_start(¶ms, &mut out) { - 0 => out as usize as jlong, - _ => { - let _ = env.throw_new("java/lang/Exception", "failed to start Parity"); - 0 - }, - } -} - -#[cfg(feature = "jni")] -#[no_mangle] -pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_destroy(_env: JNIEnv, _: JClass, parity: jlong) { - let parity = parity as usize as *mut c_void; - parity_destroy(parity); -} - -#[cfg(feature = "jni")] -#[no_mangle] -pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_rpcQueryNative<'a>(env: JNIEnv<'a>, _: JClass, parity: jlong, rpc: JString) -> JString<'a> { - let parity = parity as usize as *mut c_void; - - let rpc = match env.get_string(rpc) { - Ok(s) => s, - Err(err) => { - let _ = env.throw_new("java/lang/Exception", err.to_string()); - return env.new_string("").expect("Creating an empty string never fails"); - }, - }; - - let mut out_len = 255; - let mut out = [0u8; 256]; - - match parity_rpc(parity, rpc.as_ptr(), rpc.to_bytes().len(), out.as_mut_ptr() as *mut c_char, &mut out_len) { - 0 => (), - _ => { - let _ = env.throw_new("java/lang/Exception", "failed to perform RPC query"); - return env.new_string("").expect("Creating an empty string never fails"); - }, - } - - let out = str::from_utf8(&out[..out_len]) - .expect("parity always generates an UTF-8 RPC response"); - match env.new_string(out) { - Ok(s) => s, - Err(err) => { - let _ = env.throw_new("java/lang/Exception", err.to_string()); - return env.new_string("").expect("Creating an empty string never fails"); + fn call(&self, msg: &[u8]) { + if let Some(ref cb) = self.function { + let cstr = CString::new(msg).expect("valid string with no null bytes in the middle; qed").into_raw(); + cb(self.user_data, cstr, msg.len()) } } } diff --git a/parity/lib.rs b/parity/lib.rs index b5a614a7b..88330f1ac 100644 --- a/parity/lib.rs +++ b/parity/lib.rs @@ -121,6 +121,7 @@ use std::alloc::System; pub use self::configuration::Configuration; pub use self::run::RunningClient; +pub use parity_rpc::PubSubSession; #[cfg(feature = "memory_profiling")] #[global_allocator] diff --git a/parity/run.rs b/parity/run.rs index 28536ea6a..6dad5fd6d 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -41,7 +41,8 @@ use light::Cache as LightDataCache; use miner::external::ExternalMiner; use node_filter::NodeFilter; use parity_runtime::Runtime; -use parity_rpc::{Origin, Metadata, NetworkSettings, informant, is_major_importing}; +use parity_rpc::{Origin, Metadata, NetworkSettings, informant, is_major_importing, PubSubSession, FutureResult, + FutureResponse, FutureOutput}; use updater::{UpdatePolicy, Updater}; use parity_version::version; use ethcore_private_tx::{ProviderConfig, EncryptorConfig, SecretStoreEncryptor}; @@ -875,21 +876,19 @@ enum RunningClientInner { } impl RunningClient { - /// Performs a synchronous RPC query. - /// Blocks execution until the result is ready. - pub fn rpc_query_sync(&self, request: &str) -> Option { + /// Performs an asynchronous RPC query. + // FIXME: [tomaka] This API should be better, with for example a Future + pub fn rpc_query(&self, request: &str, session: Option>) + -> FutureResult + { let metadata = Metadata { origin: Origin::CApi, - session: None, + session, }; match self.inner { - RunningClientInner::Light { ref rpc, .. } => { - rpc.handle_request_sync(request, metadata) - }, - RunningClientInner::Full { ref rpc, .. } => { - rpc.handle_request_sync(request, metadata) - }, + RunningClientInner::Light { ref rpc, .. } => rpc.handle_request(request, metadata), + RunningClientInner::Full { ref rpc, .. } => rpc.handle_request(request, metadata), } } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index ac0d3dd6f..ca5ed28ba 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -108,6 +108,7 @@ pub mod v1; pub mod tests; +pub use jsonrpc_core::{FutureOutput, FutureResult, FutureResponse, FutureRpcResult}; pub use jsonrpc_pubsub::Session as PubSubSession; pub use ipc::{Server as IpcServer, MetaExtractor as IpcMetaExtractor, RequestContext as IpcRequestContext}; pub use http::{