diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 2b5ec5ccb..5a1bda51e 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -87,6 +87,8 @@ pub struct ClientConfig { pub blockchain: BlockChainConfig, /// Prefer journal rather than archive. pub prefer_journal: bool, + /// The name of the client instance. + pub name: String, } impl Default for ClientConfig { @@ -95,6 +97,7 @@ impl Default for ClientConfig { queue: Default::default(), blockchain: Default::default(), prefer_journal: false, + name: Default::default(), } } } @@ -224,7 +227,7 @@ pub struct Client where V: Verifier { } const HISTORY: u64 = 1000; -const CLIENT_DB_VER_STR: &'static str = "5.0"; +const CLIENT_DB_VER_STR: &'static str = "5.2"; impl Client { /// Create a new client with given spec and DB path. diff --git a/parity/main.rs b/parity/main.rs index 605fb315d..43b0504f1 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -53,6 +53,16 @@ use docopt::Docopt; use daemonize::Daemonize; use number_prefix::{binary_prefix, Standalone, Prefixed}; +fn die_with_message(msg: &str) -> ! { + println!("ERROR: {}", msg); + exit(1); +} + +#[macro_export] +macro_rules! die { + ($($arg:tt)*) => (die_with_message(&format!("{}", format_args!($($arg)*)))); +} + const USAGE: &'static str = r#" Parity. Ethereum Client. By Wood/Paronyan/Kotewicz/Drwięga/Volf. @@ -62,13 +72,16 @@ Usage: parity daemon [options] [ --no-bootstrap | ... ] parity [options] [ --no-bootstrap | ... ] -Options: +Protocol Options: --chain CHAIN Specify the blockchain type. CHAIN may be either a JSON chain specification file - or frontier, mainnet, morden, or testnet [default: frontier]. + or olympic, frontier, homestead, mainnet, morden, or testnet [default: homestead]. + --testnet Equivalent to --chain testnet (geth-compatible). + --networkid INDEX Override the network identifier from the chain we are on. --archive Client should not prune the state/storage trie. 
- -d --db-path PATH Specify the database & configuration directory path [default: $HOME/.parity] - --keys-path PATH Specify the path for JSON key files to be found [default: $HOME/.web3/keys] + -d --datadir PATH Specify the database & configuration directory path [default: $HOME/.parity] + --identity NAME Specify your node's name. +Networking Options: --no-bootstrap Don't bother trying to connect to any nodes initially. --listen-address URL Specify the IP/port on which to listen for peers [default: 0.0.0.0:30304]. --public-address URL Specify the IP/port on which peers may connect. @@ -78,18 +91,32 @@ Options: --no-upnp Disable trying to figure out the correct public adderss over UPnP. --node-key KEY Specify node secret key, either as 64-character hex string or input to SHA3 operation. +API and Console Options: + -j --jsonrpc Enable the JSON-RPC API server. + --jsonrpc-addr HOST Specify the hostname portion of the JSONRPC API server [default: 127.0.0.1]. + --jsonrpc-port PORT Specify the port portion of the JSONRPC API server [default: 8545]. + --jsonrpc-cors URL Specify CORS header for JSON-RPC API responses [default: null]. + --jsonrpc-apis APIS Specify the APIs available through the JSONRPC interface. APIS is a comma-delimited + list of API names. Possible names are web3, eth and net. [default: web3,eth,net]. + --rpc Equivalent to --jsonrpc (geth-compatible). + --rpcaddr HOST Equivalent to --jsonrpc-addr HOST (geth-compatible). + --rpcport PORT Equivalent to --jsonrpc-port PORT (geth-compatible). + --rpcapi APIS Equivalent to --jsonrpc-apis APIS (geth-compatible). + --rpccorsdomain URL Equivalent to --jsonrpc-cors URL (geth-compatible). + +Sealing/Mining Options: + --author ADDRESS Specify the block author (aka "coinbase") address for sending block rewards + from sealed blocks [default: 0037a6b811ffeb6e072da21179d11b1406371c63]. + --extradata STRING Specify a custom extra-data for authored blocks, no more than 32 characters. 
+ +Memory Footprint Options: --cache-pref-size BYTES Specify the prefered size of the blockchain cache in bytes [default: 16384]. --cache-max-size BYTES Specify the maximum size of the blockchain cache in bytes [default: 262144]. --queue-max-size BYTES Specify the maximum size of memory to use for block queue [default: 52428800]. + --cache MEGABYTES Set total amount of cache to use for the entire system, mutually exclusive with + other cache options (geth-compatible). - -j --jsonrpc Enable the JSON-RPC API sever. - --jsonrpc-url URL Specify URL for JSON-RPC API server [default: 127.0.0.1:8545]. - --jsonrpc-cors URL Specify CORS header for JSON-RPC API responses [default: null]. - - --author ADDRESS Specify the block author (aka "coinbase") address for sending block rewards - from sealed blocks [default: 0037a6b811ffeb6e072da21179d11b1406371c63]. - --extra-data STRING Specify a custom extra-data for authored blocks, no more than 32 characters. - +Miscellaneous Options: -l --logging LOGGING Specify the logging level. -v --version Show information about version. -h --help Show this screen. 
@@ -101,14 +128,18 @@ struct Args { arg_pid_file: String, arg_enode: Vec, flag_chain: String, + flag_testnet: bool, flag_db_path: String, + flag_networkid: Option, + flag_identity: String, + flag_cache: Option, flag_keys_path: String, flag_archive: bool, flag_no_bootstrap: bool, flag_listen_address: String, flag_public_address: Option, flag_address: Option, - flag_peers: u32, + flag_peers: usize, flag_no_discovery: bool, flag_no_upnp: bool, flag_node_key: Option, @@ -116,8 +147,15 @@ struct Args { flag_cache_max_size: usize, flag_queue_max_size: usize, flag_jsonrpc: bool, - flag_jsonrpc_url: String, + flag_jsonrpc_addr: String, + flag_jsonrpc_port: u16, flag_jsonrpc_cors: String, + flag_jsonrpc_apis: String, + flag_rpc: bool, + flag_rpcaddr: Option, + flag_rpcport: Option, + flag_rpccorsdomain: Option, + flag_rpcapi: Option, flag_logging: Option, flag_version: bool, flag_author: String, @@ -151,14 +189,23 @@ fn setup_log(init: &Option) { } #[cfg(feature = "rpc")] -fn setup_rpc_server(client: Arc, sync: Arc, url: &str, cors_domain: &str) { +fn setup_rpc_server(client: Arc, sync: Arc, url: &str, cors_domain: &str, apis: Vec<&str>) { use rpc::v1::*; let mut server = rpc::HttpServer::new(1); - server.add_delegate(Web3Client::new().to_delegate()); - server.add_delegate(EthClient::new(&client, &sync).to_delegate()); - server.add_delegate(EthFilterClient::new(&client).to_delegate()); - server.add_delegate(NetClient::new(&sync).to_delegate()); + for api in apis.into_iter() { + match api { + "web3" => server.add_delegate(Web3Client::new().to_delegate()), + "net" => server.add_delegate(NetClient::new(&sync).to_delegate()), + "eth" => { + server.add_delegate(EthClient::new(&client, &sync).to_delegate()); + server.add_delegate(EthFilterClient::new(&client).to_delegate()); + } + _ => { + die!("{}: Invalid API name to be enabled.", api); + } + } + } server.start_async(url, cors_domain); } @@ -179,16 +226,6 @@ By Wood/Paronyan/Kotewicz/Drwięga/Volf.\ ", version()); } -fn 
die_with_message(msg: &str) -> ! { - println!("ERROR: {}", msg); - exit(1); -} - -#[macro_export] -macro_rules! die { - ($($arg:tt)*) => (die_with_message(&format!("{}", format_args!($($arg)*)))); -} - struct Configuration { args: Args } @@ -221,8 +258,11 @@ impl Configuration { } fn spec(&self) -> Spec { + if self.args.flag_testnet { + return ethereum::new_morden(); + } match self.args.flag_chain.as_ref() { - "frontier" | "mainnet" => ethereum::new_frontier(), + "frontier" | "homestead" | "mainnet" => ethereum::new_frontier(), "morden" | "testnet" => ethereum::new_morden(), "olympic" => ethereum::new_olympic(), f => Spec::from_json_utf8(contents(f).unwrap_or_else(|_| die!("{}: Couldn't read chain specification file. Sure it exists?", f)).as_ref()), @@ -276,7 +316,7 @@ impl Configuration { ret.public_address = public; ret.use_secret = self.args.flag_node_key.as_ref().map(|s| Secret::from_str(&s).unwrap_or_else(|_| s.sha3())); ret.discovery_enabled = !self.args.flag_no_discovery; - ret.ideal_peers = self.args.flag_peers; + ret.ideal_peers = self.args.flag_peers as u32; let mut net_path = PathBuf::from(&self.path()); net_path.push("network"); ret.config_path = Some(net_path.to_str().unwrap().to_owned()); @@ -307,13 +347,22 @@ impl Configuration { let spec = self.spec(); let net_settings = self.net_settings(&spec); let mut sync_config = SyncConfig::default(); - sync_config.network_id = spec.network_id(); + sync_config.network_id = self.args.flag_networkid.as_ref().map(|id| U256::from_str(id).unwrap_or_else(|_| die!("{}: Invalid index given with --networkid", id))).unwrap_or(spec.network_id()); // Build client let mut client_config = ClientConfig::default(); - client_config.blockchain.pref_cache_size = self.args.flag_cache_pref_size; - client_config.blockchain.max_cache_size = self.args.flag_cache_max_size; + match self.args.flag_cache { + Some(mb) => { + client_config.blockchain.max_cache_size = mb * 1024 * 1024; + client_config.blockchain.pref_cache_size = 
client_config.blockchain.max_cache_size / 2; + } + None => { + client_config.blockchain.pref_cache_size = self.args.flag_cache_pref_size; + client_config.blockchain.max_cache_size = self.args.flag_cache_max_size; + } + } client_config.prefer_journal = !self.args.flag_archive; + client_config.name = self.args.flag_identity.clone(); client_config.queue.max_mem_use = self.args.flag_queue_max_size; let mut service = ClientService::start(client_config, spec, net_settings, &Path::new(&self.path())).unwrap(); let client = service.client().clone(); @@ -324,9 +373,16 @@ impl Configuration { let sync = EthSync::register(service.network(), sync_config, client); // Setup rpc - if self.args.flag_jsonrpc { - setup_rpc_server(service.client(), sync.clone(), &self.args.flag_jsonrpc_url, &self.args.flag_jsonrpc_cors); - SocketAddr::from_str(&self.args.flag_jsonrpc_url).unwrap_or_else(|_|die!("{}: Invalid JSONRPC listen address given with --jsonrpc-url. Should be of the form 'IP:port'.", self.args.flag_jsonrpc_url)); + if self.args.flag_jsonrpc || self.args.flag_rpc { + let url = format!("{}:{}", + self.args.flag_rpcaddr.as_ref().unwrap_or(&self.args.flag_jsonrpc_addr), + self.args.flag_rpcport.unwrap_or(self.args.flag_jsonrpc_port) + ); + SocketAddr::from_str(&url).unwrap_or_else(|_|die!("{}: Invalid JSONRPC listen host/port given.", url)); + let cors = self.args.flag_rpccorsdomain.as_ref().unwrap_or(&self.args.flag_jsonrpc_cors); + // TODO: use this as the API list. 
+ let apis = self.args.flag_rpcapi.as_ref().unwrap_or(&self.args.flag_jsonrpc_apis); + setup_rpc_server(service.client(), sync.clone(), &url, cors, apis.split(",").collect()); } // Register IO handler diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 2313d5114..7113c55b1 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -312,7 +312,8 @@ impl EthFilter for EthFilterClient { None => Ok(Value::Array(vec![] as Vec)), Some(info) => match info.filter { PollFilter::Block => { - let current_number = client.chain_info().best_block_number; + // + 1, cause we want to return hashes including current block hash. + let current_number = client.chain_info().best_block_number + 1; let hashes = (info.block_number..current_number).into_iter() .map(BlockId::Number) .filter_map(|id| client.block_hash(id)) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 530cfa424..fe1b559cd 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -900,9 +900,8 @@ impl ChainSync { } match sync.send(peer_id, packet_id, packet) { Err(e) => { - warn!(target:"sync", "Error sending request: {:?}", e); + debug!(target:"sync", "Error sending request: {:?}", e); sync.disable_peer(peer_id); - self.on_peer_aborting(sync, peer_id); } Ok(_) => { let mut peer = self.peers.get_mut(&peer_id).unwrap(); @@ -915,9 +914,8 @@ impl ChainSync { /// Generic packet sender fn send_packet(&mut self, sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) { if let Err(e) = sync.send(peer_id, packet_id, packet) { - warn!(target:"sync", "Error sending packet: {:?}", e); + debug!(target:"sync", "Error sending packet: {:?}", e); sync.disable_peer(peer_id); - self.on_peer_aborting(sync, peer_id); } } /// Called when peer sends us new transactions diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 1c62a9960..3309612c2 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -46,17 +46,17 @@ use std::env; /// data nodes that go out of history scope 
and must be written to disk. /// /// Commit workflow: -/// Create a new journal record from the transaction overlay. -/// Inseart each node from the transaction overlay into the History overlay increasing reference +/// 1. Create a new journal record from the transaction overlay. +/// 2. Insert each node from the transaction overlay into the History overlay increasing reference /// count if it is already there. Note that the reference counting is managed by `MemoryDB` -/// Clear the transaction overlay. -/// For a canonical journal record that becomes ancient inserts its insertions into the disk DB -/// For each journal record that goes out of the history scope (becomes ancient) remove its +/// 3. Clear the transaction overlay. +/// 4. For a canonical journal record that becomes ancient inserts its insertions into the disk DB +/// 5. For each journal record that goes out of the history scope (becomes ancient) remove its /// insertions from the history overlay, decreasing the reference counter and removing entry if /// if reaches zero. -/// For a canonical journal record that becomes ancient delete its removals from the disk only if +/// 6. For a canonical journal record that becomes ancient delete its removals from the disk only if /// the removed key is not present in the history overlay. -/// Delete ancient record from memory and disk. +/// 7. Delete ancient record from memory and disk. 
/// pub struct JournalDB { transaction_overlay: MemoryDB, @@ -66,13 +66,11 @@ pub struct JournalDB { struct JournalOverlay { backing_overlay: MemoryDB, - journal: VecDeque + journal: HashMap> } struct JournalEntry { id: H256, - index: usize, - era: u64, insertions: Vec, deletions: Vec, } @@ -200,29 +198,6 @@ impl JournalDB { let mut journal_overlay = self.journal_overlay.as_mut().unwrap().write().unwrap(); let batch = DBTransaction::new(); { - let mut index = 0usize; - let mut last; - - while { - let record = try!(self.backing.get({ - let mut r = RlpStream::new_list(3); - r.append(&now); - r.append(&index); - r.append(&&PADDING[..]); - last = r.drain(); - &last - })); - match record { - Some(r) => { - assert!(&Rlp::new(&r).val_at::(0) != id); - true - }, - None => false, - } - } { - index += 1; - } - let mut r = RlpStream::new_list(3); let mut tx = self.transaction_overlay.drain(); let inserted_keys: Vec<_> = tx.iter().filter_map(|(k, &(_, c))| if c > 0 { Some(k.clone()) } else { None }).collect(); @@ -238,55 +213,64 @@ impl JournalDB { journal_overlay.backing_overlay.emplace(k, v); } r.append(&removed_keys); - try!(batch.put(&last, r.as_raw())); + + let mut k = RlpStream::new_list(3); + let index = journal_overlay.journal.get(&now).map(|j| j.len()).unwrap_or(0); + k.append(&now); + k.append(&index); + k.append(&&PADDING[..]); + try!(batch.put(&k.drain(), r.as_raw())); try!(batch.put(&LATEST_ERA_KEY, &encode(&now))); - journal_overlay.journal.push_back(JournalEntry { id: id.clone(), index: index, era: now, insertions: inserted_keys, deletions: removed_keys }); + journal_overlay.journal.entry(now).or_insert_with(Vec::new).push(JournalEntry { id: id.clone(), insertions: inserted_keys, deletions: removed_keys }); } + let journal_overlay = journal_overlay.deref_mut(); // apply old commits' details if let Some((end_era, canon_id)) = end { - let mut canon_insertions: Vec<(H256, Bytes)> = Vec::new(); - let mut canon_deletions: Vec = Vec::new(); - let mut 
overlay_deletions: Vec = Vec::new(); - while journal_overlay.journal.front().map_or(false, |e| e.era <= end_era) { - let mut journal = journal_overlay.journal.pop_front().unwrap(); - //delete the record from the db - let mut r = RlpStream::new_list(3); - r.append(&journal.era); - r.append(&journal.index); - r.append(&&PADDING[..]); - try!(batch.delete(&r.drain())); - trace!("commit: Delete journal for time #{}.{}: {}, (canon was {}): +{} -{} entries", end_era, journal.index, journal.id, canon_id, journal.insertions.len(), journal.deletions.len()); - { - if canon_id == journal.id { - for h in &journal.insertions { - match journal_overlay.backing_overlay.raw(&h) { - Some(&(ref d, rc)) if rc > 0 => canon_insertions.push((h.clone(), d.clone())), //TODO: optimizie this to avoid data copy - _ => () + if let Some(ref mut records) = journal_overlay.journal.get_mut(&end_era) { + let mut canon_insertions: Vec<(H256, Bytes)> = Vec::new(); + let mut canon_deletions: Vec = Vec::new(); + let mut overlay_deletions: Vec = Vec::new(); + let mut index = 0usize; + for mut journal in records.drain(..) 
{ + //delete the record from the db + let mut r = RlpStream::new_list(3); + r.append(&end_era); + r.append(&index); + r.append(&&PADDING[..]); + try!(batch.delete(&r.drain())); + trace!("commit: Delete journal for time #{}.{}: {}, (canon was {}): +{} -{} entries", end_era, index, journal.id, canon_id, journal.insertions.len(), journal.deletions.len()); + { + if canon_id == journal.id { + for h in &journal.insertions { + match journal_overlay.backing_overlay.raw(&h) { + Some(&(ref d, rc)) if rc > 0 => canon_insertions.push((h.clone(), d.clone())), //TODO: optimizie this to avoid data copy + _ => () + } } + canon_deletions = journal.deletions; } - canon_deletions = journal.deletions; + overlay_deletions.append(&mut journal.insertions); } - overlay_deletions.append(&mut journal.insertions); + index +=1; } - if canon_id == journal.id { + // apply canon inserts first + for (k, v) in canon_insertions { + try!(batch.put(&k, &v)); } - } - // apply canon inserts first - for (k, v) in canon_insertions { - try!(batch.put(&k, &v)); - } - // clean the overlay - for k in overlay_deletions { - journal_overlay.backing_overlay.kill(&k); - } - // apply removes - for k in canon_deletions { - if !journal_overlay.backing_overlay.exists(&k) { - try!(batch.delete(&k)); + // clean the overlay + for k in overlay_deletions { + journal_overlay.backing_overlay.kill(&k); } + // apply removes + for k in canon_deletions { + if !journal_overlay.backing_overlay.exists(&k) { + try!(batch.delete(&k)); + } + } + journal_overlay.backing_overlay.purge(); } - journal_overlay.backing_overlay.purge(); + journal_overlay.journal.remove(&end_era); } try!(self.backing.write(batch)); Ok(0 as u32) @@ -297,7 +281,7 @@ impl JournalDB { } fn read_overlay(db: &Database) -> JournalOverlay { - let mut journal = VecDeque::new(); + let mut journal = HashMap::new(); let mut overlay = MemoryDB::new(); let mut count = 0; if let Some(val) = db.get(&LATEST_ERA_KEY).expect("Low-level database error.") { @@ -311,6 +295,7 @@ 
impl JournalDB { r.append(&&PADDING[..]); &r.drain() }).expect("Low-level database error.") { + trace!("read_counters: era={}, index={}", era, index); let rlp = Rlp::new(&rlp_data); let id: H256 = rlp.val_at(0); let insertions = rlp.at(1); @@ -323,10 +308,8 @@ impl JournalDB { inserted_keys.push(k); count += 1; } - journal.push_front(JournalEntry { + journal.entry(era).or_insert_with(Vec::new).push(JournalEntry { id: id, - index: index, - era: era, insertions: inserted_keys, deletions: deletions, }); @@ -413,6 +396,28 @@ mod tests { use super::*; use hashdb::*; + #[test] + fn insert_same_in_fork() { + // history is 1 + let mut jdb = JournalDB::new_temp(); + + let x = jdb.insert(b"X"); + jdb.commit(1, &b"1".sha3(), None).unwrap(); + jdb.commit(2, &b"2".sha3(), None).unwrap(); + jdb.commit(3, &b"1002a".sha3(), Some((1, b"1".sha3()))).unwrap(); + jdb.commit(4, &b"1003a".sha3(), Some((2, b"2".sha3()))).unwrap(); + + jdb.remove(&x); + jdb.commit(3, &b"1002b".sha3(), Some((1, b"1".sha3()))).unwrap(); + let x = jdb.insert(b"X"); + jdb.commit(4, &b"1003b".sha3(), Some((2, b"2".sha3()))).unwrap(); + + jdb.commit(5, &b"1004a".sha3(), Some((3, b"1002a".sha3()))).unwrap(); + jdb.commit(6, &b"1005a".sha3(), Some((4, b"1003a".sha3()))).unwrap(); + + assert!(jdb.exists(&x)); + } + #[test] fn long_history() { // history is 3 @@ -533,6 +538,7 @@ mod tests { assert!(jdb.exists(&foo)); } + #[test] fn reopen() { let mut dir = ::std::env::temp_dir(); @@ -574,20 +580,27 @@ mod tests { // history is 1 let foo = jdb.insert(b"foo"); jdb.commit(0, &b"0".sha3(), None).unwrap(); + jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap(); + + // foo is ancient history. 
+ jdb.insert(b"foo"); - jdb.commit(1, &b"1".sha3(), None).unwrap(); + jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); foo }; { let mut jdb = JournalDB::new(dir.to_str().unwrap()); jdb.remove(&foo); - jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); - assert!(jdb.exists(&foo)); jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + jdb.remove(&foo); + jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap(); + jdb.commit(5, &b"5".sha3(), Some((4, b"4".sha3()))).unwrap(); assert!(!jdb.exists(&foo)); } } + #[test] fn reopen_fork() { let mut dir = ::std::env::temp_dir(); diff --git a/util/src/network/connection.rs b/util/src/network/connection.rs index 55e688c91..fe65be6d1 100644 --- a/util/src/network/connection.rs +++ b/util/src/network/connection.rs @@ -190,25 +190,25 @@ impl Connection { /// Register this connection with the IO event loop. pub fn register_socket(&self, reg: Token, event_loop: &mut EventLoop) -> io::Result<()> { - trace!(target: "net", "connection register; token={:?}", reg); + trace!(target: "network", "connection register; token={:?}", reg); if let Err(e) = event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()) { - debug!("Failed to register {:?}, {:?}", reg, e); + trace!(target: "network", "Failed to register {:?}, {:?}", reg, e); } Ok(()) } /// Update connection registration. Should be called at the end of the IO handler. pub fn update_socket(&self, reg: Token, event_loop: &mut EventLoop) -> io::Result<()> { - trace!(target: "net", "connection reregister; token={:?}", reg); + trace!(target: "network", "connection reregister; token={:?}", reg); event_loop.reregister( &self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { - debug!("Failed to reregister {:?}, {:?}", reg, e); + trace!(target: "network", "Failed to reregister {:?}, {:?}", reg, e); Ok(()) }) } /// Delete connection registration. 
Should be called at the end of the IO handler. pub fn deregister_socket(&self, event_loop: &mut EventLoop) -> io::Result<()> { - trace!(target: "net", "connection deregister; token={:?}", self.token); + trace!(target: "network", "connection deregister; token={:?}", self.token); event_loop.deregister(&self.socket).ok(); // ignore errors here Ok(()) } diff --git a/util/src/network/handshake.rs b/util/src/network/handshake.rs index cca133ba9..a72cc28ad 100644 --- a/util/src/network/handshake.rs +++ b/util/src/network/handshake.rs @@ -222,7 +222,7 @@ impl Handshake { /// Parse, validate and confirm auth message fn read_auth(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> { - trace!(target:"net", "Received handshake auth from {:?}", self.connection.socket.peer_addr()); + trace!(target:"network", "Received handshake auth from {:?}", self.connection.socket.peer_addr()); if data.len() != V4_AUTH_PACKET_SIZE { debug!(target:"net", "Wrong auth packet size"); return Err(From::from(NetworkError::BadProtocol)); @@ -253,7 +253,7 @@ impl Handshake { } fn read_auth_eip8(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> { - trace!(target:"net", "Received EIP8 handshake auth from {:?}", self.connection.socket.peer_addr()); + trace!(target:"network", "Received EIP8 handshake auth from {:?}", self.connection.socket.peer_addr()); self.auth_cipher.extend_from_slice(data); let auth = try!(ecies::decrypt(secret, &self.auth_cipher[0..2], &self.auth_cipher[2..])); let rlp = UntrustedRlp::new(&auth); @@ -268,7 +268,7 @@ impl Handshake { /// Parse and validate ack message fn read_ack(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> { - trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); + trace!(target:"network", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); if data.len() != V4_ACK_PACKET_SIZE { debug!(target:"net", "Wrong ack packet size"); return 
Err(From::from(NetworkError::BadProtocol)); @@ -296,7 +296,7 @@ impl Handshake { } fn read_ack_eip8(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> { - trace!(target:"net", "Received EIP8 handshake auth from {:?}", self.connection.socket.peer_addr()); + trace!(target:"network", "Received EIP8 handshake auth from {:?}", self.connection.socket.peer_addr()); self.ack_cipher.extend_from_slice(data); let ack = try!(ecies::decrypt(secret, &self.ack_cipher[0..2], &self.ack_cipher[2..])); let rlp = UntrustedRlp::new(&ack); @@ -309,7 +309,7 @@ impl Handshake { /// Sends auth message fn write_auth(&mut self, secret: &Secret, public: &Public) -> Result<(), UtilError> { - trace!(target:"net", "Sending handshake auth to {:?}", self.connection.socket.peer_addr()); + trace!(target:"network", "Sending handshake auth to {:?}", self.connection.socket.peer_addr()); let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants let len = data.len(); { @@ -336,7 +336,7 @@ impl Handshake { /// Sends ack message fn write_ack(&mut self) -> Result<(), UtilError> { - trace!(target:"net", "Sending handshake ack to {:?}", self.connection.socket.peer_addr()); + trace!(target:"network", "Sending handshake ack to {:?}", self.connection.socket.peer_addr()); let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants let len = data.len(); { @@ -355,7 +355,7 @@ impl Handshake { /// Sends EIP8 ack message fn write_ack_eip8(&mut self) -> Result<(), UtilError> { - trace!(target:"net", "Sending EIP8 handshake ack to {:?}", self.connection.socket.peer_addr()); + trace!(target:"network", "Sending EIP8 handshake ack to {:?}", self.connection.socket.peer_addr()); let mut rlp = RlpStream::new_list(3); rlp.append(self.ecdhe.public()); rlp.append(&self.nonce); diff --git a/util/src/network/host.rs b/util/src/network/host.rs index f2cc9fe48..ece24a1d1 100644 --- 
a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -170,29 +170,37 @@ pub struct NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'sta io: &'s IoContext>, protocol: ProtocolId, sessions: Arc>>, - session: Option, + session: Option, + session_id: Option, } impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, { /// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler. fn new(io: &'s IoContext>, protocol: ProtocolId, - session: Option, sessions: Arc>>) -> NetworkContext<'s, Message> { + session: Option, sessions: Arc>>) -> NetworkContext<'s, Message> { + let id = session.as_ref().map(|s| s.lock().unwrap().token()); NetworkContext { io: io, protocol: protocol, + session_id: id, session: session, sessions: sessions, } } + fn resolve_session(&self, peer: PeerId) -> Option { + match self.session_id { + Some(id) if id == peer => self.session.clone(), + _ => self.sessions.read().unwrap().get(peer).cloned(), + } + } + /// Send a packet over the network to another peer. pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { - let session = { self.sessions.read().unwrap().get(peer).cloned() }; + let session = self.resolve_session(peer); if let Some(session) = session { - session.lock().unwrap().deref_mut().send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| { - warn!(target: "network", "Send error: {:?}", e); - }); //TODO: don't copy vector data + try!(session.lock().unwrap().deref_mut().send_packet(self.protocol, packet_id as u8, &data)); try!(self.io.update_registration(peer)); } else { trace!(target: "network", "Send: Peer no longer exist") @@ -200,14 +208,10 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone Ok(()) } - /// Respond to a current network message. Panics if no there is no packet in the context. + /// Respond to a current network message. 
Panics if no there is no packet in the context. If the session is expired returns nothing. pub fn respond(&self, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { - match self.session { - Some(session) => self.send(session, packet_id, data), - None => { - panic!("Respond: Session does not exist") - } - } + assert!(self.session.is_some(), "Respond called without network context"); + self.send(self.session_id.unwrap(), packet_id, data) } /// Send an IO message @@ -215,7 +219,6 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone self.io.message(NetworkIoMessage::User(msg)); } - /// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected. pub fn disable_peer(&self, peer: PeerId) { //TODO: remove capability, disconnect if no capabilities left @@ -239,7 +242,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone /// Returns peer identification string pub fn peer_info(&self, peer: PeerId) -> String { - let session = { self.sessions.read().unwrap().get(peer).cloned() }; + let session = self.resolve_session(peer); if let Some(session) = session { return session.lock().unwrap().info.client_version.clone() } @@ -624,7 +627,7 @@ impl Host where Message: Send + Sync + Clone { let mut packet_data: Option<(ProtocolId, PacketId, Vec)> = None; let mut kill = false; let session = { self.sessions.read().unwrap().get(token).cloned() }; - if let Some(session) = session { + if let Some(session) = session.clone() { let mut s = session.lock().unwrap(); match s.readable(io, &self.info.read().unwrap()) { Err(e) => { @@ -656,11 +659,11 @@ impl Host where Message: Send + Sync + Clone { } for p in ready_data { let h = self.handlers.read().unwrap().get(p).unwrap().clone(); - h.connected(&NetworkContext::new(io, p, Some(token), self.sessions.clone()), &token); + h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token); } if let Some((p, packet_id, 
data)) = packet_data { let h = self.handlers.read().unwrap().get(p).unwrap().clone(); - h.read(&NetworkContext::new(io, p, Some(token), self.sessions.clone()), &token, packet_id, &data[1..]); + h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token, packet_id, &data[1..]); } io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Token registration error: {:?}", e)); } @@ -718,6 +721,7 @@ impl Host where Message: Send + Sync + Clone { let mut to_disconnect: Vec = Vec::new(); let mut failure_id = None; let mut deregister = false; + let mut expired_session = None; match token { FIRST_HANDSHAKE ... LAST_HANDSHAKE => { let handshakes = self.handshakes.write().unwrap(); @@ -733,6 +737,7 @@ impl Host where Message: Send + Sync + Clone { FIRST_SESSION ... LAST_SESSION => { let sessions = self.sessions.write().unwrap(); if let Some(session) = sessions.get(token).cloned() { + expired_session = Some(session.clone()); let mut s = session.lock().unwrap(); if !s.expired() { if s.is_ready() { @@ -757,7 +762,7 @@ impl Host where Message: Send + Sync + Clone { } for p in to_disconnect { let h = self.handlers.read().unwrap().get(p).unwrap().clone(); - h.disconnected(&NetworkContext::new(io, p, Some(token), self.sessions.clone()), &token); + h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone()), &token); } if deregister { io.deregister_stream(token).expect("Error deregistering stream"); diff --git a/util/src/network/session.rs b/util/src/network/session.rs index edf929a9a..84c063c92 100644 --- a/util/src/network/session.rs +++ b/util/src/network/session.rs @@ -213,6 +213,9 @@ impl Session { /// Send a protocol packet to peer. 
pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> { + if self.expired() { + return Err(From::from(NetworkError::Expired)); + } let mut i = 0usize; while protocol != self.info.capabilities[i].protocol { i += 1; @@ -351,15 +354,15 @@ impl Session { offset += caps[i].packet_count; i += 1; } - trace!(target: "net", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps); + trace!(target: "network", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps); self.info.client_version = client_version; self.info.capabilities = caps; if self.info.capabilities.is_empty() { - trace!("No common capabilities with peer."); + trace!(target: "network", "No common capabilities with peer."); return Err(From::from(self.disconnect(DisconnectReason::UselessPeer))); } if protocol != host.protocol_version { - trace!("Peer protocol version mismatch: {}", protocol); + trace!(target: "network", "Peer protocol version mismatch: {}", protocol); return Err(From::from(self.disconnect(DisconnectReason::UselessPeer))); } self.had_hello = true;