parity-clib: async C bindings to RPC requests + subscribe/unsubscribe to websocket events (#9920)

* feat(parity-clib asynchronous rpc queries)

* feat(seperate bindings for ws and rpc)

* Subscribing to websockets for the full-client works

* feat(c binding unsubscribe_from_websocket)

* fix(tests): tweak CMake build config

* Enforce C+11

* refactor(parity-cpp-example) : `cpp:ify`

* fix(typedefs) : revert typedefs parity-clib

* docs(nits)

* fix(simplify websocket_unsubscribe)

* refactor(cpp example) : more subscriptions

* fix(callback type) : address grumbles on callback

* Use it the example to avoid using global variables

* docs(nits) - don't mention `arc`

* fix(jni bindings): fix compile errors

* feat(java example and updated java bindings)

* fix(java example) : run both full and light client

* fix(Java shutdown) : unsubscribe to sessions

Forgot to pass the JNIEnv environment since it is an instance method

* feat(return valid JString)

* Remove Java dependency by constructing a valid Java String in the callback

* fix(logger) : remove `rpc` trace log

* fix(format)

* fix(parity-clib): remove needless callback `type`

* fix(parity-clib-examples) : update examples

* `cpp` example pass in a struct instead to determines `callback kind`
* `java` add a instance variable the class `Callback` to determine `callback kind`

* fix(review comments): docs and format

* Update parity-clib/src/java.rs

Co-Authored-By: niklasad1 <niklasadolfsson1@gmail.com>

* fix(bad merge + spelling)

* fix(move examples to parity-clib/examples)
This commit is contained in:
Niklas Adolfsson
2019-01-02 16:49:01 +01:00
committed by Afri Schoedon
parent 2bb79614f6
commit b4f8bba843
14 changed files with 763 additions and 228 deletions

211
parity-clib/src/java.rs Normal file
View File

@@ -0,0 +1,211 @@
use std::{mem, ptr};
use std::sync::Arc;
use std::time::Duration;
use std::thread;
use std::os::raw::c_void;
use {parity_config_from_cli, parity_destroy, parity_start, parity_unsubscribe_ws, ParityParams, error};
use futures::{Future, Stream};
use futures::sync::mpsc;
use jni::{JavaVM, JNIEnv};
use jni::objects::{JClass, JString, JObject, JValue, GlobalRef};
use jni::sys::{jlong, jobjectArray, va_list};
use tokio_current_thread::CurrentThread;
use parity_ethereum::{RunningClient, PubSubSession};
type CheckedQuery<'a> = (&'a RunningClient, String, JavaVM, GlobalRef);
// Creates a Java callback to a static method named `void callback(Object)`
struct Callback<'a> {
jvm: JavaVM,
callback: GlobalRef,
method_name: &'a str,
method_descriptor: &'a str,
}
unsafe impl<'a> Send for Callback<'a> {}
unsafe impl<'a> Sync for Callback<'a> {}
impl<'a> Callback<'a> {
fn new(jvm: JavaVM, callback: GlobalRef) -> Self {
Self {
jvm,
callback,
method_name: "callback",
method_descriptor: "(Ljava/lang/Object;)V",
}
}
fn call(&self, msg: &str) {
let env = self.jvm.attach_current_thread().expect("JavaVM should have an environment; qed");
let java_str = env.new_string(msg.to_string()).expect("Rust String is valid JString; qed");
let val = &[JValue::Object(JObject::from(java_str))];
env.call_method(self.callback.as_obj(), self.method_name, self.method_descriptor, val).expect(
"The callback must be an instance method and be named \"void callback(Object)\"; qed)");
}
}
#[no_mangle]
pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env: JNIEnv, _: JClass, cli: jobjectArray) -> jlong {
let cli_len = env.get_array_length(cli).expect("invalid Java bindings");
let mut jni_strings = Vec::with_capacity(cli_len as usize);
let mut opts = Vec::with_capacity(cli_len as usize);
let mut opts_lens = Vec::with_capacity(cli_len as usize);
for n in 0..cli_len {
let elem = env.get_object_array_element(cli, n).expect("invalid Java bindings");
let elem_str: JString = elem.into();
match env.get_string(elem_str) {
Ok(s) => {
opts.push(s.as_ptr());
opts_lens.push(s.to_bytes().len());
jni_strings.push(s);
},
Err(err) => {
let _ = env.throw_new("java/lang/Exception", err.to_string());
return 0
}
};
}
let mut out = ptr::null_mut();
match parity_config_from_cli(opts.as_ptr(), opts_lens.as_ptr(), cli_len as usize, &mut out) {
0 => out as jlong,
_ => {
let _ = env.throw_new("java/lang/Exception", "failed to create config object");
0
},
}
}
#[no_mangle]
pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_build(env: JNIEnv, _: JClass, config: va_list) -> jlong {
let params = ParityParams {
configuration: config,
.. mem::zeroed()
};
let mut out = ptr::null_mut();
match parity_start(&params, &mut out) {
0 => out as jlong,
_ => {
let _ = env.throw_new("java/lang/Exception", "failed to start Parity");
0
},
}
}
#[no_mangle]
pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_destroy(_env: JNIEnv, _: JClass, parity: va_list) {
parity_destroy(parity);
}
unsafe fn async_checker<'a>(client: va_list, rpc: JString, callback: JObject, env: &JNIEnv<'a>)
-> Result<CheckedQuery<'a>, String> {
let query: String = env.get_string(rpc)
.map(Into::into)
.map_err(|e| e.to_string())?;
let client: &RunningClient = &*(client as *const RunningClient);
let jvm = env.get_java_vm().map_err(|e| e.to_string())?;
let global_ref = env.new_global_ref(callback).map_err(|e| e.to_string())?;
Ok((client, query, jvm, global_ref))
}
#[no_mangle]
pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_rpcQueryNative(
env: JNIEnv,
_: JClass,
parity: va_list,
rpc: JString,
timeout_ms: jlong,
callback: JObject,
)
{
let _ = async_checker(parity, rpc, callback, &env)
.map(|(client, query, jvm, global_ref)| {
let callback = Arc::new(Callback::new(jvm, global_ref));
let cb = callback.clone();
let future = client.rpc_query(&query, None).map(move |response| {
let response = response.unwrap_or_else(|| error::EMPTY.to_string());
callback.call(&response);
});
let _handle = thread::Builder::new()
.name("rpc_query".to_string())
.spawn(move || {
let mut current_thread = CurrentThread::new();
current_thread.spawn(future);
let _ = current_thread.run_timeout(Duration::from_millis(timeout_ms as u64))
.map_err(|_e| {
cb.call(error::TIMEOUT);
});
})
.expect("rpc-query thread shouldn't fail; qed");
})
.map_err(|e| {
let _ = env.throw_new("java/lang/Exception", e);
});
}
#[no_mangle]
pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_subscribeWebSocketNative(
env: JNIEnv,
_: JClass,
parity: va_list,
rpc: JString,
callback: JObject,
) -> va_list {
async_checker(parity, rpc, callback, &env)
.map(move |(client, query, jvm, global_ref)| {
let callback = Arc::new(Callback::new(jvm, global_ref));
let (tx, mut rx) = mpsc::channel(1);
let session = Arc::new(PubSubSession::new(tx));
let weak_session = Arc::downgrade(&session);
let query_future = client.rpc_query(&query, Some(session.clone()));;
let _handle = thread::Builder::new()
.name("ws-subscriber".into())
.spawn(move || {
// Wait for subscription ID
// Note this may block forever and can't be destroyed using the session object
// However, this will likely timeout or be catched the RPC layer
if let Ok(Some(response)) = query_future.wait() {
callback.call(&response);
} else {
callback.call(error::SUBSCRIBE);
return;
};
loop {
for response in rx.by_ref().wait() {
if let Ok(r) = response {
callback.call(&r);
}
}
let rc = weak_session.upgrade().map_or(0,|session| Arc::strong_count(&session));
// No subscription left, then terminate
if rc <= 1 {
break;
}
}
})
.expect("rpc-subscriber thread shouldn't fail; qed");
Arc::into_raw(session) as va_list
})
.unwrap_or_else(|e| {
let _ = env.throw_new("java/lang/Exception", e);
ptr::null_mut()
})
}
#[no_mangle]
pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_unsubscribeWebSocketNative(
_: JNIEnv,
_: JClass,
session: va_list) {
parity_unsubscribe_ws(session as *const c_void);
}

