* fix(remove needless unsafe blocks) * style(nits) * fix(parity-clib): eliminate repetitive event loops * revert(java bindings): safe rust -> unsafe rust These functions can still end up with `UB` thus should be unsafe * fix(grumbles): make Callback trait `pub (crate)`
This commit is contained in:
		
							parent
							
								
									751d15e4be
								
							
						
					
					
						commit
						c84e5745fa
					
				@ -15,34 +15,31 @@
 | 
			
		||||
// along with Parity Ethereum.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 | 
			
		||||
use std::{mem, ptr};
 | 
			
		||||
use std::ffi::c_void;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::time::Duration;
 | 
			
		||||
use std::thread;
 | 
			
		||||
use std::os::raw::c_void;
 | 
			
		||||
 | 
			
		||||
use {parity_config_from_cli, parity_destroy, parity_set_logger, parity_start, parity_unsubscribe_ws, ParityParams, error};
 | 
			
		||||
use {Callback, parity_config_from_cli, parity_destroy, parity_rpc_worker, parity_start, parity_set_logger,
 | 
			
		||||
	parity_unsubscribe_ws, parity_ws_worker, ParityParams};
 | 
			
		||||
 | 
			
		||||
use futures::{Future, Stream};
 | 
			
		||||
use futures::sync::mpsc;
 | 
			
		||||
use jni::{JavaVM, JNIEnv};
 | 
			
		||||
use jni::objects::{JClass, JString, JObject, JValue, GlobalRef};
 | 
			
		||||
use jni::sys::{jlong, jobjectArray, va_list};
 | 
			
		||||
use tokio_current_thread::CurrentThread;
 | 
			
		||||
use parity_ethereum::{RunningClient, PubSubSession};
 | 
			
		||||
use parity_ethereum::RunningClient;
 | 
			
		||||
 | 
			
		||||
type CheckedQuery<'a> = (&'a RunningClient, String, JavaVM, GlobalRef);
 | 
			
		||||
 | 
			
		||||
// Creates a Java callback to a static method named `void callback(Object)`
 | 
			
		||||
struct Callback<'a> {
 | 
			
		||||
struct JavaCallback<'a> {
 | 
			
		||||
	jvm: JavaVM,
 | 
			
		||||
	callback: GlobalRef,
 | 
			
		||||
	method_name: &'a str,
 | 
			
		||||
	method_descriptor: &'a str,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
unsafe impl<'a> Send for Callback<'a> {}
 | 
			
		||||
unsafe impl<'a> Sync for Callback<'a> {}
 | 
			
		||||
impl<'a> Callback<'a> {
 | 
			
		||||
unsafe impl<'a> Send for JavaCallback<'a> {}
 | 
			
		||||
unsafe impl<'a> Sync for JavaCallback<'a> {}
 | 
			
		||||
 | 
			
		||||
impl<'a> JavaCallback<'a> {
 | 
			
