From dfac17538f2933ac48f1080ac10df2db6dd73f44 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Thu, 19 May 2016 15:36:15 +0300 Subject: [PATCH] ethcore-db crate (#1097) * trait * implentated, lifetime issue still * full api * test mod * working open * get/retrieve * fix warnings and bug * working serialization of &[u8] parameters * client attributes * fix empty payload ser/de * [ci skip] debug assert out * extra deserialization test * extra serialization test * extra serialization test * serialization fixes, nupdate rocksdb * open test working * result bug & remove some scaffolds * fix warnings * more simple tests * consistent quotes * get rid of dedicated is_open flag * hashmap -> btreemap --- Cargo.lock | 6 +- db/Cargo.toml | 24 ++ db/build.rs | 43 ++++ db/src/database.rs | 370 +++++++++++++++++++++++++++++++ db/src/lib.rs | 20 ++ db/src/lib.rs.in | 25 +++ db/src/service.rs | 0 db/src/traits.rs | 73 ++++++ ipc/codegen/src/codegen.rs | 15 +- ipc/codegen/src/serialization.rs | 165 ++++++++++---- ipc/nano/src/lib.rs | 4 +- ipc/rpc/src/binary.rs | 118 +++++++--- ipc/tests/binary.rs.in | 14 ++ ipc/tests/nested.rs.in | 7 + util/Cargo.toml | 2 +- util/src/kvdb.rs | 8 +- 16 files changed, 809 insertions(+), 85 deletions(-) create mode 100644 db/Cargo.toml create mode 100644 db/build.rs create mode 100644 db/src/database.rs create mode 100644 db/src/lib.rs create mode 100644 db/src/lib.rs.in create mode 100644 db/src/service.rs create mode 100644 db/src/traits.rs diff --git a/Cargo.lock b/Cargo.lock index 932b077fc..7f336c2c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -332,7 +332,7 @@ dependencies = [ "mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "nix 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", - "rocksdb 0.4.3 (git+https://github.com/arkpar/rust-rocksdb.git)", + "rocksdb 0.4.3", "rust-crypto 0.2.35 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", "rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -628,7 +628,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "librocksdb-sys" version = "0.2.3" -source = "git+https://github.com/arkpar/rust-rocksdb.git#ae44ef33ed1358ffc79aa05ed77839d555daba33" dependencies = [ "libc 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -970,10 +969,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "rocksdb" version = "0.4.3" -source = "git+https://github.com/arkpar/rust-rocksdb.git#ae44ef33ed1358ffc79aa05ed77839d555daba33" dependencies = [ "libc 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", - "librocksdb-sys 0.2.3 (git+https://github.com/arkpar/rust-rocksdb.git)", + "librocksdb-sys 0.2.3", ] [[package]] diff --git a/db/Cargo.toml b/db/Cargo.toml new file mode 100644 index 000000000..8bd26d1f9 --- /dev/null +++ b/db/Cargo.toml @@ -0,0 +1,24 @@ +[package] +description = "Ethcore Database" +homepage = "http://ethcore.io" +license = "GPL-3.0" +name = "ethcore-db" +version = "1.2.0" +authors = ["Ethcore "] +build = "build.rs" + +[build-dependencies] +syntex = "*" +ethcore-ipc-codegen = { path = "../ipc/codegen" } + +[dependencies] +ethcore-util = { path = "../util" } +clippy = { version = "0.0.67", optional = true} +ethcore-devtools = { path = "../devtools" } +ethcore-ipc = { path = "../ipc/rpc" } +rocksdb = { git = "https://github.com/ethcore/rust-rocksdb" } +semver = "0.2" +ethcore-ipc-nano = { path = "../ipc/nano" } + +[features] +dev = ["clippy"] diff --git a/db/build.rs b/db/build.rs new file mode 100644 index 000000000..0f70bd68c --- /dev/null +++ b/db/build.rs @@ -0,0 +1,43 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +extern crate syntex; +extern crate ethcore_ipc_codegen as codegen; + +use std::env; +use std::path::Path; + +pub fn main() { + let out_dir = env::var_os("OUT_DIR").unwrap(); + + // ipc pass + { + let src = Path::new("src/lib.rs.in"); + let dst = Path::new(&out_dir).join("lib.intermediate.rs.in"); + let mut registry = syntex::Registry::new(); + codegen::register(&mut registry); + registry.expand("", &src, &dst).unwrap(); + } + + // binary serialization pass + { + let src = Path::new(&out_dir).join("lib.intermediate.rs.in"); + let dst = Path::new(&out_dir).join("lib.rs"); + let mut registry = syntex::Registry::new(); + codegen::register(&mut registry); + registry.expand("", &src, &dst).unwrap(); + } +} diff --git a/db/src/database.rs b/db/src/database.rs new file mode 100644 index 000000000..4abc98467 --- /dev/null +++ b/db/src/database.rs @@ -0,0 +1,370 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Ethcore rocksdb ipc service + +use traits::*; +use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBIterator, + IndexType, Options, DBCompactionStyle, BlockBasedOptions, Direction}; +use std::collections::BTreeMap; +use std::sync::{RwLock}; +use std::convert::From; +use ipc::IpcConfig; +use std::ops::*; +use std::mem; +use ipc::binary::BinaryConvertError; +use std::collections::VecDeque; + +impl From for Error { + fn from(s: String) -> Error { + Error::RocksDb(s) + } +} + +pub struct Database { + db: RwLock>, + transactions: RwLock>, + iterators: RwLock>, +} + +impl Database { + pub fn new() -> Database { + Database { + db: RwLock::new(None), + transactions: RwLock::new(BTreeMap::new()), + iterators: RwLock::new(BTreeMap::new()), + } + } +} + +#[derive(Ipc)] +impl DatabaseService for Database { + fn open(&self, config: DatabaseConfig, path: String) -> Result<(), Error> { + let mut db = self.db.write().unwrap(); + if db.is_some() { return Err(Error::AlreadyOpen); } + + let mut opts = Options::new(); + opts.set_max_open_files(256); + opts.create_if_missing(true); + opts.set_use_fsync(false); + opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction); + if let Some(size) = config.prefix_size { + let mut block_opts = BlockBasedOptions::new(); + block_opts.set_index_type(IndexType::HashSearch); + opts.set_block_based_table_factory(&block_opts); + opts.set_prefix_extractor_fixed_size(size); + } + *db = Some(try!(DB::open(&opts, &path))); + + Ok(()) + } + + fn close(&self) -> Result<(), Error> { + let mut db = self.db.write().unwrap(); + if db.is_none() { return Err(Error::IsClosed); } + + // TODO: wait for transactions to expire/close here? + if self.transactions.read().unwrap().len() > 0 { return Err(Error::UncommitedTransactions); } + + *db = None; + Ok(()) + } + + fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { + let db_lock = self.db.read().unwrap(); + let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); + + try!(db.put(key, value)); + Ok(()) + } + + fn delete(&self, key: &[u8]) -> Result<(), Error> { + let db_lock = self.db.read().unwrap(); + let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); + + try!(db.delete(key)); + Ok(()) + } + + fn write(&self, handle: TransactionHandle) -> Result<(), Error> { + let db_lock = self.db.read().unwrap(); + let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); + + let mut transactions = self.transactions.write().unwrap(); + let batch = try!( + transactions.remove(&handle).ok_or(Error::TransactionUnknown) + ); + try!(db.write(batch)); + Ok(()) + } + + fn get(&self, key: &[u8]) -> Result>, Error> { + let db_lock = self.db.read().unwrap(); + let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); + + match try!(db.get(key)) { + Some(db_vec) => Ok(Some(db_vec.to_vec())), + None => Ok(None), + } + } + + fn get_by_prefix(&self, prefix: &[u8]) -> Result>, Error> { + let db_lock = self.db.read().unwrap(); + let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); + + let mut iter = db.iterator(IteratorMode::From(prefix, Direction::Forward)); + match iter.next() { + // TODO: use prefix_same_as_start read option (not availabele in C API currently) + Some((k, v)) => if k[0 .. prefix.len()] == prefix[..] { Ok(Some(v.to_vec())) } else { Ok(None) }, + _ => Ok(None) + } + } + + fn is_empty(&self) -> Result { + let db_lock = self.db.read().unwrap(); + let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); + + Ok(db.iterator(IteratorMode::Start).next().is_none()) + } + + fn iter(&self) -> Result { + let db_lock = self.db.read().unwrap(); + let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); + + let mut iterators = self.iterators.write().unwrap(); + let next_iterator = iterators.keys().last().unwrap_or(&0) + 1; + iterators.insert(next_iterator, db.iterator(IteratorMode::Start)); + Ok(next_iterator) + } + + fn iter_next(&self, handle: IteratorHandle) -> Option + { + let mut iterators = self.iterators.write().unwrap(); + let mut iterator = match iterators.get_mut(&handle) { + Some(some_iterator) => some_iterator, + None => { return None; }, + }; + + iterator.next().and_then(|(some_key, some_val)| { + Some(KeyValue { + key: some_key.to_vec(), + value: some_val.to_vec(), + }) + }) + } + + fn transaction_put(&self, transaction: TransactionHandle, key: &[u8], value: &[u8]) -> Result<(), Error> + { + let mut transactions = self.transactions.write().unwrap(); + let batch = try!( + transactions.get_mut(&transaction).ok_or(Error::TransactionUnknown) + ); + try!(batch.put(&key, &value)); + Ok(()) + } + + fn transaction_delete(&self, transaction: TransactionHandle, key: &[u8]) -> Result<(), Error> { + let mut transactions = self.transactions.write().unwrap(); + let batch = try!( + transactions.get_mut(&transaction).ok_or(Error::TransactionUnknown) + ); + try!(batch.delete(&key)); + Ok(()) + } + + fn new_transaction(&self) -> TransactionHandle { + let mut transactions = self.transactions.write().unwrap(); + let next_transaction = transactions.keys().last().unwrap_or(&0) + 1; + transactions.insert(next_transaction, WriteBatch::new()); + + next_transaction + } +} + +// TODO : put proper at compile-time +impl IpcConfig for Database {} + + +#[cfg(test)] +mod test { + + use super::Database; + use traits::*; + use devtools::*; + + #[test] + fn can_be_created() { + let db = Database::new(); + assert!(db.is_empty().is_err()); + } + + #[test] + fn can_be_open_empty() { + let db = Database::new(); + let path = RandomTempPath::create_dir(); + db.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap(); + + assert!(db.is_empty().is_ok()); + } + + #[test] + fn can_store_key() { + let db = Database::new(); + let path = RandomTempPath::create_dir(); + db.open(DatabaseConfig { prefix_size: None }, path.as_str().to_owned()).unwrap(); + + db.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); + assert!(!db.is_empty().unwrap()); + } + + #[test] + fn can_retrieve() { + let db = Database::new(); + let path = RandomTempPath::create_dir(); + db.open(DatabaseConfig { prefix_size: None }, path.as_str().to_owned()).unwrap(); + db.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); + db.close().unwrap(); + + db.open(DatabaseConfig { prefix_size: None }, path.as_str().to_owned()).unwrap(); + assert_eq!(db.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec()); + } +} + +#[cfg(test)] +mod client_tests { + use super::{DatabaseClient, Database}; + use traits::*; + use devtools::*; + use nanoipc; + use std::sync::Arc; + use std::sync::atomic::{Ordering, AtomicBool}; + + fn init_worker(addr: &str) -> nanoipc::Worker { + let mut worker = nanoipc::Worker::::new(&Arc::new(Database::new())); + worker.add_duplex(addr).unwrap(); + worker + } + + #[test] + fn can_call_handshake() { + let url = "ipc:///tmp/parity-db-ipc-test-10.ipc"; + let worker_should_exit = Arc::new(AtomicBool::new(false)); + let worker_is_ready = Arc::new(AtomicBool::new(false)); + let c_worker_should_exit = worker_should_exit.clone(); + let c_worker_is_ready = worker_is_ready.clone(); + + ::std::thread::spawn(move || { + let mut worker = init_worker(url); + while !c_worker_should_exit.load(Ordering::Relaxed) { + worker.poll(); + c_worker_is_ready.store(true, Ordering::Relaxed); + } + }); + + while !worker_is_ready.load(Ordering::Relaxed) { } + let client = nanoipc::init_duplex_client::>(url).unwrap(); + + let hs = client.handshake(); + + worker_should_exit.store(true, Ordering::Relaxed); + assert!(hs.is_ok()); + } + + #[test] + fn can_open_db() { + let url = "ipc:///tmp/parity-db-ipc-test-20.ipc"; + let path = RandomTempPath::create_dir(); + + let worker_should_exit = Arc::new(AtomicBool::new(false)); + let worker_is_ready = Arc::new(AtomicBool::new(false)); + let c_worker_should_exit = worker_should_exit.clone(); + let c_worker_is_ready = worker_is_ready.clone(); + + ::std::thread::spawn(move || { + let mut worker = init_worker(url); + while !c_worker_should_exit.load(Ordering::Relaxed) { + worker.poll(); + c_worker_is_ready.store(true, Ordering::Relaxed); + } + }); + + while !worker_is_ready.load(Ordering::Relaxed) { } + let client = nanoipc::init_duplex_client::>(url).unwrap(); + + client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap(); + assert!(client.is_empty().unwrap()); + worker_should_exit.store(true, Ordering::Relaxed); + } + + #[test] + fn can_put() { + let url = "ipc:///tmp/parity-db-ipc-test-30.ipc"; + let path = RandomTempPath::create_dir(); + + let worker_should_exit = Arc::new(AtomicBool::new(false)); + let worker_is_ready = Arc::new(AtomicBool::new(false)); + let c_worker_should_exit = worker_should_exit.clone(); + let c_worker_is_ready = worker_is_ready.clone(); + + ::std::thread::spawn(move || { + let mut worker = init_worker(url); + while !c_worker_should_exit.load(Ordering::Relaxed) { + worker.poll(); + c_worker_is_ready.store(true, Ordering::Relaxed); + } + }); + + while !worker_is_ready.load(Ordering::Relaxed) { } + let client = nanoipc::init_duplex_client::>(url).unwrap(); + + client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap(); + client.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); + client.close().unwrap(); + + worker_should_exit.store(true, Ordering::Relaxed); + } + + #[test] + fn can_put_and_read() { + let url = "ipc:///tmp/parity-db-ipc-test-40.ipc"; + let path = RandomTempPath::create_dir(); + + let worker_should_exit = Arc::new(AtomicBool::new(false)); + let worker_is_ready = Arc::new(AtomicBool::new(false)); + let c_worker_should_exit = worker_should_exit.clone(); + let c_worker_is_ready = worker_is_ready.clone(); + + ::std::thread::spawn(move || { + let mut worker = init_worker(url); + while !c_worker_should_exit.load(Ordering::Relaxed) { + worker.poll(); + c_worker_is_ready.store(true, Ordering::Relaxed); + } + }); + + while !worker_is_ready.load(Ordering::Relaxed) { } + let client = nanoipc::init_duplex_client::>(url).unwrap(); + + client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap(); + client.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); + client.close().unwrap(); + + client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap(); + assert_eq!(client.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec()); + + worker_should_exit.store(true, Ordering::Relaxed); + } +} diff --git a/db/src/lib.rs b/db/src/lib.rs new file mode 100644 index 000000000..1daaf55c9 --- /dev/null +++ b/db/src/lib.rs @@ -0,0 +1,20 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Database ipc service + +#![allow(dead_code, unused_assignments, unused_variables)] // codegen issues +include!(concat!(env!("OUT_DIR"), "/lib.rs")); diff --git a/db/src/lib.rs.in b/db/src/lib.rs.in new file mode 100644 index 000000000..694b4e3e1 --- /dev/null +++ b/db/src/lib.rs.in @@ -0,0 +1,25 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +extern crate ethcore_ipc as ipc; +extern crate rocksdb; +extern crate ethcore_devtools as devtools; +extern crate semver; +extern crate ethcore_ipc_nano as nanoipc; +extern crate ethcore_util as util; + +pub mod database; +pub mod traits; diff --git a/db/src/service.rs b/db/src/service.rs new file mode 100644 index 000000000..e69de29bb diff --git a/db/src/traits.rs b/db/src/traits.rs new file mode 100644 index 000000000..1e5b2acf4 --- /dev/null +++ b/db/src/traits.rs @@ -0,0 +1,73 @@ +//! Ethcore database trait + +use ipc::BinaryConvertable; +use std::mem; +use ipc::binary::BinaryConvertError; +use std::collections::VecDeque; + +pub type TransactionHandle = u32; +pub type IteratorHandle = u32; + +#[derive(Binary)] +pub struct KeyValue { + pub key: Vec, + pub value: Vec, +} + +#[derive(Debug, Binary)] +pub enum Error { + AlreadyOpen, + IsClosed, + RocksDb(String), + TransactionUnknown, + IteratorUnknown, + UncommitedTransactions, +} + +/// Database configuration +#[derive(Binary)] +pub struct DatabaseConfig { + /// Optional prefix size in bytes. Allows lookup by partial key. + pub prefix_size: Option +} + +pub trait DatabaseService { + /// Opens database in the specified path + fn open(&self, config: DatabaseConfig, path: String) -> Result<(), Error>; + + /// Closes database + fn close(&self) -> Result<(), Error>; + + /// Insert a key-value pair in the transaction. Any existing value value will be overwritten. + fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error>; + + /// Delete value by key. + fn delete(&self, key: &[u8]) -> Result<(), Error>; + + /// Insert a key-value pair in the transaction. Any existing value value will be overwritten. + fn transaction_put(&self, transaction: TransactionHandle, key: &[u8], value: &[u8]) -> Result<(), Error>; + + /// Delete value by key using transaction + fn transaction_delete(&self, transaction: TransactionHandle, key: &[u8]) -> Result<(), Error>; + + /// Commit transaction to database. + fn write(&self, tr: TransactionHandle) -> Result<(), Error>; + + /// Initiate new transaction on database + fn new_transaction(&self) -> TransactionHandle; + + /// Get value by key. + fn get(&self, key: &[u8]) -> Result>, Error>; + + /// Get value by partial key. Prefix size should match configured prefix size. + fn get_by_prefix(&self, prefix: &[u8]) -> Result>, Error>; + + /// Check if there is anything in the database. + fn is_empty(&self) -> Result; + + /// Get handle to iterate through keys + fn iter(&self) -> Result; + + /// Next key-value for the the given iterator + fn iter_next(&self, iterator: IteratorHandle) -> Option; +} diff --git a/ipc/codegen/src/codegen.rs b/ipc/codegen/src/codegen.rs index 362257fcf..57db7d261 100644 --- a/ipc/codegen/src/codegen.rs +++ b/ipc/codegen/src/codegen.rs @@ -87,6 +87,13 @@ fn field_name(builder: &aster::AstBuilder, arg: &Arg) -> ast::Ident { } } +pub fn replace_slice_u8(builder: &aster::AstBuilder, ty: &P) -> P { + if ::syntax::print::pprust::ty_to_string(&strip_ptr(ty)) == "[u8]" { + return builder.ty().id("Vec") + } + ty.clone() +} + fn push_invoke_signature_aster( builder: &aster::AstBuilder, implement: &ImplItem, @@ -111,8 +118,8 @@ fn push_invoke_signature_aster( .attr().word("derive(Binary)") .attr().word("allow(non_camel_case_types)") .struct_(name_str.as_str()) - .field(arg_name.as_str()).ty() - .build(strip_ptr(arg_ty)); + .field(arg_name.as_str()) + .ty().build(replace_slice_u8(builder, &strip_ptr(arg_ty))); arg_names.push(arg_name); arg_tys.push(arg_ty.clone()); @@ -120,7 +127,7 @@ fn push_invoke_signature_aster( let arg_name = format!("{}", field_name(builder, &arg)); let arg_ty = &arg.ty; - tree = tree.field(arg_name.as_str()).ty().build(strip_ptr(arg_ty)); + tree = tree.field(arg_name.as_str()).ty().build(replace_slice_u8(builder, &strip_ptr(arg_ty))); arg_names.push(arg_name); arg_tys.push(arg_ty.clone()); } @@ -406,7 +413,7 @@ fn implement_client_method_body( request_serialization_statements.push( quote_stmt!(cx, let mut socket = socket_ref.deref_mut())); request_serialization_statements.push( - quote_stmt!(cx, ::ipc::invoke($index_ident, Vec::new(), &mut socket))); + quote_stmt!(cx, ::ipc::invoke($index_ident, &None, &mut socket))); request_serialization_statements }; diff --git a/ipc/codegen/src/serialization.rs b/ipc/codegen/src/serialization.rs index 8cd9ffdca..9854993c5 100644 --- a/ipc/codegen/src/serialization.rs +++ b/ipc/codegen/src/serialization.rs @@ -175,30 +175,46 @@ fn binary_expr_struct( ) -> Result { let size_exprs: Vec> = fields.iter().enumerate().map(|(index, field)| { - - if ::syntax::print::pprust::ty_to_string(&codegen::strip_ptr(&field.ty)) == "u8" { - return quote_expr!(cx, 1); - } - - let field_type_ident = builder.id( - &::syntax::print::pprust::ty_to_string(&codegen::strip_ptr(&field.ty))); - - let field_type_ident_qualified = builder.id( - replace_qualified(&::syntax::print::pprust::ty_to_string(&codegen::strip_ptr(&field.ty)))); - + let raw_ident = ::syntax::print::pprust::ty_to_string(&codegen::strip_ptr(&field.ty)); let index_ident = builder.id(format!("__field{}", index)); - value_ident.and_then(|x| { - let field_id = builder.id(field.ident.unwrap()); - Some(quote_expr!(cx, - match $field_type_ident_qualified::len_params() { - 0 => mem::size_of::<$field_type_ident>(), - _ => $x. $field_id .size(), - })) - }) - .unwrap_or_else(|| quote_expr!(cx, match $field_type_ident_qualified::len_params() { - 0 => mem::size_of::<$field_type_ident>(), - _ => $index_ident .size(), - })) + match raw_ident.as_ref() { + "u8" => { + quote_expr!(cx, 1) + }, + "[u8]" => { + value_ident.and_then(|x| { + let field_id = builder.id(field.ident.unwrap()); + Some(quote_expr!(cx, $x. $field_id .len())) + }) + .unwrap_or_else(|| { + quote_expr!(cx, $index_ident .len()) + } + ) + } + _ => { + let field_type_ident = builder.id( + &::syntax::print::pprust::ty_to_string(&codegen::strip_ptr(&field.ty))); + + let field_type_ident_qualified = builder.id( + replace_qualified(&::syntax::print::pprust::ty_to_string(&codegen::strip_ptr(&field.ty)))); + + value_ident.and_then(|x| + { + let field_id = builder.id(field.ident.unwrap()); + Some(quote_expr!(cx, + match $field_type_ident_qualified::len_params() { + 0 => mem::size_of::<$field_type_ident>(), + _ => $x. $field_id .size(), + })) + }) + .unwrap_or_else(|| { + quote_expr!(cx, match $field_type_ident_qualified::len_params() { + 0 => mem::size_of::<$field_type_ident>(), + _ => $index_ident .size(), + }) + }) + } + } }).collect(); let first_size_expr = size_exprs[0].clone(); @@ -233,17 +249,26 @@ fn binary_expr_struct( }, }; - if ::syntax::print::pprust::ty_to_string(&codegen::strip_ptr(&field.ty)) == "u8" { - write_stmts.push(quote_stmt!(cx, let next_line = offset + 1;).unwrap()); - write_stmts.push(quote_stmt!(cx, buffer[offset] = $member_expr; ).unwrap()); - } - else { - write_stmts.push(quote_stmt!(cx, let next_line = offset + match $field_type_ident_qualified::len_params() { - 0 => mem::size_of::<$field_type_ident>(), - _ => { let size = $member_expr .size(); length_stack.push_back(size); size }, - }).unwrap()); - write_stmts.push(quote_stmt!(cx, - if let Err(e) = $member_expr .to_bytes(&mut buffer[offset..next_line], length_stack) { return Err(e) };).unwrap()); + let raw_ident = ::syntax::print::pprust::ty_to_string(&codegen::strip_ptr(&field.ty)); + match raw_ident.as_ref() { + "u8" => { + write_stmts.push(quote_stmt!(cx, let next_line = offset + 1;).unwrap()); + write_stmts.push(quote_stmt!(cx, buffer[offset] = $member_expr; ).unwrap()); + }, + "[u8]" => { + write_stmts.push(quote_stmt!(cx, let size = $member_expr .len();).unwrap()); + write_stmts.push(quote_stmt!(cx, let next_line = offset + size;).unwrap()); + write_stmts.push(quote_stmt!(cx, length_stack.push_back(size);).unwrap()); + write_stmts.push(quote_stmt!(cx, buffer[offset..next_line].clone_from_slice($member_expr); ).unwrap()); + } + _ => { + write_stmts.push(quote_stmt!(cx, let next_line = offset + match $field_type_ident_qualified::len_params() { + 0 => mem::size_of::<$field_type_ident>(), + _ => { let size = $member_expr .size(); length_stack.push_back(size); size }, + }).unwrap()); + write_stmts.push(quote_stmt!(cx, + if let Err(e) = $member_expr .to_bytes(&mut buffer[offset..next_line], length_stack) { return Err(e) };).unwrap()); + } } write_stmts.push(quote_stmt!(cx, offset = next_line; ).unwrap()); @@ -251,15 +276,21 @@ fn binary_expr_struct( let field_index = builder.id(&format!("{}", index)); map_stmts.push(quote_stmt!(cx, map[$field_index] = total;).unwrap()); - if ::syntax::print::pprust::ty_to_string(&codegen::strip_ptr(&field.ty)) == "u8" { - map_stmts.push(quote_stmt!(cx, total += 1;).unwrap()); - } - else { - map_stmts.push(quote_stmt!(cx, let size = match $field_type_ident_qualified::len_params() { - 0 => mem::size_of::<$field_type_ident>(), - _ => length_stack.pop_front().unwrap(), - }).unwrap()); - map_stmts.push(quote_stmt!(cx, total += size;).unwrap()); + match raw_ident.as_ref() { + "u8" => { + map_stmts.push(quote_stmt!(cx, total = total + 1;).unwrap()); + }, + "[u8]" => { + map_stmts.push(quote_stmt!(cx, let size = length_stack.pop_front().unwrap();).unwrap()); + map_stmts.push(quote_stmt!(cx, total = total + size;).unwrap()); + }, + _ => { + map_stmts.push(quote_stmt!(cx, let size = match $field_type_ident_qualified::len_params() { + 0 => mem::size_of::<$field_type_ident>(), + _ => length_stack.pop_front().unwrap(), + }).unwrap()); + map_stmts.push(quote_stmt!(cx, total = total + size;).unwrap()); + } } }; @@ -419,6 +450,30 @@ fn fields_sequence( continue; } + // special case for [u8], it just takes a byte sequence + if ::syntax::print::pprust::ty_to_string(&field.ty) == "[u8]" { + tt.push(Token(_sp, token::Ident(ext_cx.ident_of("buffer")))); + + tt.push(Token(_sp, token::OpenDelim(token::Bracket))); + tt.push(Token(_sp, token::Ident(ext_cx.ident_of("map")))); + tt.push(Token(_sp, token::OpenDelim(token::Bracket))); + tt.push(Token(_sp, token::Ident(ext_cx.ident_of(&format!("{}", idx))))); + tt.push(Token(_sp, token::CloseDelim(token::Bracket))); + tt.push(Token(_sp, token::DotDot)); + + if idx+1 != fields.len() { + tt.push(Token(_sp, token::Ident(ext_cx.ident_of("map")))); + tt.push(Token(_sp, token::OpenDelim(token::Bracket))); + tt.push(Token(_sp, token::Ident(ext_cx.ident_of(&format!("{}", idx+1))))); + tt.push(Token(_sp, token::CloseDelim(token::Bracket))); + } + + tt.push(Token(_sp, token::CloseDelim(token::Bracket))); + + tt.push(Token(_sp, token::Comma)); + continue; + } + tt.push(Token(_sp, token::Ident(ext_cx.ident_of("try!")))); tt.push(Token(_sp, token::OpenDelim(token::Paren))); tt.push( @@ -513,6 +568,30 @@ fn named_fields_sequence( continue; } + // special case for [u8], it just takes a byte sequence + if ::syntax::print::pprust::ty_to_string(&field.ty) == "[u8]" { + tt.push(Token(_sp, token::Ident(ext_cx.ident_of("buffer")))); + + tt.push(Token(_sp, token::OpenDelim(token::Bracket))); + tt.push(Token(_sp, token::Ident(ext_cx.ident_of("map")))); + tt.push(Token(_sp, token::OpenDelim(token::Bracket))); + tt.push(Token(_sp, token::Ident(ext_cx.ident_of(&format!("{}", idx))))); + tt.push(Token(_sp, token::CloseDelim(token::Bracket))); + tt.push(Token(_sp, token::DotDot)); + + if idx+1 != fields.len() { + tt.push(Token(_sp, token::Ident(ext_cx.ident_of("map")))); + tt.push(Token(_sp, token::OpenDelim(token::Bracket))); + tt.push(Token(_sp, token::Ident(ext_cx.ident_of(&format!("{}", idx+1))))); + tt.push(Token(_sp, token::CloseDelim(token::Bracket))); + } + + tt.push(Token(_sp, token::CloseDelim(token::Bracket))); + + tt.push(Token(_sp, token::Comma)); + continue; + } + tt.push(Token(_sp, token::Ident(ext_cx.ident_of("try!")))); tt.push(Token(_sp, token::OpenDelim(token::Paren))); tt.push(Token( diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index fdcf1f1f2..964e52c68 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -116,6 +116,8 @@ impl Worker where S: IpcInterface { /// Polls all sockets, reads and dispatches method invocations pub fn poll(&mut self) { + use std::io::Write; + let mut request = PollRequest::new(&mut self.polls[..]); let _result_guard = Socket::poll(&mut request, POLL_TIMEOUT); @@ -135,7 +137,7 @@ impl Worker where S: IpcInterface { // dispatching for ipc interface let result = self.service.dispatch_buf(method_num, payload); - if let Err(e) = socket.nb_write(&result) { + if let Err(e) = socket.write(&result) { warn!(target: "ipc", "Failed to write response: {:?}", e); } } diff --git a/ipc/rpc/src/binary.rs b/ipc/rpc/src/binary.rs index 4fb359f7c..a985a2de9 100644 --- a/ipc/rpc/src/binary.rs +++ b/ipc/rpc/src/binary.rs @@ -67,7 +67,7 @@ impl BinaryConvertable for Option where T: BinaryConvertable { impl BinaryConvertable for Result<(), E> { fn size(&self) -> usize { - 1usize + match *self { + match *self { Ok(_) => 0, Err(ref e) => e.size(), } @@ -75,17 +75,17 @@ impl BinaryConvertable for Result<(), E> { fn to_bytes(&self, buffer: &mut [u8], length_stack: &mut VecDeque) -> Result<(), BinaryConvertError> { match *self { - Ok(_) => Ok(()), + Ok(_) => Err(BinaryConvertError), Err(ref e) => Ok(try!(e.to_bytes(buffer, length_stack))), } } fn from_bytes(buffer: &[u8], length_stack: &mut VecDeque) -> Result { - match buffer[0] { - 0 => Ok(Ok(())), - 1 => Ok(Err(try!(E::from_bytes(&buffer[1..], length_stack)))), - _ => Err(BinaryConvertError) - } + Ok(Err(try!(E::from_bytes(&buffer, length_stack)))) + } + + fn from_empty_bytes() -> Result { + Ok(Ok(())) } fn len_params() -> usize { @@ -104,8 +104,8 @@ impl BinaryConvertable for Result) -> Result<(), BinaryConvertError> { match *self { - Ok(ref r) => Ok(try!(r.to_bytes(buffer, length_stack))), - Err(ref e) => Ok(try!(e.to_bytes(buffer, length_stack))), + Ok(ref r) => { buffer[0] = 0; Ok(try!(r.to_bytes(&mut buffer[1..], length_stack))) }, + Err(ref e) => { buffer[1] = 1; Ok(try!(e.to_bytes(&mut buffer[1..], length_stack))) }, } } @@ -297,29 +297,29 @@ pub fn deserialize_from(r: &mut R) -> Result T::from_bytes(&payload_buffer[..], &mut fake_stack) }, _ => { - let mut length_stack = VecDeque::::new(); - let mut size_buffer = [0u8; 8]; - try!(r.read(&mut size_buffer[..]).map_err(|_| BinaryConvertError)); - let stack_len = try!(u64::from_bytes(&mut size_buffer[..], &mut fake_stack)) as usize; - if stack_len > 0 { - let mut header_buffer = Vec::with_capacity(stack_len * 8); - unsafe { header_buffer.set_len(stack_len * 8); }; + let mut payload = Vec::new(); + try!(r.read_to_end(&mut payload).map_err(|_| BinaryConvertError)); - try!(r.read(&mut header_buffer[..]).map_err(|_| BinaryConvertError)); + let mut length_stack = VecDeque::::new(); + let stack_len = try!(u64::from_bytes(&payload[0..8], &mut fake_stack)) as usize; + + if stack_len > 0 { for idx in 0..stack_len { - let stack_item = try!(u64::from_bytes(&header_buffer[idx*8..(idx+1)*8], &mut fake_stack)); + let stack_item = try!(u64::from_bytes(&payload[8 + idx*8..8 + (idx+1)*8], &mut fake_stack)); length_stack.push_back(stack_item as usize); } } - try!(r.read(&mut size_buffer[..]).map_err(|_| BinaryConvertError)); - let size = try!(u64::from_bytes(&size_buffer[..], &mut fake_stack)) as usize; - - let mut data = Vec::with_capacity(size); - unsafe { data.set_len(size) }; - try!(r.read(&mut data).map_err(|_| BinaryConvertError)); - - T::from_bytes(&data[..], &mut length_stack) + //try!(r.read(&mut size_buffer).map_err(|_| BinaryConvertError)); + let size = try!(u64::from_bytes(&payload[8+stack_len*8..16+stack_len*8], &mut fake_stack)) as usize; + match size { + 0 => { + T::from_empty_bytes() + }, + _ => { + T::from_bytes(&payload[16+stack_len*8..], &mut length_stack) + } + } }, } } @@ -350,6 +350,12 @@ pub fn serialize_into(t: &T, w: &mut W) -> Result<(), BinaryConvertError> let mut size_buffer = [0u8; 8]; let size = t.size(); + if size == 0 { + try!(w.write(&size_buffer).map_err(|_| BinaryConvertError)); + try!(w.write(&size_buffer).map_err(|_| BinaryConvertError)); + return Ok(()); + } + let mut buffer = Vec::with_capacity(size); unsafe { buffer.set_len(size); } try!(t.to_bytes(&mut buffer[..], &mut length_stack)); @@ -386,7 +392,8 @@ pub fn serialize(t: &T) -> Result, BinaryConvertEr use std::io::Cursor; let mut buff = Cursor::new(Vec::new()); try!(serialize_into(t, &mut buff)); - Ok(buff.into_inner()) + let into_inner = buff.into_inner(); + Ok(into_inner) } #[macro_export] @@ -544,7 +551,7 @@ fn deserialize_from_ok() { fn serialize_into_deserialize_from() { use std::io::{Cursor, SeekFrom, Seek}; - let mut buff = Cursor::new(vec![0u8; 1024]); + let mut buff = Cursor::new(Vec::new()); let mut v = Vec::new(); v.push(Some(5u64)); v.push(None); @@ -557,3 +564,58 @@ fn serialize_into_deserialize_from() { let de_v = deserialize_from::>, _>(&mut buff).unwrap(); assert_eq!(v, de_v); } + +#[test] +fn serialize_opt_vec() { + use std::io::Cursor; + + let mut buff = Cursor::new(Vec::new()); + let optional_vec: Option> = None; + serialize_into(&optional_vec, &mut buff).unwrap(); + + assert_eq!(&vec![0u8; 16], buff.get_ref()); +} + +#[test] +fn serialize_opt_vec_payload() { + use std::io::Cursor; + + let optional_vec: Option> = None; + let payload = serialize(&optional_vec).unwrap(); + + assert_eq!(vec![0u8;16], payload); +} + +#[test] +fn deserialize_opt_vec() { + use std::io::Cursor; + let mut buff = Cursor::new(vec![0u8; 16]); + + let vec = deserialize_from::>, _>(&mut buff).unwrap(); + + assert!(vec.is_none()); +} + +#[test] +fn deserialize_simple_err() { + use std::io::Cursor; + let mut buff = Cursor::new(vec![0u8; 16]); + + let result = deserialize_from::, _>(&mut buff).unwrap(); + + assert!(result.is_ok()); +} + +#[test] +fn deserialize_opt_vec_in_out() { + use std::io::{Cursor, SeekFrom, Seek}; + + let mut buff = Cursor::new(Vec::new()); + let optional_vec: Option> = None; + serialize_into(&optional_vec, &mut buff).unwrap(); + + buff.seek(SeekFrom::Start(0)).unwrap(); + let vec = deserialize_from::>, _>(&mut buff).unwrap(); + + assert!(vec.is_none()); +} diff --git a/ipc/tests/binary.rs.in b/ipc/tests/binary.rs.in index 74dd39c1b..fd017e340 100644 --- a/ipc/tests/binary.rs.in +++ b/ipc/tests/binary.rs.in @@ -42,3 +42,17 @@ pub enum EnumWithStruct { Left, Right { how_much: u64 }, } + +#[derive(Binary)] +pub struct TwoVec { + v1: Vec, + v2: Vec, +} + +#[test] +fn opt_two_vec() { + let example: Option = None; + + let serialized = ::ipc::binary::serialize(&example).unwrap(); + assert_eq!(serialized, vec![0u8; 16]); +} diff --git a/ipc/tests/nested.rs.in b/ipc/tests/nested.rs.in index 355067689..26f344d92 100644 --- a/ipc/tests/nested.rs.in +++ b/ipc/tests/nested.rs.in @@ -30,6 +30,7 @@ pub struct DB { pub trait DBWriter { fn write(&self, data: Vec) -> Result<(), DBError>; + fn write_slice(&self, data: &[u8]) -> Result<(), DBError>; } impl IpcConfig for DB {} @@ -44,5 +45,11 @@ impl DBWriter for DB { *writes = *writes + data.len() as u64; Ok(()) } + + fn write_slice(&self, data: &[u8]) -> Result<(), DBError> { + let mut writes = self.writes.write().unwrap(); + *writes = *writes + data.len() as u64; + Ok(()) + } } diff --git a/util/Cargo.toml b/util/Cargo.toml index 1ed4e8e61..6e3f3c79d 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -17,7 +17,7 @@ nix ="0.5.0" rand = "0.3.12" time = "0.1.34" tiny-keccak = "1.0" -rocksdb = { git = "https://github.com/arkpar/rust-rocksdb.git" } +rocksdb = { git = "https://github.com/ethcore/rust-rocksdb" } lazy_static = "0.1" eth-secp256k1 = { git = "https://github.com/ethcore/rust-secp256k1" } rust-crypto = "0.2.34" diff --git a/util/src/kvdb.rs b/util/src/kvdb.rs index 9de71bd35..c0eb7c10c 100644 --- a/util/src/kvdb.rs +++ b/util/src/kvdb.rs @@ -55,11 +55,11 @@ pub struct DatabaseConfig { } /// Database iterator -pub struct DatabaseIterator<'a> { - iter: DBIterator<'a>, +pub struct DatabaseIterator { + iter: DBIterator, } -impl<'a> Iterator for DatabaseIterator<'a> { +impl<'a> Iterator for DatabaseIterator { type Item = (Box<[u8]>, Box<[u8]>); fn next(&mut self) -> Option { @@ -135,7 +135,7 @@ impl Database { /// Get value by partial key. Prefix size should match configured prefix size. pub fn get_by_prefix(&self, prefix: &[u8]) -> Option> { - let mut iter = self.db.iterator(IteratorMode::From(prefix, Direction::forward)); + let mut iter = self.db.iterator(IteratorMode::From(prefix, Direction::Forward)); match iter.next() { // TODO: use prefix_same_as_start read option (not availabele in C API currently) Some((k, v)) => if k[0 .. prefix.len()] == prefix[..] { Some(v) } else { None },