View File

@@ -17,31 +17,52 @@
//! Note that all the structs and functions here are documented in `parity.h`, to avoid
//! duplicating documentation.
extern crate futures;
extern crate panic_hook;
extern crate parity_ethereum;
extern crate tokio;
extern crate tokio_current_thread;
#[cfg(feature = "jni")]
extern crate jni;
extern crate parity_ethereum;
extern crate panic_hook;
#[cfg(feature = "jni")]
mod java;
use std::ffi::CString;
use std::os::raw::{c_char, c_void, c_int};
use std::panic;
use std::ptr;
use std::slice;
use std::str;
use std::{panic, ptr, slice, str, thread};
use std::sync::Arc;
use std::time::Duration;
#[cfg(feature = "jni")]
use std::mem;
#[cfg(feature = "jni")]
use jni::{JNIEnv, objects::JClass, objects::JString, sys::jlong, sys::jobjectArray};
use futures::{Future, Stream};
use futures::sync::mpsc;
use parity_ethereum::{PubSubSession, RunningClient};
use tokio_current_thread::CurrentThread;
type Callback = Option<extern "C" fn(*mut c_void, *const c_char, usize)>;
type CheckedQuery<'a> = (&'a RunningClient, &'static str);
pub mod error {
pub const EMPTY: &str = r#"{"jsonrpc":"2.0","result":"null","id":1}"#;
pub const TIMEOUT: &str = r#"{"jsonrpc":"2.0","result":"timeout","id":1}"#;
pub const SUBSCRIBE: &str = r#"{"jsonrpc":"2.0","result":"subcribe_fail","id":1}"#;
}
#[repr(C)]
pub struct ParityParams {
pub configuration: *mut c_void,
pub on_client_restart_cb: Option<extern "C" fn(*mut c_void, *const c_char, usize)>,
pub on_client_restart_cb: Callback,
pub on_client_restart_cb_custom: *mut c_void,
}
#[no_mangle]
pub unsafe extern fn parity_config_from_cli(args: *const *const c_char, args_lens: *const usize, len: usize, output: *mut *mut c_void) -> c_int {
pub unsafe extern fn parity_config_from_cli(
args: *const *const c_char,
args_lens: *const usize,
len: usize,
output: *mut *mut c_void
) -> c_int {
panic::catch_unwind(|| {
*output = ptr::null_mut();
@@ -59,7 +80,6 @@ pub unsafe extern fn parity_config_from_cli(args: *const *const c_char, args_len
Err(_) => return 1,
};
}
args
};
@@ -95,8 +115,11 @@ pub unsafe extern fn parity_start(cfg: *const ParityParams, output: *mut *mut c_
let config = Box::from_raw(cfg.configuration as *mut parity_ethereum::Configuration);
let on_client_restart_cb = {
let cb = CallbackStr(cfg.on_client_restart_cb, cfg.on_client_restart_cb_custom);
move |new_chain: String| { cb.call(&new_chain); }
let cb = CallbackStr {
user_data: cfg.on_client_restart_cb_custom,
function: cfg.on_client_restart_cb,
};
move |new_chain: String| { cb.call(new_chain.as_bytes()); }
};
let action = match parity_ethereum::start(*config, on_client_restart_cb, || {}) {
@@ -118,32 +141,53 @@ pub unsafe extern fn parity_start(cfg: *const ParityParams, output: *mut *mut c_
#[no_mangle]
pub unsafe extern fn parity_destroy(client: *mut c_void) {
let _ = panic::catch_unwind(|| {
let client = Box::from_raw(client as *mut parity_ethereum::RunningClient);
let client = Box::from_raw(client as *mut RunningClient);
client.shutdown();
});
}
#[no_mangle]
pub unsafe extern fn parity_rpc(client: *mut c_void, query: *const c_char, len: usize, out_str: *mut c_char, out_len: *mut usize) -> c_int {
panic::catch_unwind(|| {
let client: &mut parity_ethereum::RunningClient = &mut *(client as *mut parity_ethereum::RunningClient);
let query_str = {
unsafe fn parity_rpc_query_checker<'a>(client: *const c_void, query: *const c_char, len: usize)
-> Option<CheckedQuery<'a>>
{
let query_str = {
let string = slice::from_raw_parts(query as *const u8, len);
match str::from_utf8(string) {
Ok(a) => a,
Err(_) => return 1,
}
};
str::from_utf8(string).ok()?
};
let client: &RunningClient = &*(client as *const RunningClient);
Some((client, query_str))
}
if let Some(output) = client.rpc_query_sync(query_str) {
let q_out_len = output.as_bytes().len();
if *out_len < q_out_len {
return 1;
}
#[no_mangle]
pub unsafe extern fn parity_rpc(
client: *const c_void,
query: *const c_char,
len: usize,
timeout_ms: usize,
callback: Callback,
user_data: *mut c_void,
) -> c_int {
panic::catch_unwind(|| {
if let Some((client, query)) = parity_rpc_query_checker(client, query, len) {
let client = client as &RunningClient;
let callback = Arc::new(CallbackStr {user_data, function: callback} );
let cb = callback.clone();
let query = client.rpc_query(query, None).map(move |response| {
let response = response.unwrap_or_else(|| error::EMPTY.to_string());
callback.call(response.as_bytes());
});
ptr::copy_nonoverlapping(output.as_bytes().as_ptr(), out_str as *mut u8, q_out_len);
*out_len = q_out_len;
let _handle = thread::Builder::new()
.name("rpc_query".to_string())
.spawn(move || {
let mut current_thread = CurrentThread::new();
current_thread.spawn(query);
let _ = current_thread.run_timeout(Duration::from_millis(timeout_ms as u64))
.map_err(|_e| {
cb.call(error::TIMEOUT.as_bytes());
});
})
.expect("rpc-query thread shouldn't fail; qed");
0
} else {
1
@@ -152,116 +196,86 @@ pub unsafe extern fn parity_rpc(client: *mut c_void, query: *const c_char, len:
}
#[no_mangle]
pub unsafe extern fn parity_set_panic_hook(callback: extern "C" fn(*mut c_void, *const c_char, usize), param: *mut c_void) {
let cb = CallbackStr(Some(callback), param);
pub unsafe extern fn parity_subscribe_ws(
client: *const c_void,
query: *const c_char,
len: usize,
callback: Callback,
user_data: *mut c_void,
) -> *const c_void {
panic::catch_unwind(|| {
if let Some((client, query)) = parity_rpc_query_checker(client, query, len) {
let (tx, mut rx) = mpsc::channel(1);
let session = Arc::new(PubSubSession::new(tx));
let query_future = client.rpc_query(query, Some(session.clone()));
let weak_session = Arc::downgrade(&session);
let cb = CallbackStr { user_data, function: callback};
let _handle = thread::Builder::new()
.name("ws-subscriber".into())
.spawn(move || {
// Wait for subscription ID
// Note this may block forever and be can't destroyed using the session object
// However, this will likely timeout or be catched the RPC layer
if let Ok(Some(response)) = query_future.wait() {
cb.call(response.as_bytes());
} else {
cb.call(error::SUBSCRIBE.as_bytes());
return;
}
loop {
for response in rx.by_ref().wait() {
if let Ok(r) = response {
cb.call(r.as_bytes());
}
}
let rc = weak_session.upgrade().map_or(0,|session| Arc::strong_count(&session));
// No subscription left, then terminate
if rc <= 1 {
break;
}
}
})
.expect("rpc-subscriber thread shouldn't fail; qed");
Arc::into_raw(session) as *const c_void
} else {
ptr::null()
}
})
.unwrap_or(ptr::null())
}
#[no_mangle]
pub unsafe extern fn parity_unsubscribe_ws(session: *const c_void) {
let _ = panic::catch_unwind(|| {
let _session = Arc::from_raw(session as *const PubSubSession);
});
}
#[no_mangle]
pub unsafe extern fn parity_set_panic_hook(callback: Callback, param: *mut c_void) {
let cb = CallbackStr {user_data: param, function: callback};
panic_hook::set_with(move |panic_msg| {
cb.call(panic_msg);
cb.call(panic_msg.as_bytes());
});
}
// Internal structure for handling callbacks that get passed a string.
struct CallbackStr(Option<extern "C" fn(*mut c_void, *const c_char, usize)>, *mut c_void);
struct CallbackStr {
user_data: *mut c_void,
function: Callback,
}
unsafe impl Send for CallbackStr {}
unsafe impl Sync for CallbackStr {}
impl CallbackStr {
fn call(&self, new_chain: &str) {
if let Some(ref cb) = self.0 {
cb(self.1, new_chain.as_bytes().as_ptr() as *const _, new_chain.len())
}
}
}
#[cfg(feature = "jni")]
#[no_mangle]
pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env: JNIEnv, _: JClass, cli: jobjectArray) -> jlong {
let cli_len = env.get_array_length(cli).expect("invalid Java bindings");
let mut jni_strings = Vec::with_capacity(cli_len as usize);
let mut opts = Vec::with_capacity(cli_len as usize);
let mut opts_lens = Vec::with_capacity(cli_len as usize);
for n in 0 .. cli_len {
let elem = env.get_object_array_element(cli, n).expect("invalid Java bindings");
let elem_str: JString = elem.into();
match env.get_string(elem_str) {
Ok(s) => {
opts.push(s.as_ptr());
opts_lens.push(s.to_bytes().len());
jni_strings.push(s);
},
Err(err) => {
let _ = env.throw_new("java/lang/Exception", err.to_string());
return 0
}
};
}
let mut out = ptr::null_mut();
match parity_config_from_cli(opts.as_ptr(), opts_lens.as_ptr(), cli_len as usize, &mut out) {
0 => out as usize as jlong,
_ => {
let _ = env.throw_new("java/lang/Exception", "failed to create config object");
0
},
}
}
#[cfg(feature = "jni")]
#[no_mangle]
pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_build(env: JNIEnv, _: JClass, config: jlong) -> jlong {
let params = ParityParams {
configuration: config as usize as *mut c_void,
.. mem::zeroed()
};
let mut out = ptr::null_mut();
match parity_start(&params, &mut out) {
0 => out as usize as jlong,
_ => {
let _ = env.throw_new("java/lang/Exception", "failed to start Parity");
0
},
}
}
#[cfg(feature = "jni")]
#[no_mangle]
pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_destroy(_env: JNIEnv, _: JClass, parity: jlong) {
let parity = parity as usize as *mut c_void;
parity_destroy(parity);
}
#[cfg(feature = "jni")]
#[no_mangle]
pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_rpcQueryNative<'a>(env: JNIEnv<'a>, _: JClass, parity: jlong, rpc: JString) -> JString<'a> {
let parity = parity as usize as *mut c_void;
let rpc = match env.get_string(rpc) {
Ok(s) => s,
Err(err) => {
let _ = env.throw_new("java/lang/Exception", err.to_string());
return env.new_string("").expect("Creating an empty string never fails");
},
};
let mut out_len = 255;
let mut out = [0u8; 256];
match parity_rpc(parity, rpc.as_ptr(), rpc.to_bytes().len(), out.as_mut_ptr() as *mut c_char, &mut out_len) {
0 => (),
_ => {
let _ = env.throw_new("java/lang/Exception", "failed to perform RPC query");
return env.new_string("").expect("Creating an empty string never fails");
},
}
let out = str::from_utf8(&out[..out_len])
.expect("parity always generates an UTF-8 RPC response");
match env.new_string(out) {
Ok(s) => s,
Err(err) => {
let _ = env.throw_new("java/lang/Exception", err.to_string());
return env.new_string("").expect("Creating an empty string never fails");
fn call(&self, msg: &[u8]) {
if let Some(ref cb) = self.function {
let cstr = CString::new(msg).expect("valid string with no null bytes in the middle; qed").into_raw();
cb(self.user_data, cstr, msg.len())
}
}
}