* New version of jsonrpc. * Better invalid encoding messages * Fixing deprecated methods of tokio_core * Using dedicated branch for jsonrpc * Bump
123 lines
3.4 KiB
Rust
123 lines
3.4 KiB
Rust
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
use std::{io, thread, time};
|
|
use std::sync::{atomic, mpsc, Arc};
|
|
use util::Mutex;
|
|
|
|
use futures::{self, Future};
|
|
use fetch::{self, Fetch};
|
|
|
|
pub struct FetchControl {
|
|
sender: mpsc::Sender<()>,
|
|
fetch: FakeFetch,
|
|
}
|
|
|
|
impl FetchControl {
|
|
pub fn respond(self) {
|
|
self.sender.send(())
|
|
.expect("Fetch cannot be finished without sending a response at least once.");
|
|
}
|
|
|
|
pub fn wait_for_requests(&self, len: usize) {
|
|
const MAX_TIMEOUT_MS: u64 = 5000;
|
|
const ATTEMPTS: u64 = 10;
|
|
let mut attempts_left = ATTEMPTS;
|
|
loop {
|
|
let current = self.fetch.requested.lock().len();
|
|
|
|
if current == len {
|
|
break;
|
|
} else if attempts_left == 0 {
|
|
panic!(
|
|
"Timeout reached when waiting for pending requests. Expected: {}, current: {}",
|
|
len, current
|
|
);
|
|
} else {
|
|
attempts_left -= 1;
|
|
// Should we handle spurious timeouts better?
|
|
thread::park_timeout(time::Duration::from_millis(MAX_TIMEOUT_MS / ATTEMPTS));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Default)]
|
|
pub struct FakeFetch {
|
|
manual: Arc<Mutex<Option<mpsc::Receiver<()>>>>,
|
|
response: Arc<Mutex<Option<&'static [u8]>>>,
|
|
asserted: Arc<atomic::AtomicUsize>,
|
|
requested: Arc<Mutex<Vec<String>>>,
|
|
}
|
|
|
|
impl FakeFetch {
|
|
pub fn set_response(&self, data: &'static [u8]) {
|
|
*self.response.lock() = Some(data);
|
|
}
|
|
|
|
pub fn manual(&self) -> FetchControl {
|
|
assert!(self.manual.lock().is_none(), "Only one manual control may be active.");
|
|
let (tx, rx) = mpsc::channel();
|
|
*self.manual.lock() = Some(rx);
|
|
|
|
FetchControl {
|
|
sender: tx,
|
|
fetch: self.clone(),
|
|
}
|
|
}
|
|
|
|
pub fn assert_requested(&self, url: &str) {
|
|
let requests = self.requested.lock();
|
|
let idx = self.asserted.fetch_add(1, atomic::Ordering::SeqCst);
|
|
|
|
assert_eq!(requests.get(idx), Some(&url.to_owned()), "Expected fetch from specific URL.");
|
|
}
|
|
|
|
pub fn assert_no_more_requests(&self) {
|
|
let requests = self.requested.lock();
|
|
let len = self.asserted.load(atomic::Ordering::SeqCst);
|
|
assert_eq!(requests.len(), len, "Didn't expect any more requests, got: {:?}", &requests[len..]);
|
|
}
|
|
}
|
|
|
|
impl Fetch for FakeFetch {
|
|
type Result = futures::BoxFuture<fetch::Response, fetch::Error>;
|
|
|
|
fn new() -> Result<Self, fetch::Error> where Self: Sized {
|
|
Ok(FakeFetch::default())
|
|
}
|
|
|
|
fn fetch_with_abort(&self, url: &str, _abort: fetch::Abort) -> Self::Result {
|
|
self.requested.lock().push(url.into());
|
|
let manual = self.manual.clone();
|
|
let response = self.response.clone();
|
|
|
|
let (tx, rx) = futures::oneshot();
|
|
thread::spawn(move || {
|
|
if let Some(rx) = manual.lock().take() {
|
|
// wait for manual resume
|
|
let _ = rx.recv();
|
|
}
|
|
|
|
let data = response.lock().take().unwrap_or(b"Some content");
|
|
let cursor = io::Cursor::new(data);
|
|
tx.send(fetch::Response::from_reader(cursor)).unwrap();
|
|
});
|
|
|
|
rx.map_err(|_| fetch::Error::Aborted).boxed()
|
|
}
|
|
}
|