		||||
	fn new(jvm: JavaVM, callback: GlobalRef) -> Self {
 | 
			
		||||
		Self {
 | 
			
		||||
			jvm,
 | 
			
		||||
@ -51,7 +48,9 @@ impl<'a> Callback<'a> {
 | 
			
		||||
			method_descriptor: "(Ljava/lang/Object;)V",
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<'a> Callback for JavaCallback<'a> {
 | 
			
		||||
	fn call(&self, msg: &str) {
 | 
			
		||||
		let env = self.jvm.attach_current_thread().expect("JavaVM should have an environment; qed");
 | 
			
		||||
		let java_str = env.new_string(msg.to_string()).expect("Rust String is valid JString; qed");
 | 
			
		||||
@ -63,13 +62,13 @@ impl<'a> Callback<'a> {
 | 
			
		||||
 | 
			
		||||
#[no_mangle]
 | 
			
		||||
pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env: JNIEnv, _: JClass, cli: jobjectArray) -> jlong {
 | 
			
		||||
	let cli_len = env.get_array_length(cli).expect("invalid Java bindings");
 | 
			
		||||
	let cli_len = env.get_array_length(cli).expect("invalid Java bindings") as usize;
 | 
			
		||||
 | 
			
		||||
	let mut jni_strings = Vec::with_capacity(cli_len as usize);
 | 
			
		||||
	let mut opts = Vec::with_capacity(cli_len as usize);
 | 
			
		||||
	let mut opts_lens = Vec::with_capacity(cli_len as usize);
 | 
			
		||||
	let mut jni_strings = Vec::with_capacity(cli_len);
 | 
			
		||||
	let mut opts = Vec::with_capacity(cli_len);
 | 
			
		||||
	let mut opts_lens = Vec::with_capacity(cli_len);
 | 
			
		||||
 | 
			
		||||
	for n in 0..cli_len {
 | 
			
		||||
	for n in 0..cli_len as i32 {
 | 
			
		||||
		let elem = env.get_object_array_element(cli, n).expect("invalid Java bindings");
 | 
			
		||||
		let elem_str: JString = elem.into();
 | 
			
		||||
		match env.get_string(elem_str) {
 | 
			
		||||
@ -77,7 +76,7 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env:
 | 
			
		||||
				opts.push(s.as_ptr());
 | 
			
		||||
				opts_lens.push(s.to_bytes().len());
 | 
			
		||||
				jni_strings.push(s);
 | 
			
		||||
			},
 | 
			
		||||
			}
 | 
			
		||||
			Err(err) => {
 | 
			
		||||
				let _ = env.throw_new("java/lang/Exception", err.to_string());
 | 
			
		||||
				return 0
 | 
			
		||||
@ -86,7 +85,7 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env:
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	let mut out = ptr::null_mut();
 | 
			
		||||
	match parity_config_from_cli(opts.as_ptr(), opts_lens.as_ptr(), cli_len as usize, &mut out) {
 | 
			
		||||
	match parity_config_from_cli(opts.as_ptr(), opts_lens.as_ptr(), cli_len, &mut out) {
 | 
			
		||||
		0 => out as jlong,
 | 
			
		||||
		_ => {
 | 
			
		||||
			let _ = env.throw_new("java/lang/Exception", "failed to create config object");
 | 
			
		||||
@ -120,7 +119,7 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_build(
 | 
			
		||||
		_ => {
 | 
			
		||||
			let _ = env.throw_new("java/lang/Exception", "failed to start Parity");
 | 
			
		||||
			0
 | 
			
		||||
		},
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -129,7 +128,7 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_destroy(_env: JNIEn
 | 
			
		||||
	parity_destroy(parity);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
unsafe fn async_checker<'a>(client: va_list, rpc: JString, callback: JObject, env: &JNIEnv<'a>)
 | 
			
		||||
unsafe fn java_query_checker<'a>(client: va_list, rpc: JString, callback: JObject, env: &JNIEnv<'a>)
 | 
			
		||||
-> Result<CheckedQuery<'a>, String> {
 | 
			
		||||
	let query: String = env.get_string(rpc)
 | 
			
		||||
		.map(Into::into)
 | 
			
		||||
@ -151,26 +150,10 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_rpcQueryNative(
 | 
			
		||||
	callback: JObject,
 | 
			
		||||
	)
 | 
			
		||||
{
 | 
			
		||||
	let _ = async_checker(parity, rpc, callback, &env)
 | 
			
		||||
	let _ = java_query_checker(parity, rpc, callback, &env)
 | 
			
		||||
		.map(|(client, query, jvm, global_ref)| {
 | 
			
		||||
			let callback = Arc::new(Callback::new(jvm, global_ref));
 | 
			
		||||
			let cb = callback.clone();
 | 
			
		||||
			let future = client.rpc_query(&query, None).map(move |response| {
 | 
			
		||||
				let response = response.unwrap_or_else(|| error::EMPTY.to_string());
 | 
			
		||||
				callback.call(&response);
 | 
			
		||||
			});
 | 
			
		||||
 | 
			
		||||
			let _handle = thread::Builder::new()
 | 
			
		||||
				.name("rpc_query".to_string())
 | 
			
		||||
				.spawn(move || {
 | 
			
		||||
					let mut current_thread = CurrentThread::new();
 | 
			
		||||
					current_thread.spawn(future);
 | 
			
		||||
					let _ = current_thread.run_timeout(Duration::from_millis(timeout_ms as u64))
 | 
			
		||||
						.map_err(|_e| {
 | 
			
		||||
							cb.call(error::TIMEOUT);
 | 
			
		||||
						});
 | 
			
		||||
				})
 | 
			
		||||
				.expect("rpc-query thread shouldn't fail; qed");
 | 
			
		||||
			let callback = Arc::new(JavaCallback::new(jvm, global_ref));
 | 
			
		||||
			parity_rpc_worker(client, &query, callback, timeout_ms as u64);
 | 
			
		||||
		})
 | 
			
		||||
		.map_err(|e| {
 | 
			
		||||
			let _ = env.throw_new("java/lang/Exception", e);
 | 
			
		||||
@ -186,43 +169,10 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_subscribeWebSocketN
 | 
			
		||||
	callback: JObject,
 | 
			
		||||
	) -> va_list {
 | 
			
		||||
 | 
			
		||||
	async_checker(parity, rpc, callback, &env)
 | 
			
		||||
	java_query_checker(parity, rpc, callback, &env)
 | 
			
		||||
		.map(move |(client, query, jvm, global_ref)| {
 | 
			
		||||
			let callback = Arc::new(Callback::new(jvm, global_ref));
 | 
			
		||||
			let (tx, mut rx) = mpsc::channel(1);
 | 
			
		||||
			let session = Arc::new(PubSubSession::new(tx));
 | 
			
		||||
			let weak_session = Arc::downgrade(&session);
 | 
			
		||||
			let query_future = client.rpc_query(&query, Some(session.clone()));;
 | 
			
		||||
 | 
			
		||||
			let _handle = thread::Builder::new()
 | 
			
		||||
				.name("ws-subscriber".into())
 | 
			
		||||
				.spawn(move || {
 | 
			
		||||
					// Wait for subscription ID
 | 
			
		||||
					// Note this may block forever and can't be destroyed using the session object
 | 
			
		||||
					// However, this will likely timeout or be catched the RPC layer
 | 
			
		||||
					if let Ok(Some(response)) = query_future.wait() {
 | 
			
		||||
						callback.call(&response);
 | 
			
		||||
					} else {
 | 
			
		||||
						callback.call(error::SUBSCRIBE);
 | 
			
		||||
						return;
 | 
			
		||||
					};
 | 
			
		||||
 | 
			
		||||
					loop {
 | 
			
		||||
						for response in rx.by_ref().wait() {
 | 
			
		||||
							if let Ok(r) = response {
 | 
			
		||||
								callback.call(&r);
 | 
			
		||||
							}
 | 
			
		||||
						}
 | 
			
		||||
 | 
			
		||||
						let rc = weak_session.upgrade().map_or(0,|session| Arc::strong_count(&session));
 | 
			
		||||
						// No subscription left, then terminate
 | 
			
		||||
						if rc <= 1 {
 | 
			
		||||
							break;
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
				})
 | 
			
		||||
			.expect("rpc-subscriber thread shouldn't fail; qed");
 | 
			
		||||
			Arc::into_raw(session) as va_list
 | 
			
		||||
			let callback = Arc::new(JavaCallback::new(jvm, global_ref));
 | 
			
		||||
			parity_ws_worker(client, &query, callback) as va_list
 | 
			
		||||
		})
 | 
			
		||||
		.unwrap_or_else(|e| {
 | 
			
		||||
			let _ = env.throw_new("java/lang/Exception", e);
 | 
			
		||||
 | 
			
		||||
@ -40,7 +40,7 @@ use futures::sync::mpsc;
 | 
			
		||||
use parity_ethereum::{PubSubSession, RunningClient};
 | 
			
		||||
use tokio_current_thread::CurrentThread;
 | 
			
		||||
 | 
			
		||||
type Callback = Option<extern "C" fn(*mut c_void, *const c_char, usize)>;
 | 
			
		||||
type CCallback = Option<extern "C" fn(*mut c_void, *const c_char, usize)>;
 | 
			
		||||
type CheckedQuery<'a> = (&'a RunningClient, &'static str);
 | 
			
		||||
 | 
			
		||||
pub mod error {
 | 
			
		||||
@ -52,11 +52,33 @@ pub mod error {
 | 
			
		||||
#[repr(C)]
 | 
			
		||||
pub struct ParityParams {
 | 
			
		||||
	pub configuration: *mut c_void,
 | 
			
		||||
	pub on_client_restart_cb: Callback,
 | 
			
		||||
	pub on_client_restart_cb: CCallback,
 | 
			
		||||
	pub on_client_restart_cb_custom: *mut c_void,
 | 
			
		||||
	pub logger: *mut c_void
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Trait representing a callback that passes a string
 | 
			
		||||
pub(crate) trait Callback: Send + Sync {
 | 
			
		||||
	fn call(&self, msg: &str);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Internal structure for handling callbacks that get passed a string.
 | 
			
		||||
struct CallbackStr {
 | 
			
		||||
	user_data: *mut c_void,
 | 
			
		||||
	function: CCallback,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
unsafe impl Send for CallbackStr {}
 | 
			
		||||
unsafe impl Sync for CallbackStr {}
 | 
			
		||||
impl Callback for CallbackStr {
 | 
			
		||||
	fn call(&self, msg: &str) {
 | 
			
		||||
		if let Some(ref cb) = self.function {
 | 
			
		||||
			let cstr = CString::new(msg).expect("valid string with no nul bytes in the middle; qed").into_raw();
 | 
			
		||||
			cb(self.user_data, cstr, msg.len())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[no_mangle]
 | 
			
		||||
pub unsafe extern fn parity_config_from_cli(
 | 
			
		||||
	args: *const *const c_char,
 | 
			
		||||
@ -112,7 +134,6 @@ pub unsafe extern fn parity_start(cfg: *const ParityParams, output: *mut *mut c_
 | 
			
		||||
	panic::catch_unwind(|| {
 | 
			
		||||
		*output = ptr::null_mut();
 | 
			
		||||
		let cfg: &ParityParams = &*cfg;
 | 
			
		||||
 | 
			
		||||
		let logger = Arc::from_raw(cfg.logger as *mut parity_ethereum::RotatingLogger);
 | 
			
		||||
		let config = Box::from_raw(cfg.configuration as *mut parity_ethereum::Configuration);
 | 
			
		||||
 | 
			
		||||
@ -121,7 +142,7 @@ pub unsafe extern fn parity_start(cfg: *const ParityParams, output: *mut *mut c_
 | 
			
		||||
				user_data: cfg.on_client_restart_cb_custom,
 | 
			
		||||
				function: cfg.on_client_restart_cb,
 | 
			
		||||
			};
 | 
			
		||||
			move |new_chain: String| { cb.call(new_chain.as_bytes()); }
 | 
			
		||||
			move |new_chain: String| { cb.call(&new_chain); }
 | 
			
		||||
		};
 | 
			
		||||
 | 
			
		||||
		let action = match parity_ethereum::start(*config, logger, on_client_restart_cb, || {}) {
 | 
			
		||||
@ -133,7 +154,7 @@ pub unsafe extern fn parity_start(cfg: *const ParityParams, output: *mut *mut c_
 | 
			
		||||
			parity_ethereum::ExecutionAction::Instant(Some(s)) => { println!("{}", s); 0 },
 | 
			
		||||
			parity_ethereum::ExecutionAction::Instant(None) => 0,
 | 
			
		||||
			parity_ethereum::ExecutionAction::Running(client) => {
 | 
			
		||||
				*output = Box::into_raw(Box::<parity_ethereum::RunningClient>::new(client)) as *mut c_void;
 | 
			
		||||
				*output = Box::into_raw(Box::new(client)) as *mut c_void;
 | 
			
		||||
				0
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
@ -148,47 +169,19 @@ pub unsafe extern fn parity_destroy(client: *mut c_void) {
 | 
			
		||||
	});
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
unsafe fn parity_rpc_query_checker<'a>(client: *const c_void, query: *const c_char, len: usize)
 | 
			
		||||
	-> Option<CheckedQuery<'a>>
 | 
			
		||||
{
 | 
			
		||||
	let query_str = {
 | 
			
		||||
			let string = slice::from_raw_parts(query as *const u8, len);
 | 
			
		||||
			str::from_utf8(string).ok()?
 | 
			
		||||
	};
 | 
			
		||||
	let client: &RunningClient = &*(client as *const RunningClient);
 | 
			
		||||
	Some((client, query_str))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[no_mangle]
 | 
			
		||||
pub unsafe extern fn parity_rpc(
 | 
			
		||||
	client: *const c_void,
 | 
			
		||||
	query: *const c_char,
 | 
			
		||||
	len: usize,
 | 
			
		||||
	timeout_ms: usize,
 | 
			
		||||
	callback: Callback,
 | 
			
		||||
	callback: CCallback,
 | 
			
		||||
	user_data: *mut c_void,
 | 
			
		||||
) -> c_int {
 | 
			
		||||
	panic::catch_unwind(|| {
 | 
			
		||||
		if let Some((client, query)) = parity_rpc_query_checker(client, query, len) {
 | 
			
		||||
			let client = client as &RunningClient;
 | 
			
		||||
			let callback = Arc::new(CallbackStr {user_data, function: callback} );
 | 
			
		||||
			let cb = callback.clone();
 | 
			
		||||
			let query = client.rpc_query(query, None).map(move |response| {
 | 
			
		||||
				let response = response.unwrap_or_else(|| error::EMPTY.to_string());
 | 
			
		||||
				callback.call(response.as_bytes());
 | 
			
		||||
			});
 | 
			
		||||
 | 
			
		||||
			let _handle = thread::Builder::new()
 | 
			
		||||
			.name("rpc_query".to_string())
 | 
			
		||||
			.spawn(move || {
 | 
			
		||||
				let mut current_thread = CurrentThread::new();
 | 
			
		||||
				current_thread.spawn(query);
 | 
			
		||||
				let _ = current_thread.run_timeout(Duration::from_millis(timeout_ms as u64))
 | 
			
		||||
				.map_err(|_e| {
 | 
			
		||||
					cb.call(error::TIMEOUT.as_bytes());
 | 
			
		||||
				});
 | 
			
		||||
			})
 | 
			
		||||
			.expect("rpc-query thread shouldn't fail; qed");
 | 
			
		||||
			parity_rpc_worker(client, query, callback, timeout_ms as u64);
 | 
			
		||||
			0
 | 
			
		||||
		} else {
 | 
			
		||||
			1
 | 
			
		||||
@ -201,47 +194,13 @@ pub unsafe extern fn parity_subscribe_ws(
 | 
			
		||||
	client: *const c_void,
 | 
			
		||||
	query: *const c_char,
 | 
			
		||||
	len: usize,
 | 
			
		||||
	callback: Callback,
 | 
			
		||||
	callback: CCallback,
 | 
			
		||||
	user_data: *mut c_void,
 | 
			
		||||
) -> *const c_void {
 | 
			
		||||
 | 
			
		||||
	panic::catch_unwind(|| {
 | 
			
		||||
		if let Some((client, query)) = parity_rpc_query_checker(client, query, len) {
 | 
			
		||||
			let (tx, mut rx) = mpsc::channel(1);
 | 
			
		||||
			let session = Arc::new(PubSubSession::new(tx));
 | 
			
		||||
			let query_future = client.rpc_query(query, Some(session.clone()));
 | 
			
		||||
			let weak_session = Arc::downgrade(&session);
 | 
			
		||||
			let cb = CallbackStr { user_data, function: callback};
 | 
			
		||||
 | 
			
		||||
			let _handle = thread::Builder::new()
 | 
			
		||||
				.name("ws-subscriber".into())
 | 
			
		||||
				.spawn(move || {
 | 
			
		||||
					// Wait for subscription ID
 | 
			
		||||
					// Note this may block forever and be can't destroyed using the session object
 | 
			
		||||
					// However, this will likely timeout or be catched the RPC layer
 | 
			
		||||
					if let Ok(Some(response)) = query_future.wait() {
 | 
			
		||||
						cb.call(response.as_bytes());
 | 
			
		||||
					} else {
 | 
			
		||||
						cb.call(error::SUBSCRIBE.as_bytes());
 | 
			
		||||
						return;
 | 
			
		||||
					}
 | 
			
		||||
 | 
			
		||||
					loop {
 | 
			
		||||
						for response in rx.by_ref().wait() {
 | 
			
		||||
							if let Ok(r) = response {
 | 
			
		||||
								cb.call(r.as_bytes());
 | 
			
		||||
							}
 | 
			
		||||
						}
 | 
			
		||||
 | 
			
		||||
						let rc = weak_session.upgrade().map_or(0,|session| Arc::strong_count(&session));
 | 
			
		||||
						// No subscription left, then terminate
 | 
			
		||||
						if rc <= 1 {
 | 
			
		||||
							break;
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
			})
 | 
			
		||||
			.expect("rpc-subscriber thread shouldn't fail; qed");
 | 
			
		||||
			Arc::into_raw(session) as *const c_void
 | 
			
		||||
			let callback = Arc::new(CallbackStr { user_data, function: callback});
 | 
			
		||||
			parity_ws_worker(client, query, callback)
 | 
			
		||||
		} else {
 | 
			
		||||
			ptr::null()
 | 
			
		||||
		}
 | 
			
		||||
@ -257,10 +216,10 @@ pub unsafe extern fn parity_unsubscribe_ws(session: *const c_void) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[no_mangle]
 | 
			
		||||
pub unsafe extern fn parity_set_panic_hook(callback: Callback, param: *mut c_void) {
 | 
			
		||||
pub extern fn parity_set_panic_hook(callback: CCallback, param: *mut c_void) {
 | 
			
		||||
	let cb = CallbackStr {user_data: param, function: callback};
 | 
			
		||||
	panic_hook::set_with(move |panic_msg| {
 | 
			
		||||
		cb.call(panic_msg.as_bytes());
 | 
			
		||||
		cb.call(panic_msg);
 | 
			
		||||
	});
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -283,19 +242,63 @@ pub unsafe extern fn parity_set_logger(
 | 
			
		||||
	*logger = Arc::into_raw(parity_ethereum::setup_log(&logger_cfg).expect("Logger initialized only once; qed")) as *mut _;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Internal structure for handling callbacks that get passed a string.
 | 
			
		||||
struct CallbackStr {
 | 
			
		||||
	user_data: *mut c_void,
 | 
			
		||||
	function: Callback,
 | 
			
		||||
// WebSocket event loop
 | 
			
		||||
fn parity_ws_worker(client: &RunningClient, query: &str, callback: Arc<Callback>) -> *const c_void {
 | 
			
		||||
	let (tx, mut rx) = mpsc::channel(1);
 | 
			
		||||
	let session = Arc::new(PubSubSession::new(tx));
 | 
			
		||||
	let query_future = client.rpc_query(query, Some(session.clone()));
 | 
			
		||||
	let weak_session = Arc::downgrade(&session);
 | 
			
		||||
	let _handle = thread::Builder::new()
 | 
			
		||||
		.name("ws-subscriber".into())
 | 
			
		||||
		.spawn(move || {
 | 
			
		||||
			// Wait for subscription ID
 | 
			
		||||
			// Note this may block forever and be can't destroyed using the session object
 | 
			
		||||
			// However, this will likely timeout or be catched the RPC layer
 | 
			
		||||
			if let Ok(Some(response)) = query_future.wait() {
 | 
			
		||||
				callback.call(&response);
 | 
			
		||||
			} else {
 | 
			
		||||
				callback.call(error::SUBSCRIBE);
 | 
			
		||||
				return;
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			while weak_session.upgrade().map_or(0, |session| Arc::strong_count(&session)) > 1 {
 | 
			
		||||
				for response in rx.by_ref().wait() {
 | 
			
		||||
					if let Ok(r) = response {
 | 
			
		||||
						callback.call(&r);
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
		.expect("rpc-subscriber thread shouldn't fail; qed");
 | 
			
		||||
	Arc::into_raw(session) as *const c_void
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
unsafe impl Send for CallbackStr {}
 | 
			
		||||
unsafe impl Sync for CallbackStr {}
 | 
			
		||||
impl CallbackStr {
 | 
			
		||||
	fn call(&self, msg: &[u8]) {
 | 
			
		||||
		if let Some(ref cb) = self.function {
 | 
			
		||||
			let cstr = CString::new(msg).expect("valid string with no null bytes in the middle; qed").into_raw();
 | 
			
		||||
			cb(self.user_data, cstr, msg.len())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
// RPC event loop that runs for at most `timeout_ms`
 | 
			
		||||
fn parity_rpc_worker(client: &RunningClient, query: &str, callback: Arc<Callback>, timeout_ms: u64) {
 | 
			
		||||
	let cb = callback.clone();
 | 
			
		||||
	let query = client.rpc_query(query, None).map(move |response| {
 | 
			
		||||
		let response = response.unwrap_or_else(|| error::EMPTY.to_string());
 | 
			
		||||
		callback.call(&response);
 | 
			
		||||
	});
 | 
			
		||||
 | 
			
		||||
	let _handle = thread::Builder::new()
 | 
			
		||||
		.name("rpc_query".to_string())
 | 
			
		||||
		.spawn(move || {
 | 
			
		||||
			let mut current_thread = CurrentThread::new();
 | 
			
		||||
			current_thread.spawn(query);
 | 
			
		||||
			let _ = current_thread
 | 
			
		||||
				.run_timeout(Duration::from_millis(timeout_ms))
 | 
			
		||||
				.map_err(|_e| {
 | 
			
		||||
					cb.call(error::TIMEOUT);
 | 
			
		||||
				});
 | 
			
		||||
		})
 | 
			
		||||
		.expect("rpc-query thread shouldn't fail; qed");
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
unsafe fn parity_rpc_query_checker<'a>(client: *const c_void, query: *const c_char, len: usize)
 | 
			
		||||
	-> Option<CheckedQuery<'a>>
 | 
			
		||||
{
 | 
			
		||||
	let query_str = str::from_utf8(slice::from_raw_parts(query as *const u8, len)).ok()?;
 | 
			
		||||
	let client: &RunningClient = &*(client as *const RunningClient);
 | 
			
		||||
	Some((client, query_str))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user