refactored to new serialization

This commit is contained in:
Nikolay Volf 2016-04-23 18:15:50 +03:00
parent dcb7546d6d
commit fb82d185c7
3 changed files with 112 additions and 102 deletions

View File

@ -66,14 +66,14 @@ impl<T> BinaryConvertable for Option<T> where T: BinaryConvertable {
impl<E: BinaryConvertable> BinaryConvertable for Result<(), E> {
fn size(&self) -> usize {
1usize + match *self {
Ok(ref r) => 0,
Ok(_) => 0,
Err(ref e) => e.size(),
}
}
fn to_bytes(&self, buffer: &mut [u8], length_stack: &mut VecDeque<usize>) -> Result<(), BinaryConvertError> {
match *self {
Ok(ref r) => Ok(()),
Ok(_) => Ok(()),
Err(ref e) => Ok(try!(e.to_bytes(buffer, length_stack))),
}
}
@ -239,29 +239,41 @@ pub fn deserialize_from<T, R>(r: &mut R) -> Result<T, BinaryConvertError>
T: BinaryConvertable
{
let mut fake_stack = VecDeque::new();
let mut length_stack = VecDeque::<usize>::new();
let mut size_buffer = [0u8; 8];
try!(r.read(&mut size_buffer[..]).map_err(|_| BinaryConvertError));
let stack_len = try!(u64::from_bytes(&mut size_buffer[..], &mut fake_stack)) as usize;
if stack_len > 0 {
let mut header_buffer = Vec::with_capacity(stack_len * 8);
unsafe { header_buffer.set_len(stack_len * 8); };
try!(r.read(&mut header_buffer[..]).map_err(|_| BinaryConvertError));
for idx in 0..stack_len {
let stack_item = try!(u64::from_bytes(&header_buffer[idx*8..(idx+1)*8], &mut fake_stack));
length_stack.push_back(stack_item as usize);
match T::len_params() {
0 => {
let fixed_size = mem::size_of::<T>();
let mut payload_buffer = Vec::with_capacity(fixed_size);
unsafe { payload_buffer.set_len(fixed_size); }
try!(r.read(&mut payload_buffer).map_err(|_| BinaryConvertError));
T::from_bytes(&payload_buffer[..], &mut fake_stack)
},
_ => {
let mut length_stack = VecDeque::<usize>::new();
let mut size_buffer = [0u8; 8];
try!(r.read(&mut size_buffer[..]).map_err(|_| BinaryConvertError));
let stack_len = try!(u64::from_bytes(&mut size_buffer[..], &mut fake_stack)) as usize;
if stack_len > 0 {
let mut header_buffer = Vec::with_capacity(stack_len * 8);
unsafe { header_buffer.set_len(stack_len * 8); };
try!(r.read(&mut header_buffer[..]).map_err(|_| BinaryConvertError));
for idx in 0..stack_len {
let stack_item = try!(u64::from_bytes(&header_buffer[idx*8..(idx+1)*8], &mut fake_stack));
length_stack.push_back(stack_item as usize);
}
}
try!(r.read(&mut size_buffer[..]).map_err(|_| BinaryConvertError));
let size = try!(u64::from_bytes(&size_buffer[..], &mut fake_stack)) as usize;
let mut data = Vec::with_capacity(size);
unsafe { data.set_len(size) };
try!(r.read(&mut data).map_err(|_| BinaryConvertError));
T::from_bytes(&data[..], &mut length_stack)
}
}
try!(r.read(&mut size_buffer[..]).map_err(|_| BinaryConvertError));
let size = try!(u64::from_bytes(&size_buffer[..], &mut fake_stack)) as usize;
let mut data = Vec::with_capacity(size);
unsafe { data.set_len(size) };
try!(r.read(&mut data).map_err(|_| BinaryConvertError));
T::from_bytes(&data[..], &mut length_stack)
}
pub fn deserialize<T: BinaryConvertable>(buffer: &[u8]) -> Result<T, BinaryConvertError> {
@ -274,39 +286,52 @@ pub fn serialize_into<T, W>(t: &T, w: &mut W) -> Result<(), BinaryConvertError>
where W: ::std::io::Write,
T: BinaryConvertable
{
let mut length_stack = VecDeque::<usize>::new();
let mut fake_stack = VecDeque::new();
let mut size_buffer = [0u8; 8];
let size = t.size();
let mut buffer = Vec::with_capacity(size);
unsafe { buffer.set_len(size); }
try!(t.to_bytes(&mut buffer[..], &mut length_stack));
match T::len_params() {
0 => {
let fixed_size = mem::size_of::<T>();
let mut buffer = Vec::with_capacity(fixed_size);
unsafe { buffer.set_len(fixed_size); }
try!(t.to_bytes(&mut buffer[..], &mut fake_stack));
try!(w.write(&buffer[..]).map_err(|_| BinaryConvertError));
Ok(())
},
_ => {
let mut length_stack = VecDeque::<usize>::new();
let mut size_buffer = [0u8; 8];
let stack_len = length_stack.len();
try!((stack_len as u64).to_bytes(&mut size_buffer[..], &mut fake_stack));
try!(w.write(&size_buffer[..]).map_err(|_| BinaryConvertError));
if stack_len > 0 {
let mut header_buffer = Vec::with_capacity(stack_len * 8);
unsafe { header_buffer.set_len(stack_len * 8); };
try!((stack_len as u64).to_bytes(&mut header_buffer[0..8], &mut fake_stack));
let mut idx = 0;
loop {
match length_stack.pop_front() {
Some(val) => try!((val as u64).to_bytes(&mut header_buffer[idx * 8..(idx+1) * 8], &mut fake_stack)),
None => { break; }
let size = t.size();
let mut buffer = Vec::with_capacity(size);
unsafe { buffer.set_len(size); }
try!(t.to_bytes(&mut buffer[..], &mut length_stack));
let stack_len = length_stack.len();
try!((stack_len as u64).to_bytes(&mut size_buffer[..], &mut fake_stack));
try!(w.write(&size_buffer[..]).map_err(|_| BinaryConvertError));
if stack_len > 0 {
let mut header_buffer = Vec::with_capacity(stack_len * 8);
unsafe { header_buffer.set_len(stack_len * 8); };
try!((stack_len as u64).to_bytes(&mut header_buffer[0..8], &mut fake_stack));
let mut idx = 0;
loop {
match length_stack.pop_front() {
Some(val) => try!((val as u64).to_bytes(&mut header_buffer[idx * 8..(idx+1) * 8], &mut fake_stack)),
None => { break; }
}
idx = idx + 1;
}
try!(w.write(&header_buffer[..]).map_err(|_| BinaryConvertError));
}
idx = idx + 1;
try!((size as u64).to_bytes(&mut size_buffer[..], &mut fake_stack));
try!(w.write(&size_buffer[..]).map_err(|_| BinaryConvertError));
try!(w.write(&buffer[..]).map_err(|_| BinaryConvertError));
Ok(())
}
try!(w.write(&header_buffer[..]).map_err(|_| BinaryConvertError));
}
try!((size as u64).to_bytes(&mut size_buffer[..], &mut fake_stack));
try!(w.write(&size_buffer[..]).map_err(|_| BinaryConvertError));
try!(w.write(&buffer[..]).map_err(|_| BinaryConvertError));
Ok(())
}
pub fn serialize<T: BinaryConvertable>(t: &T) -> Result<Vec<u8>, BinaryConvertError> {

View File

@ -27,7 +27,11 @@ mod tests {
#[test]
fn call_service() {
// method_num = 0, f = 10 (method Service::commit)
let mut socket = TestSocket::new_ready(vec![0, 16, 0, 0, 0, 10]);
let mut socket = TestSocket::new_ready(vec![
0, 16,
0, 0, 0, 0, 0, 0, 0, 0,
4, 0, 0, 0, 0, 0, 0, 0,
10, 0, 0, 0]);
let service = Service::new();
assert_eq!(0, *service.commits.read().unwrap());
@ -42,13 +46,13 @@ mod tests {
fn call_service_handshake() {
let mut socket = TestSocket::new_ready(vec![0, 0,
// part count = 3
0, 0, 0, 0, 0, 0, 0, 3,
3, 0, 0, 0, 0, 0, 0, 0,
// part sizes
0, 0, 0, 0, 0, 0, 0, 5,
0, 0, 0, 0, 0, 0, 0, 5,
0, 0, 0, 0, 0, 0, 0, 64,
5, 0, 0, 0, 0, 0, 0, 0,
5, 0, 0, 0, 0, 0, 0, 0,
64, 0, 0, 0, 0, 0, 0, 0,
// total payload length
0, 0, 0, 0, 0, 0, 0, 70,
70, 0, 0, 0, 0, 0, 0, 0,
// protocol version
b'1', b'.', b'0', b'.', b'0',
// api version
@ -72,24 +76,34 @@ mod tests {
#[test]
fn call_service_client() {
let mut socket = TestSocket::new();
socket.read_buffer = vec![0, 0, 0, 10];
socket.read_buffer = vec![10, 0, 0, 0];
let service_client = ServiceClient::init(socket);
let result = service_client.commit(5);
assert_eq!(vec![0, 16, 0, 0, 0, 5], service_client.socket().borrow().write_buffer.clone());
assert_eq!(
vec![0, 16,
0, 0, 0, 0, 0, 0, 0, 0,
4, 0, 0, 0, 0, 0, 0, 0,
5, 0, 0, 0],
service_client.socket().borrow().write_buffer.clone());
assert_eq!(10, result);
}
#[test]
fn call_service_client_optional() {
let mut socket = TestSocket::new();
socket.read_buffer = vec![0, 0, 0, 10];
socket.read_buffer = vec![10, 0, 0, 0];
let service_client = ServiceClient::init(socket);
let result = service_client.rollback(Some(5), 10);
assert_eq!(vec![0, 17, 1, 0, 0, 0, 5, 0, 0, 0, 10], service_client.socket().borrow().write_buffer.clone());
assert_eq!(vec![
0, 17,
1, 0, 0, 0, 0, 0, 0, 0,
4, 0, 0, 0, 0, 0, 0, 0,
8, 0, 0, 0, 0, 0, 0, 0,
5, 0, 0, 0, 10, 0, 0, 0], service_client.socket().borrow().write_buffer.clone());
assert_eq!(10, result);
}
@ -123,9 +137,12 @@ mod tests {
assert_eq!(vec![
// message num..
0, 18,
// payload length
0, 0, 0, 0, 0, 0, 0, 16,
// structure raw bytes (bigendians :( )
// variable size length-s
1, 0, 0, 0, 0, 0, 0, 0,
16, 0, 0, 0, 0, 0, 0, 0,
// total length
16, 0, 0, 0, 0, 0, 0, 0,
// items
3, 0, 0, 0, 0, 0, 0, 0,
11, 0, 0, 0, 0, 0, 0, 0],
service_client.socket().borrow().write_buffer.clone());
@ -135,13 +152,19 @@ mod tests {
#[test]
fn can_invoke_generic_service() {
let mut socket = TestSocket::new();
socket.read_buffer = vec![0, 0, 0, 0];
socket.read_buffer = vec![
1, 0, 0, 0, 0, 0, 0, 0,
1, 0, 0, 0, 0, 0, 0, 0,
1, 0, 0, 0, 0, 0, 0, 0,
0,
];
let db_client = DBClient::<u64, _>::init(socket);
let result = db_client.write(vec![0u8; 100]);
assert!(result.is_ok());
}
#[test]
fn can_handshake_generic_service() {
let mut socket = TestSocket::new();

View File

@ -20,7 +20,7 @@ mod tests {
use super::super::service::*;
use nanoipc;
use std::sync::Arc;
use std::io::{Write, Read};
use std::io::Write;
use std::sync::atomic::{Ordering, AtomicBool};
fn dummy_write(addr: &str, buf: &[u8]) -> (::nanomsg::Socket, ::nanomsg::Endpoint) {
@ -67,42 +67,4 @@ mod tests {
worker_should_exit.store(true, Ordering::Relaxed);
assert!(hs.is_ok());
}
#[test]
fn can_receive_dummy_writes_in_thread() {
let url = "ipc:///tmp/parity-test-nano-30.ipc";
let worker_should_exit = Arc::new(AtomicBool::new(false));
let worker_is_ready = Arc::new(AtomicBool::new(false));
let c_worker_should_exit = worker_should_exit.clone();
let c_worker_is_ready = worker_is_ready.clone();
::std::thread::spawn(move || {
let mut worker = init_worker(url);
while !c_worker_should_exit.load(Ordering::Relaxed) {
worker.poll();
c_worker_is_ready.store(true, Ordering::Relaxed);
}
});
while !worker_is_ready.load(Ordering::Relaxed) { }
let (mut _s, _e) = dummy_write(url, &vec![0, 0,
// protocol version
0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0',
// api version
0, 0, 0, 0, 0, 0, 0, 5, b'1', b'.', b'0', b'.', b'0',
// reserved
0, 0, 0, 0, 0, 0, 0, 64,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
]);
let mut buf = vec![0u8;1];
_s.read(&mut buf).unwrap();
assert_eq!(1, buf.len());
assert_eq!(1, buf[0]);
worker_should_exit.store(true, Ordering::Relaxed);
}
}