diff --git a/parity-clib/src/java.rs b/parity-clib/src/java.rs index bb807083a..ba86e5c88 100644 --- a/parity-clib/src/java.rs +++ b/parity-clib/src/java.rs @@ -15,34 +15,31 @@ // along with Parity Ethereum. If not, see . use std::{mem, ptr}; +use std::ffi::c_void; use std::sync::Arc; -use std::time::Duration; -use std::thread; -use std::os::raw::c_void; -use {parity_config_from_cli, parity_destroy, parity_set_logger, parity_start, parity_unsubscribe_ws, ParityParams, error}; +use {Callback, parity_config_from_cli, parity_destroy, parity_rpc_worker, parity_start, parity_set_logger, + parity_unsubscribe_ws, parity_ws_worker, ParityParams}; -use futures::{Future, Stream}; -use futures::sync::mpsc; use jni::{JavaVM, JNIEnv}; use jni::objects::{JClass, JString, JObject, JValue, GlobalRef}; use jni::sys::{jlong, jobjectArray, va_list}; -use tokio_current_thread::CurrentThread; -use parity_ethereum::{RunningClient, PubSubSession}; +use parity_ethereum::RunningClient; type CheckedQuery<'a> = (&'a RunningClient, String, JavaVM, GlobalRef); // Creates a Java callback to a static method named `void callback(Object)` -struct Callback<'a> { +struct JavaCallback<'a> { jvm: JavaVM, callback: GlobalRef, method_name: &'a str, method_descriptor: &'a str, } -unsafe impl<'a> Send for Callback<'a> {} -unsafe impl<'a> Sync for Callback<'a> {} -impl<'a> Callback<'a> { +unsafe impl<'a> Send for JavaCallback<'a> {} +unsafe impl<'a> Sync for JavaCallback<'a> {} + +impl<'a> JavaCallback<'a> { fn new(jvm: JavaVM, callback: GlobalRef) -> Self { Self { jvm, @@ -51,7 +48,9 @@ impl<'a> Callback<'a> { method_descriptor: "(Ljava/lang/Object;)V", } } +} +impl<'a> Callback for JavaCallback<'a> { fn call(&self, msg: &str) { let env = self.jvm.attach_current_thread().expect("JavaVM should have an environment; qed"); let java_str = env.new_string(msg.to_string()).expect("Rust String is valid JString; qed"); @@ -63,13 +62,13 @@ impl<'a> Callback<'a> { #[no_mangle] pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env: JNIEnv, _: JClass, cli: jobjectArray) -> jlong { - let cli_len = env.get_array_length(cli).expect("invalid Java bindings"); + let cli_len = env.get_array_length(cli).expect("invalid Java bindings") as usize; - let mut jni_strings = Vec::with_capacity(cli_len as usize); - let mut opts = Vec::with_capacity(cli_len as usize); - let mut opts_lens = Vec::with_capacity(cli_len as usize); + let mut jni_strings = Vec::with_capacity(cli_len); + let mut opts = Vec::with_capacity(cli_len); + let mut opts_lens = Vec::with_capacity(cli_len); - for n in 0..cli_len { + for n in 0..cli_len as i32 { let elem = env.get_object_array_element(cli, n).expect("invalid Java bindings"); let elem_str: JString = elem.into(); match env.get_string(elem_str) { @@ -77,7 +76,7 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env: opts.push(s.as_ptr()); opts_lens.push(s.to_bytes().len()); jni_strings.push(s); - }, + } Err(err) => { let _ = env.throw_new("java/lang/Exception", err.to_string()); return 0 @@ -86,7 +85,7 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env: } let mut out = ptr::null_mut(); - match parity_config_from_cli(opts.as_ptr(), opts_lens.as_ptr(), cli_len as usize, &mut out) { + match parity_config_from_cli(opts.as_ptr(), opts_lens.as_ptr(), cli_len, &mut out) { 0 => out as jlong, _ => { let _ = env.throw_new("java/lang/Exception", "failed to create config object"); @@ -120,7 +119,7 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_build( _ => { let _ = env.throw_new("java/lang/Exception", "failed to start Parity"); 0 - }, + } } } @@ -129,7 +128,7 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_destroy(_env: JNIEn parity_destroy(parity); } -unsafe fn async_checker<'a>(client: va_list, rpc: JString, callback: JObject, env: &JNIEnv<'a>) +unsafe fn java_query_checker<'a>(client: va_list, rpc: JString, callback: JObject, env: &JNIEnv<'a>) -> Result, String> { let query: String = env.get_string(rpc) .map(Into::into) @@ -151,26 +150,10 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_rpcQueryNative( callback: JObject, ) { - let _ = async_checker(parity, rpc, callback, &env) + let _ = java_query_checker(parity, rpc, callback, &env) .map(|(client, query, jvm, global_ref)| { - let callback = Arc::new(Callback::new(jvm, global_ref)); - let cb = callback.clone(); - let future = client.rpc_query(&query, None).map(move |response| { - let response = response.unwrap_or_else(|| error::EMPTY.to_string()); - callback.call(&response); - }); - - let _handle = thread::Builder::new() - .name("rpc_query".to_string()) - .spawn(move || { - let mut current_thread = CurrentThread::new(); - current_thread.spawn(future); - let _ = current_thread.run_timeout(Duration::from_millis(timeout_ms as u64)) - .map_err(|_e| { - cb.call(error::TIMEOUT); - }); - }) - .expect("rpc-query thread shouldn't fail; qed"); + let callback = Arc::new(JavaCallback::new(jvm, global_ref)); + parity_rpc_worker(client, &query, callback, timeout_ms as u64); }) .map_err(|e| { let _ = env.throw_new("java/lang/Exception", e); @@ -186,43 +169,10 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_subscribeWebSocketN callback: JObject, ) -> va_list { - async_checker(parity, rpc, callback, &env) + java_query_checker(parity, rpc, callback, &env) .map(move |(client, query, jvm, global_ref)| { - let callback = Arc::new(Callback::new(jvm, global_ref)); - let (tx, mut rx) = mpsc::channel(1); - let session = Arc::new(PubSubSession::new(tx)); - let weak_session = Arc::downgrade(&session); - let query_future = client.rpc_query(&query, Some(session.clone()));; - - let _handle = thread::Builder::new() - .name("ws-subscriber".into()) - .spawn(move || { - // Wait for subscription ID - // Note this may block forever and can't be destroyed using the session object - // However, this will likely timeout or be catched the RPC layer - if let Ok(Some(response)) = query_future.wait() { - callback.call(&response); - } else { - callback.call(error::SUBSCRIBE); - return; - }; - - loop { - for response in rx.by_ref().wait() { - if let Ok(r) = response { - callback.call(&r); - } - } - - let rc = weak_session.upgrade().map_or(0,|session| Arc::strong_count(&session)); - // No subscription left, then terminate - if rc <= 1 { - break; - } - } - }) - .expect("rpc-subscriber thread shouldn't fail; qed"); - Arc::into_raw(session) as va_list + let callback = Arc::new(JavaCallback::new(jvm, global_ref)); + parity_ws_worker(client, &query, callback) as va_list }) .unwrap_or_else(|e| { let _ = env.throw_new("java/lang/Exception", e); diff --git a/parity-clib/src/lib.rs b/parity-clib/src/lib.rs index eca6edcd6..bbb60ec2d 100644 --- a/parity-clib/src/lib.rs +++ b/parity-clib/src/lib.rs @@ -40,7 +40,7 @@ use futures::sync::mpsc; use parity_ethereum::{PubSubSession, RunningClient}; use tokio_current_thread::CurrentThread; -type Callback = Option; +type CCallback = Option; type CheckedQuery<'a> = (&'a RunningClient, &'static str); pub mod error { @@ -52,11 +52,33 @@ pub mod error { #[repr(C)] pub struct ParityParams { pub configuration: *mut c_void, - pub on_client_restart_cb: Callback, + pub on_client_restart_cb: CCallback, pub on_client_restart_cb_custom: *mut c_void, pub logger: *mut c_void } +/// Trait representing a callback that passes a string +pub(crate) trait Callback: Send + Sync { + fn call(&self, msg: &str); +} + +// Internal structure for handling callbacks that get passed a string. +struct CallbackStr { + user_data: *mut c_void, + function: CCallback, +} + +unsafe impl Send for CallbackStr {} +unsafe impl Sync for CallbackStr {} +impl Callback for CallbackStr { + fn call(&self, msg: &str) { + if let Some(ref cb) = self.function { + let cstr = CString::new(msg).expect("valid string with no nul bytes in the middle; qed").into_raw(); + cb(self.user_data, cstr, msg.len()) + } + } +} + #[no_mangle] pub unsafe extern fn parity_config_from_cli( args: *const *const c_char, @@ -112,7 +134,6 @@ pub unsafe extern fn parity_start(cfg: *const ParityParams, output: *mut *mut c_ panic::catch_unwind(|| { *output = ptr::null_mut(); let cfg: &ParityParams = &*cfg; - let logger = Arc::from_raw(cfg.logger as *mut parity_ethereum::RotatingLogger); let config = Box::from_raw(cfg.configuration as *mut parity_ethereum::Configuration); @@ -121,7 +142,7 @@ pub unsafe extern fn parity_start(cfg: *const ParityParams, output: *mut *mut c_ user_data: cfg.on_client_restart_cb_custom, function: cfg.on_client_restart_cb, }; - move |new_chain: String| { cb.call(new_chain.as_bytes()); } + move |new_chain: String| { cb.call(&new_chain); } }; let action = match parity_ethereum::start(*config, logger, on_client_restart_cb, || {}) { @@ -133,7 +154,7 @@ pub unsafe extern fn parity_start(cfg: *const ParityParams, output: *mut *mut c_ parity_ethereum::ExecutionAction::Instant(Some(s)) => { println!("{}", s); 0 }, parity_ethereum::ExecutionAction::Instant(None) => 0, parity_ethereum::ExecutionAction::Running(client) => { - *output = Box::into_raw(Box::::new(client)) as *mut c_void; + *output = Box::into_raw(Box::new(client)) as *mut c_void; 0 } } @@ -148,47 +169,19 @@ pub unsafe extern fn parity_destroy(client: *mut c_void) { }); } -unsafe fn parity_rpc_query_checker<'a>(client: *const c_void, query: *const c_char, len: usize) - -> Option> -{ - let query_str = { - let string = slice::from_raw_parts(query as *const u8, len); - str::from_utf8(string).ok()? - }; - let client: &RunningClient = &*(client as *const RunningClient); - Some((client, query_str)) -} - #[no_mangle] pub unsafe extern fn parity_rpc( client: *const c_void, query: *const c_char, len: usize, timeout_ms: usize, - callback: Callback, + callback: CCallback, user_data: *mut c_void, ) -> c_int { panic::catch_unwind(|| { if let Some((client, query)) = parity_rpc_query_checker(client, query, len) { - let client = client as &RunningClient; let callback = Arc::new(CallbackStr {user_data, function: callback} ); - let cb = callback.clone(); - let query = client.rpc_query(query, None).map(move |response| { - let response = response.unwrap_or_else(|| error::EMPTY.to_string()); - callback.call(response.as_bytes()); - }); - - let _handle = thread::Builder::new() - .name("rpc_query".to_string()) - .spawn(move || { - let mut current_thread = CurrentThread::new(); - current_thread.spawn(query); - let _ = current_thread.run_timeout(Duration::from_millis(timeout_ms as u64)) - .map_err(|_e| { - cb.call(error::TIMEOUT.as_bytes()); - }); - }) - .expect("rpc-query thread shouldn't fail; qed"); + parity_rpc_worker(client, query, callback, timeout_ms as u64); 0 } else { 1 @@ -201,47 +194,13 @@ pub unsafe extern fn parity_subscribe_ws( client: *const c_void, query: *const c_char, len: usize, - callback: Callback, + callback: CCallback, user_data: *mut c_void, ) -> *const c_void { - panic::catch_unwind(|| { if let Some((client, query)) = parity_rpc_query_checker(client, query, len) { - let (tx, mut rx) = mpsc::channel(1); - let session = Arc::new(PubSubSession::new(tx)); - let query_future = client.rpc_query(query, Some(session.clone())); - let weak_session = Arc::downgrade(&session); - let cb = CallbackStr { user_data, function: callback}; - - let _handle = thread::Builder::new() - .name("ws-subscriber".into()) - .spawn(move || { - // Wait for subscription ID - // Note this may block forever and be can't destroyed using the session object - // However, this will likely timeout or be catched the RPC layer - if let Ok(Some(response)) = query_future.wait() { - cb.call(response.as_bytes()); - } else { - cb.call(error::SUBSCRIBE.as_bytes()); - return; - } - - loop { - for response in rx.by_ref().wait() { - if let Ok(r) = response { - cb.call(r.as_bytes()); - } - } - - let rc = weak_session.upgrade().map_or(0,|session| Arc::strong_count(&session)); - // No subscription left, then terminate - if rc <= 1 { - break; - } - } - }) - .expect("rpc-subscriber thread shouldn't fail; qed"); - Arc::into_raw(session) as *const c_void + let callback = Arc::new(CallbackStr { user_data, function: callback}); + parity_ws_worker(client, query, callback) } else { ptr::null() } @@ -257,10 +216,10 @@ pub unsafe extern fn parity_unsubscribe_ws(session: *const c_void) { } #[no_mangle] -pub unsafe extern fn parity_set_panic_hook(callback: Callback, param: *mut c_void) { +pub extern fn parity_set_panic_hook(callback: CCallback, param: *mut c_void) { let cb = CallbackStr {user_data: param, function: callback}; panic_hook::set_with(move |panic_msg| { - cb.call(panic_msg.as_bytes()); + cb.call(panic_msg); }); } @@ -283,19 +242,63 @@ pub unsafe extern fn parity_set_logger( *logger = Arc::into_raw(parity_ethereum::setup_log(&logger_cfg).expect("Logger initialized only once; qed")) as *mut _; } -// Internal structure for handling callbacks that get passed a string. -struct CallbackStr { - user_data: *mut c_void, - function: Callback, +// WebSocket event loop +fn parity_ws_worker(client: &RunningClient, query: &str, callback: Arc) -> *const c_void { + let (tx, mut rx) = mpsc::channel(1); + let session = Arc::new(PubSubSession::new(tx)); + let query_future = client.rpc_query(query, Some(session.clone())); + let weak_session = Arc::downgrade(&session); + let _handle = thread::Builder::new() + .name("ws-subscriber".into()) + .spawn(move || { + // Wait for subscription ID + // Note this may block forever and be can't destroyed using the session object + // However, this will likely timeout or be catched the RPC layer + if let Ok(Some(response)) = query_future.wait() { + callback.call(&response); + } else { + callback.call(error::SUBSCRIBE); + return; + } + + while weak_session.upgrade().map_or(0, |session| Arc::strong_count(&session)) > 1 { + for response in rx.by_ref().wait() { + if let Ok(r) = response { + callback.call(&r); + } + } + } + }) + .expect("rpc-subscriber thread shouldn't fail; qed"); + Arc::into_raw(session) as *const c_void } -unsafe impl Send for CallbackStr {} -unsafe impl Sync for CallbackStr {} -impl CallbackStr { - fn call(&self, msg: &[u8]) { - if let Some(ref cb) = self.function { - let cstr = CString::new(msg).expect("valid string with no null bytes in the middle; qed").into_raw(); - cb(self.user_data, cstr, msg.len()) - } - } +// RPC event loop that runs for at most `timeout_ms` +fn parity_rpc_worker(client: &RunningClient, query: &str, callback: Arc, timeout_ms: u64) { + let cb = callback.clone(); + let query = client.rpc_query(query, None).map(move |response| { + let response = response.unwrap_or_else(|| error::EMPTY.to_string()); + callback.call(&response); + }); + + let _handle = thread::Builder::new() + .name("rpc_query".to_string()) + .spawn(move || { + let mut current_thread = CurrentThread::new(); + current_thread.spawn(query); + let _ = current_thread + .run_timeout(Duration::from_millis(timeout_ms)) + .map_err(|_e| { + cb.call(error::TIMEOUT); + }); + }) + .expect("rpc-query thread shouldn't fail; qed"); +} + +unsafe fn parity_rpc_query_checker<'a>(client: *const c_void, query: *const c_char, len: usize) + -> Option> +{ + let query_str = str::from_utf8(slice::from_raw_parts(query as *const u8, len)).ok()?; + let client: &RunningClient = &*(client as *const RunningClient); + Some((client, query_str)) }