diff --git a/Cargo.lock b/Cargo.lock index 26b1f0ed4..1e998f19d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,7 +28,7 @@ dependencies = [ "fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.9.14 (registry+https://github.com/rust-lang/crates.io-index)", "isatty 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)", + "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", @@ -355,6 +355,7 @@ dependencies = [ "ethcore-ipc 1.6.0", "ethcore-ipc-codegen 1.6.0", "ethcore-ipc-nano 1.6.0", + "ethcore-stratum 1.6.0", "ethcore-util 1.6.0", "ethjson 0.1.0", "ethkey 0.2.0", @@ -406,8 +407,8 @@ dependencies = [ "fetch 0.1.0", "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.10.0-a.0 (git+https://github.com/ethcore/hyper)", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)", - "jsonrpc-http-server 7.0.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)", + "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", + "jsonrpc-http-server 7.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "mime 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -577,10 +578,10 @@ dependencies = [ "ethsync 1.6.0", "fetch 0.1.0", "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)", - "jsonrpc-http-server 7.0.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)", - "jsonrpc-ipc-server 1.0.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)", - "jsonrpc-macros 0.2.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)", + "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", + "jsonrpc-http-server 7.0.0 (git+https://github.com/ethcore/jsonrpc.git)", + "jsonrpc-ipc-server 1.0.0 (git+https://github.com/ethcore/jsonrpc.git)", + "jsonrpc-macros 0.2.0 (git+https://github.com/ethcore/jsonrpc.git)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-reactor 0.1.0", "parity-updater 1.6.0", @@ -604,7 +605,7 @@ dependencies = [ "ethcore-io 1.6.0", "ethcore-rpc 1.6.0", "ethcore-util 1.6.0", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)", + "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-dapps-glue 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-ui 1.6.0", @@ -623,13 +624,15 @@ dependencies = [ "ethcore-ipc-codegen 1.6.0", "ethcore-ipc-nano 1.6.0", "ethcore-util 1.6.0", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)", - "jsonrpc-macros 0.2.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)", - "jsonrpc-tcp-server 1.0.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)", + "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", + "jsonrpc-macros 0.2.0 (git+https://github.com/ethcore/jsonrpc.git)", + "jsonrpc-tcp-server 1.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.5.1 (git+https://github.com/ethcore/mio?branch=v0.5.x)", "semver 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -946,7 +949,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "jsonrpc-core" version = "5.0.0" -source = "git+https://github.com/ethcore/jsonrpc.git?branch=mio#472abedcbf5198326912bc62d485acb1f5a0989c" +source = "git+https://github.com/ethcore/jsonrpc.git#5eeee0980e4d2682a831c633fa03a8af99e0d68c" dependencies = [ "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -959,11 +962,11 @@ dependencies = [ [[package]] name = "jsonrpc-http-server" version = "7.0.0" -source = "git+https://github.com/ethcore/jsonrpc.git?branch=mio#472abedcbf5198326912bc62d485acb1f5a0989c" +source = "git+https://github.com/ethcore/jsonrpc.git#5eeee0980e4d2682a831c633fa03a8af99e0d68c" dependencies = [ "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.10.0-a.0 (git+https://github.com/ethcore/hyper)", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)", + "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "unicase 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -972,11 +975,11 @@ dependencies = [ [[package]] name = "jsonrpc-ipc-server" version = "1.0.0" -source = "git+https://github.com/ethcore/jsonrpc.git?branch=mio#472abedcbf5198326912bc62d485acb1f5a0989c" +source = "git+https://github.com/ethcore/jsonrpc.git#5eeee0980e4d2682a831c633fa03a8af99e0d68c" dependencies = [ "bytes 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)", + "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -988,27 +991,28 @@ dependencies = [ [[package]] name = "jsonrpc-macros" version = "0.2.0" -source = "git+https://github.com/ethcore/jsonrpc.git?branch=mio#472abedcbf5198326912bc62d485acb1f5a0989c" +source = "git+https://github.com/ethcore/jsonrpc.git#5eeee0980e4d2682a831c633fa03a8af99e0d68c" dependencies = [ "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)", + "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "serde 0.8.19 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "jsonrpc-tcp-server" version = "1.0.0" -source = "git+https://github.com/ethcore/jsonrpc.git?branch=mio#472abedcbf5198326912bc62d485acb1f5a0989c" +source = "git+https://github.com/ethcore/jsonrpc.git#5eeee0980e4d2682a831c633fa03a8af99e0d68c" dependencies = [ - "bytes 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)", + "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)", - "slab 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-proto 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1471,7 +1475,7 @@ dependencies = [ "ethcore-signer 1.6.0", "ethcore-util 1.6.0", "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)", + "jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "matches 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1985,6 +1989,11 @@ name = "smallvec" version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "smallvec" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "smallvec" version = "0.3.1" @@ -2093,6 +2102,11 @@ dependencies = [ name = "table" version = "0.1.0" +[[package]] +name = "take" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "target_info" version = "0.1.0" @@ -2176,6 +2190,30 @@ dependencies = [ "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tokio-proto" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "net2 0.2.23 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-service" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "toml" version = "0.1.28" @@ -2406,11 +2444,11 @@ dependencies = [ "checksum isatty 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7408a548dc0e406b7912d9f84c261cc533c1866e047644a811c133c56041ac0c" "checksum itertools 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)" = "086e1fa5fe48840b1cfdef3a20c7e3115599f8d5c4c87ef32a794a7cdd184d76" "checksum itoa 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ae3088ea4baeceb0284ee9eea42f591226e6beaecf65373e41b38d95a1b8e7a1" -"checksum jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)" = "" -"checksum jsonrpc-http-server 7.0.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)" = "" -"checksum jsonrpc-ipc-server 1.0.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)" = "" -"checksum jsonrpc-macros 0.2.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)" = "" -"checksum jsonrpc-tcp-server 1.0.0 (git+https://github.com/ethcore/jsonrpc.git?branch=mio)" = "" +"checksum jsonrpc-core 5.0.0 (git+https://github.com/ethcore/jsonrpc.git)" = "" +"checksum jsonrpc-http-server 7.0.0 (git+https://github.com/ethcore/jsonrpc.git)" = "" +"checksum jsonrpc-ipc-server 1.0.0 (git+https://github.com/ethcore/jsonrpc.git)" = "" +"checksum jsonrpc-macros 0.2.0 (git+https://github.com/ethcore/jsonrpc.git)" = "" +"checksum jsonrpc-tcp-server 1.0.0 (git+https://github.com/ethcore/jsonrpc.git)" = "" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" "checksum language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a" "checksum lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "49247ec2a285bb3dcb23cbd9c35193c025e7251bfce77c1d5da97e6362dffe7f" @@ -2511,6 +2549,7 @@ dependencies = [ "checksum slab 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6dbdd334bd28d328dad1c41b0ea662517883d8880d8533895ef96c8003dec9c4" "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum smallvec 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "fcc8d19212aacecf95e4a7a2179b26f7aeb9732a915cf01f05b0d3e044865410" +"checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013" "checksum smallvec 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3a3c84984c278afe61a46e19868e8b23e2ee3be5b3cc6dea6edad4893bc6c841" "checksum solicit 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "172382bac9424588d7840732b250faeeef88942e37b6e35317dce98cafdd75b2" "checksum spmc 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "93bdab61c1a413e591c4d17388ffa859eaff2df27f1e13a5ec8b716700605adf" @@ -2523,6 +2562,7 @@ dependencies = [ "checksum syntex_pos 0.50.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a43abded5057c75bac8555e46ec913ce502efb418267b1ab8e9783897470c7db" "checksum syntex_syntax 0.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "44bded3cabafc65c90b663b1071bd2d198a9ab7515e6ce729e4570aaf53c407e" "checksum syntex_syntax 0.50.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6ef781e4b60f03431f1b5b59843546ce60ae029a787770cf8e0969ac1fd063a5" +"checksum take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b157868d8ac1f56b64604539990685fa7611d8fa9e5476cf0c02cf34d32917c5" "checksum target_info 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c63f48baada5c52e65a29eef93ab4f8982681b67f9e8d29c7b05abcfec2b9ffe" "checksum tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "87974a6f5c1dfb344d733055601650059a3363de2a6104819293baff662132d6" "checksum term 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "f2077e54d38055cf1ca0fd7933a2e00cd3ec8f6fed352b2a377f06dcdaaf3281" @@ -2533,6 +2573,8 @@ dependencies = [ "checksum time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)" = "3c7ec6d62a20df54e07ab3b78b9a3932972f4b7981de295563686849eb3989af" "checksum tiny-keccak 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f7aef43048292ca0bae4ab32180e85f6202cf2816c2a210c396a84b99dab9270" "checksum tokio-core 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "52416b3e937abac22a543a7f1c66bd37feb60137ff1ab42390fa02df85347e58" +"checksum tokio-proto 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7c0d6031f94d78d7b4d509d4a7c5e1cdf524a17e7b08d1c188a83cf720e69808" +"checksum tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24da22d077e0f15f55162bdbdc661228c1581892f52074fb242678d015b45162" "checksum toml 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)" = "fcd27a04ca509aff336ba5eb2abc58d456f52c4ff64d9724d88acb85ead560b6" "checksum toml 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a442dfc13508e603c3f763274361db7f79d7469a0e95c411cde53662ab30fc72" "checksum traitobject 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "07eaeb7689bb7fca7ce15628319635758eda769fed481ecfe6686ddef2600616" diff --git a/Cargo.toml b/Cargo.toml index c0823be68..344e0b1a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ app_dirs = "1.1.1" fdlimit = "0.1" hyper = { version = "0.9", default-features = false } ctrlc = { git = "https://github.com/ethcore/rust-ctrlc.git" } -jsonrpc-core = { git = "https://github.com/ethcore/jsonrpc.git", branch="mio" } +jsonrpc-core = { git = "https://github.com/ethcore/jsonrpc.git" } ethsync = { path = "sync" } ethcore = { path = "ethcore" } ethcore-util = { path = "util" } @@ -79,7 +79,6 @@ jit = ["ethcore/jit"] dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethcore-rpc/dev", "ethcore-dapps/dev", "ethcore-signer/dev"] json-tests = ["ethcore/json-tests"] test-heavy = ["ethcore/test-heavy"] -stratum = ["ipc"] ethkey-cli = ["ethcore/ethkey-cli"] ethstore-cli = ["ethcore/ethstore-cli"] evm-debug = ["ethcore/evm-debug"] diff --git a/dapps/Cargo.toml b/dapps/Cargo.toml index af6e2af5d..09c72cb47 100644 --- a/dapps/Cargo.toml +++ b/dapps/Cargo.toml @@ -13,8 +13,8 @@ rand = "0.3" log = "0.3" env_logger = "0.3" futures = "0.1" -jsonrpc-core = { git = "https://github.com/ethcore/jsonrpc.git", branch="mio" } -jsonrpc-http-server = { git = "https://github.com/ethcore/jsonrpc.git", branch="mio" } +jsonrpc-core = { git = "https://github.com/ethcore/jsonrpc.git" } +jsonrpc-http-server = { git = "https://github.com/ethcore/jsonrpc.git" } hyper = { default-features = false, git = "https://github.com/ethcore/hyper" } unicase = "1.3" url = "1.0" diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml index ed5b2a208..e0afa1106 100644 --- a/ethcore/Cargo.toml +++ b/ethcore/Cargo.toml @@ -39,6 +39,7 @@ ethstore = { path = "../ethstore" } ethkey = { path = "../ethkey" } ethcore-ipc-nano = { path = "../ipc/nano" } rlp = { path = "../util/rlp" } +ethcore-stratum = { path = "../stratum" } lru-cache = "0.1.0" ethcore-bloom-journal = { path = "../util/bloom" } ethabi = "0.2.2" diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index 76545ff78..f88c94de5 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -102,6 +102,9 @@ extern crate ethcore_bloom_journal as bloom_journal; extern crate byteorder; extern crate transient_hashmap; extern crate linked_hash_map; +extern crate lru_cache; +extern crate ethcore_stratum; +extern crate ethabi; #[macro_use] extern crate log; @@ -113,11 +116,9 @@ extern crate lazy_static; extern crate heapsize; #[macro_use] extern crate ethcore_ipc as ipc; -extern crate lru_cache; #[cfg(feature = "jit" )] extern crate evmjit; -extern crate ethabi; pub extern crate ethstore; diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 8885432b5..2456beb5b 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -32,7 +32,7 @@ use engines::{Engine, Seal}; use miner::{MinerService, MinerStatus, TransactionQueue, TransactionQueueDetailsProvider, PrioritizationStrategy, AccountDetails, TransactionOrigin}; use miner::banning_queue::{BanningTransactionQueue, Threshold}; -use miner::work_notify::WorkPoster; +use miner::work_notify::{WorkPoster, NotifyWork}; use miner::price_info::PriceInfo; use miner::local_transactions::{Status as LocalTransactionStatus}; use miner::service_transaction_checker::ServiceTransactionChecker; @@ -224,18 +224,24 @@ pub struct Miner { engine: Arc, accounts: Option>, - work_poster: Option, + notifiers: RwLock>>, gas_pricer: Mutex, service_transaction_action: ServiceTransactionAction, } impl Miner { + /// Push notifier that will handle new jobs + pub fn push_notifier(&self, notifier: Box) { + self.notifiers.write().push(notifier) + } + + /// Creates new instance of miner Arc. + pub fn new(options: MinerOptions, gas_pricer: GasPricer, spec: &Spec, accounts: Option>) -> Arc { + Arc::new(Miner::new_raw(options, gas_pricer, spec, accounts)) + } + /// Creates new instance of miner. fn new_raw(options: MinerOptions, gas_pricer: GasPricer, spec: &Spec, accounts: Option>) -> Miner { - let work_poster = match options.new_work_notify.is_empty() { - true => None, - false => Some(WorkPoster::new(&options.new_work_notify)) - }; let gas_limit = match options.tx_queue_gas_limit { GasLimit::Fixed(ref limit) => *limit, _ => !U256::zero(), @@ -250,10 +256,17 @@ impl Miner { ban_duration, ), }; + + let notifiers: Vec> = match options.new_work_notify.is_empty() { + true => Vec::new(), + false => vec![Box::new(WorkPoster::new(&options.new_work_notify))], + }; + let service_transaction_action = match options.refuse_service_transactions { true => ServiceTransactionAction::Refuse, false => ServiceTransactionAction::Check(ServiceTransactionChecker::default()), }; + Miner { transaction_queue: Arc::new(Mutex::new(txq)), next_allowed_reseal: Mutex::new(Instant::now()), @@ -271,7 +284,7 @@ impl Miner { options: options, accounts: accounts, engine: spec.engine.clone(), - work_poster: work_poster, + notifiers: RwLock::new(notifiers), gas_pricer: Mutex::new(gas_pricer), service_transaction_action: service_transaction_action, } @@ -287,11 +300,6 @@ impl Miner { Miner::new_raw(Default::default(), GasPricer::new_fixed(20_000_000_000u64.into()), spec, None) } - /// Creates new instance of a miner Arc. - pub fn new(options: MinerOptions, gas_pricer: GasPricer, spec: &Spec, accounts: Option>) -> Arc { - Arc::new(Miner::new_raw(options, gas_pricer, spec, accounts)) - } - fn forced_sealing(&self) -> bool { self.options.force_sealing || !self.options.new_work_notify.is_empty() } @@ -522,7 +530,7 @@ impl Miner { let is_new = original_work_hash.map_or(true, |h| block.block().fields().header.hash() != h); sealing_work.queue.push(block); // If push notifications are enabled we assume all work items are used. - if self.work_poster.is_some() && is_new { + if !self.notifiers.read().is_empty() && is_new { sealing_work.queue.use_last_ref(); } (Some((pow_hash, difficulty, number)), is_new) @@ -533,7 +541,11 @@ impl Miner { (work, is_new) }; if is_new { - work.map(|(pow_hash, difficulty, number)| self.work_poster.as_ref().map(|p| p.notify(pow_hash, difficulty, number))); + work.map(|(pow_hash, difficulty, number)| { + for notifier in self.notifiers.read().iter() { + notifier.notify(pow_hash, difficulty, number) + } + }); } } diff --git a/ethcore/src/miner/mod.rs b/ethcore/src/miner/mod.rs index 7c259acf8..4acb4e3ae 100644 --- a/ethcore/src/miner/mod.rs +++ b/ethcore/src/miner/mod.rs @@ -49,13 +49,17 @@ mod price_info; mod service_transaction_checker; mod transaction_queue; mod work_notify; +mod stratum; pub use self::external::{ExternalMiner, ExternalMinerService}; + pub use self::miner::{Miner, MinerOptions, Banning, PendingSet, GasPricer, GasPriceCalibratorOptions, GasLimit}; pub use self::transaction_queue::{TransactionQueue, TransactionDetailsProvider as TransactionQueueDetailsProvider, PrioritizationStrategy, AccountDetails, TransactionOrigin}; pub use self::local_transactions::{Status as LocalTransactionStatus}; pub use client::TransactionImportResult; +pub use self::work_notify::NotifyWork; +pub use self::stratum::{Stratum, Error as StratumError, Options as StratumOptions}; use std::collections::BTreeMap; use util::{H256, U256, Address, Bytes}; diff --git a/ethcore/src/miner/stratum.rs b/ethcore/src/miner/stratum.rs new file mode 100644 index 000000000..da367e763 --- /dev/null +++ b/ethcore/src/miner/stratum.rs @@ -0,0 +1,248 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Client-side stratum job dispatcher and mining notifier handler + +use ethcore_stratum::{ + JobDispatcher, PushWorkHandler, + Stratum as StratumService, Error as StratumServiceError, +}; + +use std::sync::{Arc, Weak}; +use std::net::{SocketAddr, AddrParseError}; +use std::fmt; + +use util::{H256, U256, FixedHash, H64, clean_0x}; +use ethereum::ethash::Ethash; +use ethash::SeedHashCompute; +use util::Mutex; +use miner::{self, Miner, MinerService}; +use client::Client; +use block::IsBlock; +use std::str::FromStr; +use rlp::encode; + +/// Configures stratum server options. +#[derive(Debug, PartialEq, Clone)] +pub struct Options { + /// Working directory + pub io_path: String, + /// Network address + pub listen_addr: String, + /// Port + pub port: u16, + /// Secret for peers + pub secret: Option, +} + +struct SubmitPayload { + nonce: H64, + pow_hash: H256, + mix_hash: H256, +} + +impl SubmitPayload { + fn from_args(payload: Vec) -> Result { + if payload.len() != 3 { + return Err(PayloadError::ArgumentsAmountUnexpected(payload.len())); + } + + let nonce = match H64::from_str(clean_0x(&payload[0])) { + Ok(nonce) => nonce, + Err(e) => { + warn!(target: "stratum", "submit_work ({}): invalid nonce ({:?})", &payload[0], e); + return Err(PayloadError::InvalidNonce(payload[0].clone())) + } + }; + + let pow_hash = match H256::from_str(clean_0x(&payload[1])) { + Ok(pow_hash) => pow_hash, + Err(e) => { + warn!(target: "stratum", "submit_work ({}): invalid hash ({:?})", &payload[1], e); + return Err(PayloadError::InvalidPowHash(payload[1].clone())); + } + }; + + let mix_hash = match H256::from_str(clean_0x(&payload[2])) { + Ok(mix_hash) => mix_hash, + Err(e) => { + warn!(target: "stratum", "submit_work ({}): invalid mix-hash ({:?})", &payload[2], e); + return Err(PayloadError::InvalidMixHash(payload[2].clone())); + } + }; + + Ok(SubmitPayload { + nonce: nonce, + pow_hash: pow_hash, + mix_hash: mix_hash, + }) + } +} + +#[derive(Debug)] +enum PayloadError { + ArgumentsAmountUnexpected(usize), + InvalidNonce(String), + InvalidPowHash(String), + InvalidMixHash(String), +} + +impl fmt::Display for PayloadError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self, f) + } +} + +/// Job dispatcher for stratum service +pub struct StratumJobDispatcher { + seed_compute: Mutex, + client: Weak, + miner: Weak, +} + + +impl JobDispatcher for StratumJobDispatcher { + fn initial(&self) -> Option { + // initial payload may contain additional data, not in this case + self.job() + } + + fn job(&self) -> Option { + self.with_core(|client, miner| miner.map_sealing_work(&*client, |b| { + let pow_hash = b.hash(); + let number = b.block().header().number(); + let difficulty = b.block().header().difficulty(); + + self.payload(pow_hash, *difficulty, number) + }) + ) + } + + fn submit(&self, payload: Vec) -> Result<(), StratumServiceError> { + let payload = SubmitPayload::from_args(payload).map_err(|e| + StratumServiceError::Dispatch(format!("{}", e)) + )?; + + trace!( + target: "stratum", + "submit_work: Decoded: nonce={}, pow_hash={}, mix_hash={}", + payload.nonce, + payload.pow_hash, + payload.mix_hash, + ); + + self.with_core_void(|client, miner| { + let seal = vec![encode(&payload.mix_hash).to_vec(), encode(&payload.nonce).to_vec()]; + if let Err(e) = miner.submit_seal(&*client, payload.pow_hash, seal) { + warn!(target: "stratum", "submit_seal error: {:?}", e); + }; + }); + + Ok(()) + } +} + +impl StratumJobDispatcher { + /// New stratum job dispatcher given the miner and client + fn new(miner: Weak, client: Weak) -> StratumJobDispatcher { + StratumJobDispatcher { + seed_compute: Mutex::new(SeedHashCompute::new()), + client: client, + miner: miner, + } + } + + /// Serializes payload for stratum service + fn payload(&self, pow_hash: H256, difficulty: U256, number: u64) -> String { + // TODO: move this to engine + let target = Ethash::difficulty_to_boundary(&difficulty); + let seed_hash = &self.seed_compute.lock().get_seedhash(number); + let seed_hash = H256::from_slice(&seed_hash[..]); + format!( + r#"["0x", "0x{}","0x{}","0x{}","0x{:x}"]"#, + pow_hash.hex(), seed_hash.hex(), target.hex(), number + ) + } + + fn with_core(&self, f: F) -> Option where F: Fn(Arc, Arc) -> Option { + self.client.upgrade().and_then(|client| self.miner.upgrade().and_then(|miner| (f)(client, miner))) + } + + fn with_core_void(&self, f: F) where F: Fn(Arc, Arc) { + self.client.upgrade().map(|client| self.miner.upgrade().map(|miner| (f)(client, miner))); + } +} + +/// Wrapper for dedicated stratum service +pub struct Stratum { + dispatcher: Arc, + service: Arc, +} + +#[derive(Debug)] +/// Stratum error +pub enum Error { + /// IPC sockets error + Service(StratumServiceError), + /// Invalid network address + Address(AddrParseError), +} + +impl From for Error { + fn from(service_err: StratumServiceError) -> Error { Error::Service(service_err) } +} + +impl From for Error { + fn from(err: AddrParseError) -> Error { Error::Address(err) } +} + +impl super::work_notify::NotifyWork for Stratum { + fn notify(&self, pow_hash: H256, difficulty: U256, number: u64) { + self.service.push_work_all( + self.dispatcher.payload(pow_hash, difficulty, number) + ).unwrap_or_else( + |e| warn!(target: "stratum", "Error while pushing work: {:?}", e) + ); + } +} + +impl Stratum { + + /// New stratum job dispatcher, given the miner, client and dedicated stratum service + pub fn start(options: &Options, miner: Weak, client: Weak) -> Result { + use std::net::IpAddr; + + let dispatcher = Arc::new(StratumJobDispatcher::new(miner, client)); + + let stratum_svc = StratumService::start( + &SocketAddr::new(IpAddr::from_str(&options.listen_addr)?, options.port), + dispatcher.clone(), + options.secret.clone(), + )?; + + Ok(Stratum { + dispatcher: dispatcher, + service: stratum_svc, + }) + } + + /// Start STRATUM job dispatcher and register it in the miner + pub fn register(cfg: &Options, miner: Arc, client: Weak) -> Result<(), Error> { + let stratum = miner::Stratum::start(cfg, Arc::downgrade(&miner.clone()), client)?; + miner.push_notifier(Box::new(stratum) as Box); + Ok(()) + } +} diff --git a/ethcore/src/miner/work_notify.rs b/ethcore/src/miner/work_notify.rs index 1b2ce67b1..34e611202 100644 --- a/ethcore/src/miner/work_notify.rs +++ b/ethcore/src/miner/work_notify.rs @@ -26,6 +26,12 @@ use hyper::Url; use util::*; use ethereum::ethash::Ethash; +/// Trait for notifying about new mining work +pub trait NotifyWork : Send + Sync { + /// Fired when new mining job available + fn notify(&self, pow_hash: H256, difficulty: U256, number: u64); +} + pub struct WorkPoster { urls: Vec, client: Mutex>, @@ -57,8 +63,10 @@ impl WorkPoster { .build() .expect("Error creating HTTP client") } +} - pub fn notify(&self, pow_hash: H256, difficulty: U256, number: u64) { +impl NotifyWork for WorkPoster { + fn notify(&self, pow_hash: H256, difficulty: U256, number: u64) { // TODO: move this to engine let target = Ethash::difficulty_to_boundary(&difficulty); let seed_hash = &self.seed_compute.lock().get_seedhash(number); diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index d55a228e4..89960bec2 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -22,6 +22,7 @@ use spec::Spec; use error::*; use client::{Client, ClientConfig, ChainNotify}; use miner::Miner; + use snapshot::ManifestData; use snapshot::service::{Service as SnapshotService, ServiceParams as SnapServiceParams}; use std::sync::atomic::AtomicBool; diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs index 7e978e840..59a4790df 100644 --- a/parity/cli/mod.rs +++ b/parity/cli/mod.rs @@ -233,6 +233,15 @@ usage! { flag_refuse_service_transactions: bool = false, or |c: &Config| otry!(c.mining).refuse_service_transactions.clone(), + flag_stratum: bool = false, + or |c: &Config| Some(c.stratum.is_some()), + flag_stratum_interface: String = "local", + or |c: &Config| otry!(c.stratum).interface.clone(), + flag_stratum_port: u16 = 8008u16, + or |c: &Config| otry!(c.stratum).port.clone(), + flag_stratum_secret: Option = None, + or |c: &Config| otry!(c.stratum).secret.clone().map(Some), + // -- Footprint Options flag_tracing: String = "auto", or |c: &Config| otry!(c.footprint).tracing.clone(), @@ -313,6 +322,7 @@ struct Config { snapshots: Option, vm: Option, misc: Option, + stratum: Option, } #[derive(Default, Debug, PartialEq, RustcDecodable)] @@ -421,6 +431,13 @@ struct Mining { refuse_service_transactions: Option, } +#[derive(Default, Debug, PartialEq, RustcDecodable)] +struct Stratum { + interface: Option, + port: Option, + secret: Option, +} + #[derive(Default, Debug, PartialEq, RustcDecodable)] struct Footprint { tracing: Option, @@ -638,6 +655,11 @@ mod tests { flag_notify_work: Some("http://localhost:3001".into()), flag_refuse_service_transactions: false, + flag_stratum: false, + flag_stratum_interface: "local".to_owned(), + flag_stratum_port: 8008u16, + flag_stratum_secret: None, + // -- Footprint Options flag_tracing: "auto".into(), flag_pruning: "auto".into(), @@ -843,7 +865,8 @@ mod tests { logging: Some("own_tx=trace".into()), log_file: Some("/var/log/parity.log".into()), color: Some(true), - }) + }), + stratum: None, }); } } diff --git a/parity/cli/usage.txt b/parity/cli/usage.txt index 6ec4808cb..18021174f 100644 --- a/parity/cli/usage.txt +++ b/parity/cli/usage.txt @@ -260,6 +260,11 @@ Sealing/Mining Options: (default: {flag_notify_work:?}) --refuse-service-transactions Always refuse service transactions. (default: {flag_refuse_service_transactions}). + --stratum Run Stratum server for miner push notification. (default: {flag_stratum}) + --stratum-interface IP Interface address for Stratum server. (default: {flag_stratum_interface}) + --stratum-port PORT Port for Stratum server to listen on. (default: {flag_stratum_port}) + --stratum-secret STRING Secret for authorizing Stratum server for peers. + (default: {flag_stratum_secret:?}) Footprint Options: --tracing BOOL Indicates if full transaction tracing should be diff --git a/parity/configuration.rs b/parity/configuration.rs index b86a3152d..03a65e941 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -20,12 +20,12 @@ use std::net::SocketAddr; use std::path::{Path, PathBuf}; use std::cmp::max; use cli::{Args, ArgsError}; -use util::{Hashable, U256, Uint, Bytes, version_data, Address}; +use util::{Hashable, H256, U256, Uint, Bytes, version_data, Address}; use util::log::Colour; use ethsync::{NetworkConfiguration, is_valid_node_url, AllowIP}; use ethcore::ethstore::ethkey::Secret; use ethcore::client::{VMType}; -use ethcore::miner::{MinerOptions, Banning}; +use ethcore::miner::{MinerOptions, Banning, StratumOptions}; use ethcore::verification::queue::VerifierSettings; use rpc::{IpcConfiguration, HttpConfiguration}; @@ -329,6 +329,7 @@ impl Configuration { acc_conf: self.accounts_config()?, gas_pricer: self.gas_pricer_config()?, miner_extras: self.miner_extras()?, + stratum: self.stratum_options()?, update_policy: update_policy, mode: mode, tracing: tracing, @@ -465,6 +466,17 @@ impl Configuration { Ok(cfg) } + fn stratum_options(&self) -> Result, String> { + if self.args.flag_stratum { + Ok(Some(StratumOptions { + io_path: self.directories().db, + listen_addr: self.stratum_interface(), + port: self.args.flag_stratum_port, + secret: self.args.flag_stratum_secret.as_ref().map(|s| s.parse::().unwrap_or_else(|_| s.sha3())), + })) + } else { Ok(None) } + } + fn miner_options(&self) -> Result { let reseal = self.args.flag_reseal_on_txs.parse::()?; @@ -827,6 +839,14 @@ impl Configuration { }.into() } + fn stratum_interface(&self) -> String { + match self.args.flag_stratum_interface.as_str() { + "local" => "127.0.0.1", + "all" => "0.0.0.0", + x => x, + }.into() + } + fn dapps_enabled(&self) -> bool { !self.args.flag_dapps_off && !self.args.flag_no_dapps && cfg!(feature = "dapps") } @@ -1086,6 +1106,7 @@ mod tests { custom_bootnodes: false, fat_db: Default::default(), no_periodic_snapshot: false, + stratum: None, check_seal: true, download_old_blocks: true, verifier_settings: Default::default(), diff --git a/parity/modules.rs b/parity/modules.rs index 83f955992..8617f5891 100644 --- a/parity/modules.rs +++ b/parity/modules.rs @@ -45,10 +45,7 @@ pub mod service_urls { pub const LIGHT_PROVIDER: &'static str = "parity-light-provider.ipc"; #[cfg(feature="stratum")] - pub const STRATUM: &'static str = "parity-stratum.ipc"; - #[cfg(feature="stratum")] - pub const MINING_JOB_DISPATCHER: &'static str = "parity-mining-jobs.ipc"; - + pub const STRATUM_CONTROL: &'static str = "parity-stratum-control.ipc"; pub fn with_base(data_dir: &str, service_path: &str) -> String { let mut path = PathBuf::from(data_dir); @@ -126,18 +123,35 @@ fn sync_arguments(io_path: &str, sync_cfg: SyncConfig, net_cfg: NetworkConfigura } #[cfg(feature="ipc")] -pub fn sync - ( - hypervisor_ref: &mut Option, - sync_cfg: SyncConfig, - net_cfg: NetworkConfiguration, - _client: Arc, - _snapshot_service: Arc, - _provider: Arc, - log_settings: &LogConfig, - ) - -> Result -{ +pub fn stratum( + hypervisor_ref: &mut Option, + config: &::ethcore::miner::StratumOptions +) { + use ethcore_stratum; + + let mut hypervisor = hypervisor_ref.take().expect("There should be hypervisor for ipc configuration"); + let args = BootArgs::new().stdin( + serialize(ðcore_stratum::ServiceConfiguration { + io_path: hypervisor.io_path.to_owned(), + port: config.port, + listen_addr: config.listen_addr.to_owned(), + secret: config.secret, + }).expect("Any binary-derived struct is serializable by definition") + ).cli(vec!["stratum".to_owned()]); + hypervisor = hypervisor.module(super::stratum::MODULE_ID, args); + *hypervisor_ref = Some(hypervisor); +} + +#[cfg(feature="ipc")] +pub fn sync( + hypervisor_ref: &mut Option, + sync_cfg: SyncConfig, + net_cfg: NetworkConfiguration, + _client: Arc, + _snapshot_service: Arc, + _provider: Arc, + log_settings: &LogConfig, +) -> Result { let mut hypervisor = hypervisor_ref.take().expect("There should be hypervisor for ipc configuration"); let args = sync_arguments(&hypervisor.io_path, sync_cfg, net_cfg, log_settings); hypervisor = hypervisor.module(SYNC_MODULE_ID, args); @@ -153,29 +167,26 @@ pub fn sync &service_urls::with_base(&hypervisor.io_path, service_urls::NETWORK_MANAGER)).unwrap(); let provider_client = generic_client::>( &service_urls::with_base(&hypervisor.io_path, service_urls::LIGHT_PROVIDER)).unwrap(); - + *hypervisor_ref = Some(hypervisor); Ok((sync_client, manage_client, notify_client)) } #[cfg(not(feature="ipc"))] -pub fn sync - ( - _hypervisor: &mut Option, - sync_cfg: SyncConfig, - net_cfg: NetworkConfiguration, - client: Arc, - snapshot_service: Arc, - provider: Arc, - _log_settings: &LogConfig, - ) - -> Result -{ +pub fn sync( + _hypervisor: &mut Option, + sync_cfg: SyncConfig, + net_cfg: NetworkConfiguration, + client: Arc, + snapshot_service: Arc, + provider: Arc, + _log_settings: &LogConfig, +) -> Result { let eth_sync = EthSync::new(Params { - config: sync_cfg, + config: sync_cfg, chain: client, provider: provider, - snapshot_service: snapshot_service, + snapshot_service: snapshot_service, network_config: net_cfg, })?; diff --git a/parity/run.rs b/parity/run.rs index ba4236bac..2b824b4d7 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -23,6 +23,7 @@ use ethsync::NetworkConfiguration; use util::{Colour, version, RotatingLogger, Mutex, Condvar}; use io::{MayPanic, ForwardPanic, PanicHandler}; use ethcore_logger::{Config as LogConfig}; +use ethcore::miner::{StratumOptions, Stratum}; use ethcore::client::{Mode, DatabaseCompactionProfile, VMType, BlockChainClient}; use ethcore::service::ClientService; use ethcore::account_provider::AccountProvider; @@ -97,6 +98,7 @@ pub struct RunCmd { pub ui: bool, pub name: String, pub custom_bootnodes: bool, + pub stratum: Option, pub no_periodic_snapshot: bool, pub check_seal: bool, pub download_old_blocks: bool, @@ -315,6 +317,12 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R // create external miner let external_miner = Arc::new(ExternalMiner::default()); + // start stratum + if let Some(ref stratum_config) = cmd.stratum { + Stratum::register(stratum_config, miner.clone(), Arc::downgrade(&client)) + .map_err(|e| format!("Stratum start error: {:?}", e))?; + } + // create sync object let (sync_provider, manage_network, chain_notify) = modules::sync( &mut hypervisor, diff --git a/parity/stratum.rs b/parity/stratum.rs index 1e510ed44..c47eeded4 100644 --- a/parity/stratum.rs +++ b/parity/stratum.rs @@ -16,42 +16,104 @@ //! Parity sync service -use std; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use ethcore_stratum::{Stratum as StratumServer, PushWorkHandler, RemoteJobDispatcher, ServiceConfiguration}; -use std::thread; use modules::service_urls; use boot; use hypervisor::service::IpcModuleId; -use std::net::SocketAddr; +use hypervisor::{HYPERVISOR_IPC_URL, ControlService}; +use std::net::{SocketAddr, IpAddr}; use std::str::FromStr; +use nanoipc; +use std::thread; +use ethcore::miner::stratum::{STRATUM_SOCKET_NAME, JOB_DISPATCHER_SOCKET_NAME}; -const STRATUM_MODULE_ID: IpcModuleId = 8000; +pub const MODULE_ID: IpcModuleId = 8000; + +#[derive(Default)] +struct StratumControlService { + pub stop: Arc, +} + +impl ControlService for StratumControlService { + fn shutdown(&self) -> bool { + trace!(target: "hypervisor", "Received shutdown from control service"); + self.stop.store(true, ::std::sync::atomic::Ordering::Relaxed); + true + } +} pub fn main() { boot::setup_cli_logger("stratum"); let service_config: ServiceConfiguration = boot::payload() - .unwrap_or_else(|e| panic!("Fatal: error reading boot arguments ({:?})", e)); + .unwrap_or_else(|e| { + println!("Fatal: error reading boot arguments ({:?})", e); + std::process::exit(1) + }); - let job_dispatcher = dependency!(RemoteJobDispatcher, service_urls::MINING_JOB_DISPATCHER); + let job_dispatcher = dependency!( + RemoteJobDispatcher, + &service_urls::with_base(&service_config.io_path, JOB_DISPATCHER_SOCKET_NAME) + ); + + let _ = boot::main_thread(); + let service_stop = Arc::new(AtomicBool::new(false)); - let stop = boot::main_thread(); let server = StratumServer::start( - &SocketAddr::from_str(&service_config.listen_addr) - .unwrap_or_else(|e| panic!("Fatal: invalid listen address ({:?})", e)), + &SocketAddr::new( + IpAddr::from_str(&service_config.listen_addr) + .unwrap_or_else(|e| + println!("Fatal: invalid listen address: '{}' ({:?})", &service_config.listen_addr, e); + std::process::exit(1) + ), + service_config.port, + ), job_dispatcher.service().clone(), service_config.secret ).unwrap_or_else( - |e| panic!("Fatal: cannot start stratum server({:?})", e) + |e| { + println!("Fatal: cannot start stratum server({:?})", e); + std::process::exit(1) + } ); - boot::host_service(service_urls::STRATUM, stop.clone(), server.clone() as Arc); + boot::host_service( + &service_urls::with_base(&service_config.io_path, STRATUM_SOCKET_NAME), + service_stop.clone(), + server.clone() as Arc + ); - let _ = boot::register(STRATUM_MODULE_ID); + let hypervisor = boot::register( + &service_urls::with_base(&service_config.io_path, HYPERVISOR_IPC_URL), + &service_urls::with_base(&service_config.io_path, service_urls::STRATUM_CONTROL), + MODULE_ID + ); - while !stop.load(::std::sync::atomic::Ordering::Relaxed) { - thread::park_timeout(std::time::Duration::from_millis(1000)); + let timer_svc = server.clone(); + let timer_stop = service_stop.clone(); + thread::spawn(move || { + while !timer_stop.load(Ordering::SeqCst) { + thread::park_timeout(::std::time::Duration::from_millis(2000)); + // It almost always not doing anything, only greets new peers with a job + timer_svc.maintain(); + } + }); + + let control_service = Arc::new(StratumControlService::default()); + let as_control = control_service.clone() as Arc; + let mut worker = nanoipc::Worker::::new(&as_control); + worker.add_reqrep( + &service_urls::with_base(&service_config.io_path, service_urls::STRATUM_CONTROL) + ).unwrap(); + + while !control_service.stop.load(Ordering::SeqCst) { + worker.poll(); } + service_stop.store(true, Ordering::SeqCst); + + hypervisor.module_shutdown(MODULE_ID); + trace!(target: "hypervisor", "Stratum process terminated gracefully"); } diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 1c8c673f5..a11112032 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -17,10 +17,10 @@ serde_json = "0.8" rustc-serialize = "0.3" time = "0.1" transient-hashmap = "0.1" -jsonrpc-core = { git = "https://github.com/ethcore/jsonrpc.git", branch="mio" } -jsonrpc-http-server = { git = "https://github.com/ethcore/jsonrpc.git", branch="mio" } -jsonrpc-ipc-server = { git = "https://github.com/ethcore/jsonrpc.git", branch="mio" } -jsonrpc-macros = { git = "https://github.com/ethcore/jsonrpc.git", branch="mio" } +jsonrpc-core = { git = "https://github.com/ethcore/jsonrpc.git" } +jsonrpc-http-server = { git = "https://github.com/ethcore/jsonrpc.git" } +jsonrpc-ipc-server = { git = "https://github.com/ethcore/jsonrpc.git" } +jsonrpc-macros = { git = "https://github.com/ethcore/jsonrpc.git" } ethcore-io = { path = "../util/io" } ethcore-ipc = { path = "../ipc/rpc" } ethcore-util = { path = "../util" } diff --git a/rpc_client/Cargo.toml b/rpc_client/Cargo.toml index 16b513550..e5c8e9b12 100644 --- a/rpc_client/Cargo.toml +++ b/rpc_client/Cargo.toml @@ -16,7 +16,7 @@ serde = "0.8" serde_json = "0.8" tempdir = "0.3.5" url = "1.2.0" -jsonrpc-core = { git = "https://github.com/ethcore/jsonrpc.git", branch="mio" } +jsonrpc-core = { git = "https://github.com/ethcore/jsonrpc.git" } ws = { git = "https://github.com/ethcore/ws-rs.git", branch = "mio-upstream-stable" } ethcore-rpc = { path = "../rpc" } ethcore-signer = { path = "../signer" } diff --git a/signer/Cargo.toml b/signer/Cargo.toml index 66f996694..5a24c52bb 100644 --- a/signer/Cargo.toml +++ b/signer/Cargo.toml @@ -12,7 +12,7 @@ rustc_version = "0.1" [dependencies] rand = "0.3.14" -jsonrpc-core = { git = "https://github.com/ethcore/jsonrpc.git", branch="mio" } +jsonrpc-core = { git = "https://github.com/ethcore/jsonrpc.git" } log = "0.3" env_logger = "0.3" parity-dapps-glue = { version = "1.4", optional = true } diff --git a/stratum/Cargo.toml b/stratum/Cargo.toml index 76a5db3fd..65b2c0e9c 100644 --- a/stratum/Cargo.toml +++ b/stratum/Cargo.toml @@ -11,9 +11,9 @@ ethcore-ipc-codegen = { path = "../ipc/codegen" } [dependencies] log = "0.3" -jsonrpc-core = { git = "https://github.com/ethcore/jsonrpc.git", branch="mio" } -jsonrpc-macros = { git = "https://github.com/ethcore/jsonrpc.git", branch="mio" } -jsonrpc-tcp-server = { git = "https://github.com/ethcore/jsonrpc.git", branch="mio" } +jsonrpc-core = { git = "https://github.com/ethcore/jsonrpc.git" } +jsonrpc-macros = { git = "https://github.com/ethcore/jsonrpc.git" } +jsonrpc-tcp-server = { git = "https://github.com/ethcore/jsonrpc.git" } mio = { git = "https://github.com/ethcore/mio", branch = "v0.5.x" } ethcore-util = { path = "../util" } ethcore-devtools = { path = "../devtools" } @@ -22,6 +22,8 @@ env_logger = "0.3" ethcore-ipc = { path = "../ipc/rpc" } semver = "0.5" ethcore-ipc-nano = { path = "../ipc/nano" } +futures = "0.1" +tokio-core = "0.1" [profile.release] debug = true diff --git a/stratum/src/lib.rs b/stratum/src/lib.rs index 1ff0100b3..7481643a8 100644 --- a/stratum/src/lib.rs +++ b/stratum/src/lib.rs @@ -23,16 +23,14 @@ extern crate jsonrpc_macros; extern crate ethcore_util as util; extern crate ethcore_ipc as ipc; extern crate semver; +extern crate futures; -#[cfg(test)] -extern crate mio; -#[cfg(test)] +#[cfg(test)] extern crate tokio_core; extern crate ethcore_devtools as devtools; -#[cfg(test)] -extern crate env_logger; -#[cfg(test)] -#[macro_use] -extern crate lazy_static; +#[cfg(test)] extern crate env_logger; +#[cfg(test)] #[macro_use] extern crate lazy_static; + +use futures::{future, BoxFuture, Future}; mod traits { //! Stratum ipc interfaces specification @@ -45,8 +43,11 @@ pub use traits::{ RemoteWorkHandler, RemoteJobDispatcher, }; -use jsonrpc_tcp_server::Server as JsonRpcServer; -use jsonrpc_core::{IoHandler, Params, to_value}; +use jsonrpc_tcp_server::{ + Server as JsonRpcServer, RequestContext, MetaExtractor, Dispatcher, + PushMessageError +}; +use jsonrpc_core::{MetaIoHandler, Params, to_value, Value, Metadata, Compatibility}; use jsonrpc_macros::IoDelegate; use std::sync::Arc; @@ -54,25 +55,64 @@ use std::net::SocketAddr; use std::collections::{HashSet, HashMap}; use util::{H256, Hashable, RwLock, RwLockReadGuard}; -type RpcResult = Result; +type RpcResult = BoxFuture; struct StratumRpc { stratum: RwLock>>, } + impl StratumRpc { - fn subscribe(&self, params: Params) -> RpcResult { + fn subscribe(&self, params: Params, meta: SocketMetadata) -> RpcResult { self.stratum.read().as_ref().expect("RPC methods are called after stratum is set.") - .subscribe(params) + .subscribe(params, meta) } - fn authorize(&self, params: Params) -> RpcResult { + fn authorize(&self, params: Params, meta: SocketMetadata) -> RpcResult { self.stratum.read().as_ref().expect("RPC methods are called after stratum is set.") - .authorize(params) + .authorize(params, meta) + } + + fn submit(&self, params: Params, meta: SocketMetadata) -> RpcResult { + self.stratum.read().as_ref().expect("RPC methods are called after stratum is set.") + .submit(params, meta) + } +} + +#[derive(Clone)] +pub struct SocketMetadata { + addr: SocketAddr, +} + +impl Default for SocketMetadata { + fn default() -> Self { + SocketMetadata { addr: "0.0.0.0:0".parse().unwrap() } + } +} + +impl SocketMetadata { + pub fn addr(&self) -> &SocketAddr { + &self.addr + } +} + +impl Metadata for SocketMetadata { } + +impl From for SocketMetadata { + fn from(addr: SocketAddr) -> SocketMetadata { + SocketMetadata { addr: addr } + } +} + +pub struct PeerMetaExtractor; + +impl MetaExtractor for PeerMetaExtractor { + fn extract(&self, context: &RequestContext) -> SocketMetadata { + context.peer_addr.into() } } pub struct Stratum { - rpc_server: JsonRpcServer<()>, + rpc_server: JsonRpcServer, /// Subscribed clients subscribers: RwLock>, /// List of workers supposed to receive job update @@ -83,48 +123,92 @@ pub struct Stratum { workers: Arc>>, /// Secret if any secret: Option, + /// Dispatch notify couinter + notify_counter: RwLock, + /// Message dispatcher (tcp/ip service) + tcp_dispatcher: Dispatcher, } +const NOTIFY_COUNTER_INITIAL: u32 = 16; + impl Stratum { pub fn start( addr: &SocketAddr, dispatcher: Arc, secret: Option, - ) -> Result, jsonrpc_tcp_server::Error> { + ) -> Result, Error> { + let rpc = Arc::new(StratumRpc { stratum: RwLock::new(None), }); - let mut delegate = IoDelegate::::new(rpc.clone()); - delegate.add_method("miner.subscribe", StratumRpc::subscribe); - delegate.add_method("miner.authorize", StratumRpc::authorize); - - let mut handler = IoHandler::default(); + let mut delegate = IoDelegate::::new(rpc.clone()); + delegate.add_method_with_meta("mining.subscribe", StratumRpc::subscribe); + delegate.add_method_with_meta("mining.authorize", StratumRpc::authorize); + delegate.add_method_with_meta("mining.submit", StratumRpc::submit); + let mut handler = MetaIoHandler::::with_compatibility(Compatibility::Both); handler.extend_with(delegate); - let server = JsonRpcServer::new(addr, handler)?; + + let server = JsonRpcServer::new(addr.clone(), Arc::new(handler)) + .extractor(Arc::new(PeerMetaExtractor) as Arc>); + let stratum = Arc::new(Stratum { + tcp_dispatcher: server.dispatcher(), rpc_server: server, subscribers: RwLock::new(Vec::new()), job_que: RwLock::new(HashSet::new()), dispatcher: dispatcher, workers: Arc::new(RwLock::new(HashMap::new())), secret: secret, + notify_counter: RwLock::new(NOTIFY_COUNTER_INITIAL), }); *rpc.stratum.write() = Some(stratum.clone()); - stratum.rpc_server.run_async()?; + let running_stratum = stratum.clone(); + ::std::thread::spawn(move || running_stratum.rpc_server.run()); Ok(stratum) } - fn subscribe(&self, _params: Params) -> RpcResult { + fn update_peers(&self) { + if let Some(job) = self.dispatcher.job() { + if let Err(e) = self.push_work_all(job) { + warn!("Failed to update some of the peers: {:?}", e); + } + } + } + + fn submit(&self, params: Params, _meta: SocketMetadata) -> RpcResult { + future::ok(match params { + Params::Array(vals) => { + // first two elements are service messages (worker_id & job_id) + match self.dispatcher.submit(vals.iter().skip(2) + .filter_map(|val| match val { &Value::String(ref str) => Some(str.to_owned()), _ => None }) + .collect::>()) { + Ok(()) => { + self.update_peers(); + to_value(true) + }, + Err(submit_err) => { + warn!("Error while submitting share: {:?}", submit_err); + to_value(false) + } + } + }, + _ => { + trace!(target: "stratum", "Invalid submit work format {:?}", params); + to_value(false) + } + }).boxed() + } + + fn subscribe(&self, _params: Params, meta: SocketMetadata) -> RpcResult { use std::str::FromStr; - if let Some(context) = self.rpc_server.request_context() { - self.subscribers.write().push(context.socket_addr); - self.job_que.write().insert(context.socket_addr); - trace!(target: "stratum", "Subscription request from {:?}", context.socket_addr); - } - Ok(match self.dispatcher.initial() { + self.subscribers.write().push(meta.addr().clone()); + self.job_que.write().insert(meta.addr().clone()); + trace!(target: "stratum", "Subscription request from {:?}", meta.addr()); + + future::ok(match self.dispatcher.initial() { Some(initial) => match jsonrpc_core::Value::from_str(&initial) { Ok(val) => val, Err(e) => { @@ -133,26 +217,21 @@ impl Stratum { }, }, None => to_value(&[0u8; 0]), - }) + }).boxed() } - fn authorize(&self, params: Params) -> RpcResult { - params.parse::<(String, String)>().map(|(worker_id, secret)|{ + fn authorize(&self, params: Params, meta: SocketMetadata) -> RpcResult { + future::result(params.parse::<(String, String)>().map(|(worker_id, secret)|{ if let Some(valid_secret) = self.secret { let hash = secret.sha3(); if hash != valid_secret { return to_value(&false); } } - if let Some(context) = self.rpc_server.request_context() { - self.workers.write().insert(context.socket_addr, worker_id); - to_value(&true) - } - else { - warn!(target: "stratum", "Authorize without valid context received!"); - to_value(&false) - } - }) + trace!(target: "stratum", "New worker #{} registered", worker_id); + self.workers.write().insert(meta.addr().clone(), worker_id); + to_value(true) + })).boxed() } pub fn subscribers(&self) -> RwLockReadGuard> { @@ -161,31 +240,50 @@ impl Stratum { pub fn maintain(&self) { let mut job_que = self.job_que.write(); - let workers = self.workers.read(); + let job_payload = self.dispatcher.job(); for socket_addr in job_que.drain() { - if let Some(worker_id) = workers.get(&socket_addr) { - let job_payload = self.dispatcher.job(worker_id.to_owned()); - job_payload.map( - |json| self.rpc_server.push_message(&socket_addr, json.as_bytes()) - ); - } - else { - trace!( - target: "stratum", - "Job queued for worker that is still not authorized, skipping ('{:?}')", socket_addr - ); - } + job_payload.as_ref().map( + |json| self.tcp_dispatcher.push_message(&socket_addr, json.to_owned()) + ); } } } impl PushWorkHandler for Stratum { fn push_work_all(&self, payload: String) -> Result<(), Error> { - let workers = self.workers.read(); - println!("pushing work for {} workers", workers.len()); - for (ref addr, _) in workers.iter() { - self.rpc_server.push_message(addr, payload.as_bytes())?; + let hup_peers = { + let workers = self.workers.read(); + let next_request_id = { + let mut counter = self.notify_counter.write(); + if *counter == ::std::u32::MAX { *counter = NOTIFY_COUNTER_INITIAL; } + else { *counter = *counter + 1 } + *counter + }; + + let mut hup_peers = HashSet::with_capacity(0); // most of the cases won't be needed, hence avoid allocation + let workers_msg = format!("{{ \"id\": {}, \"method\": \"mining.notify\", \"params\": {} }}", next_request_id, payload); + trace!(target: "stratum", "pushing work for {} workers (payload: '{}')", workers.len(), &workers_msg); + for (ref addr, _) in workers.iter() { + trace!(target: "stratum", "pusing work to {}", addr); + match self.tcp_dispatcher.push_message(addr, workers_msg.clone()) { + Err(PushMessageError::NoSuchPeer) => { + trace!(target: "stratum", "Worker no longer connected: {}", &addr); + hup_peers.insert(*addr.clone()); + }, + Err(e) => { + warn!(target: "stratum", "Unexpected transport error: {:?}", e); + }, + Ok(_) => { }, + } + } + hup_peers + }; + + if !hup_peers.is_empty() { + let mut workers = self.workers.write(); + for hup_peer in hup_peers { workers.remove(&hup_peer); } } + Ok(()) } @@ -203,9 +301,9 @@ impl PushWorkHandler for Stratum { while que.len() > 0 { let next_worker = addrs[addr_index]; let mut next_payload = que.drain(0..1); - self.rpc_server.push_message( + self.tcp_dispatcher.push_message( next_worker, - next_payload.nth(0).expect("drained successfully of 0..1, so 0-th element should exist").as_bytes() + next_payload.nth(0).expect("drained successfully of 0..1, so 0-th element should exist") )?; addr_index = addr_index + 1; } @@ -218,12 +316,20 @@ mod tests { use super::*; use std::str::FromStr; use std::net::SocketAddr; - use std::sync::{Arc, RwLock}; - use std::thread; + use std::sync::Arc; + + use tokio_core::reactor::{Core, Timeout}; + use tokio_core::net::TcpStream; + use tokio_core::io; + use futures::{Future, future}; pub struct VoidManager; - impl JobDispatcher for VoidManager { } + impl JobDispatcher for VoidManager { + fn submit(&self, _payload: Vec) -> Result<(), Error> { + Ok(()) + } + } lazy_static! { static ref LOG_DUMMY: bool = { @@ -251,78 +357,42 @@ mod tests { let _ = *LOG_DUMMY; } - pub fn dummy_request(addr: &SocketAddr, buf: &[u8]) -> Vec { - use std::io::{Read, Write}; - use mio::*; - use mio::tcp::*; + fn dummy_request(addr: &SocketAddr, data: &str) -> Vec { + let mut core = Core::new().expect("Tokio Core should be created with no errors"); + let mut buffer = vec![0u8; 2048]; - let mut poll = Poll::new().unwrap(); - let mut sock = TcpStream::connect(addr).unwrap(); - poll.register(&sock, Token(0), EventSet::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); - poll.poll(Some(50)).unwrap(); - sock.write_all(buf).unwrap(); - poll.reregister(&sock, Token(0), EventSet::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); - poll.poll(Some(50)).unwrap(); + let mut data_vec = data.as_bytes().to_vec(); + data_vec.extend(b"\n"); - let mut buf = Vec::new(); - sock.read_to_end(&mut buf).unwrap_or_else(|_| { 0 }); - buf - } + let stream = TcpStream::connect(addr, &core.handle()) + .and_then(|stream| { + io::write_all(stream, &data_vec) + }) + .and_then(|(stream, _)| { + io::read(stream, &mut buffer) + }) + .and_then(|(_, read_buf, len)| { + future::ok(read_buf[0..len].to_vec()) + }); + let result = core.run(stream).expect("Core should run with no errors"); - pub fn dummy_async_waiter(addr: &SocketAddr, initial: Vec, result: Arc>>) -> ::devtools::StopGuard { - use std::io::{Read, Write}; - use mio::*; - use mio::tcp::*; - use std::sync::atomic::Ordering; - - let stop_guard = ::devtools::StopGuard::new(); - let collector = result.clone(); - let thread_stop = stop_guard.share(); - let socket_addr = addr.clone(); - thread::spawn(move || { - let mut poll = Poll::new().unwrap(); - let mut sock = TcpStream::connect(&socket_addr).unwrap(); - poll.register(&sock, Token(0), EventSet::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); - - for initial_req in initial { - poll.poll(Some(120)).unwrap(); - sock.write_all(initial_req.as_bytes()).unwrap(); - poll.reregister(&sock, Token(0), EventSet::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); - poll.poll(Some(120)).unwrap(); - - let mut buf = Vec::new(); - sock.read_to_end(&mut buf).unwrap_or_else(|_| { 0 }); - collector.write().unwrap().push(String::from_utf8(buf).unwrap()); - poll.reregister(&sock, Token(0), EventSet::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); - } - - while !thread_stop.load(Ordering::Relaxed) { - poll.reregister(&sock, Token(0), EventSet::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); - poll.poll(Some(120)).unwrap(); - - let mut buf = Vec::new(); - sock.read_to_end(&mut buf).unwrap_or_else(|_| { 0 }); - if buf.len() > 0 { - collector.write().unwrap().push(String::from_utf8(buf).unwrap()); - } - } - }); - - stop_guard + result } #[test] fn can_be_started() { - let stratum = Stratum::start(&SocketAddr::from_str("0.0.0.0:19980").unwrap(), Arc::new(VoidManager), None); + let stratum = Stratum::start(&SocketAddr::from_str("127.0.0.1:19980").unwrap(), Arc::new(VoidManager), None); assert!(stratum.is_ok()); } #[test] fn records_subscriber() { - let addr = SocketAddr::from_str("0.0.0.0:19985").unwrap(); + init_log(); + + let addr = SocketAddr::from_str("127.0.0.1:19985").unwrap(); let stratum = Stratum::start(&addr, Arc::new(VoidManager), None).unwrap(); - let request = r#"{"jsonrpc": "2.0", "method": "miner.subscribe", "params": [], "id": 1}"#; - dummy_request(&addr, request.as_bytes()); + let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 1}"#; + dummy_request(&addr, request); assert_eq!(1, stratum.subscribers.read().len()); } @@ -349,33 +419,43 @@ mod tests { fn initial(&self) -> Option { Some(self.initial_payload.clone()) } + + fn submit(&self, _payload: Vec) -> Result<(), Error> { + Ok(()) + } + } + + fn terminated_str(origin: &'static str) -> String { + let mut s = String::new(); + s.push_str(origin); + s.push_str("\n"); + s } #[test] fn receives_initial_paylaod() { - let addr = SocketAddr::from_str("0.0.0.0:19975").unwrap(); - Stratum::start(&addr, DummyManager::new(), None).unwrap(); - let request = r#"{"jsonrpc": "2.0", "method": "miner.subscribe", "params": [], "id": 1}"#; + let addr = SocketAddr::from_str("127.0.0.1:19975").unwrap(); + Stratum::start(&addr, DummyManager::new(), None).expect("There should be no error starting stratum"); + let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 2}"#; - let response = String::from_utf8(dummy_request(&addr, request.as_bytes())).unwrap(); + let response = String::from_utf8(dummy_request(&addr, request)).unwrap(); - assert_eq!(r#"{"jsonrpc":"2.0","result":["dummy payload"],"id":1}"#, response); + assert_eq!(terminated_str(r#"{"jsonrpc":"2.0","result":["dummy payload"],"id":2}"#), response); } #[test] fn can_authorize() { - let addr = SocketAddr::from_str("0.0.0.0:19970").unwrap(); + let addr = SocketAddr::from_str("127.0.0.1:19970").unwrap(); let stratum = Stratum::start( &addr, Arc::new(DummyManager::build().of_initial(r#"["dummy autorize payload"]"#)), None - ).unwrap(); + ).expect("There should be no error starting stratum"); - let request = r#"{"jsonrpc": "2.0", "method": "miner.authorize", "params": ["miner1", ""], "id": 1}"#; + let request = r#"{"jsonrpc": "2.0", "method": "mining.authorize", "params": ["miner1", ""], "id": 1}"#; + let response = String::from_utf8(dummy_request(&addr, request)).unwrap(); - let response = String::from_utf8(dummy_request(&addr, request.as_bytes())).unwrap(); - - assert_eq!(r#"{"jsonrpc":"2.0","result":true,"id":1}"#, response); + assert_eq!(terminated_str(r#"{"jsonrpc":"2.0","result":true,"id":1}"#), response); assert_eq!(1, stratum.workers.read().len()); } @@ -383,26 +463,57 @@ mod tests { fn can_push_work() { init_log(); - let addr = SocketAddr::from_str("0.0.0.0:19965").unwrap(); + let addr = SocketAddr::from_str("127.0.0.1:19995").unwrap(); let stratum = Stratum::start( &addr, - Arc::new(DummyManager::build().of_initial(r#"["dummy push request payload"]"#)), + Arc::new(DummyManager::build().of_initial(r#"["dummy autorize payload"]"#)), None - ).unwrap(); + ).expect("There should be no error starting stratum"); - let result = Arc::new(RwLock::new(Vec::::new())); - let _stop = dummy_async_waiter( - &addr, - vec![ - r#"{"jsonrpc": "2.0", "method": "miner.authorize", "params": ["miner1", ""], "id": 1}"#.to_owned(), - ], - result.clone(), - ); - ::std::thread::park_timeout(::std::time::Duration::from_millis(150)); + let mut auth_request = + r#"{"jsonrpc": "2.0", "method": "mining.authorize", "params": ["miner1", ""], "id": 1}"# + .as_bytes() + .to_vec(); + auth_request.extend(b"\n"); - stratum.push_work_all(r#"{ "00040008", "100500" }"#.to_owned()).unwrap(); - ::std::thread::park_timeout(::std::time::Duration::from_millis(150)); + let mut core = Core::new().expect("Tokio Core should be created with no errors"); + let timeout1 = Timeout::new(::std::time::Duration::from_millis(100), &core.handle()) + .expect("There should be a timeout produced in message test"); + let timeout2 = Timeout::new(::std::time::Duration::from_millis(100), &core.handle()) + .expect("There should be a timeout produced in message test"); + let mut buffer = vec![0u8; 2048]; + let mut buffer2 = vec![0u8; 2048]; + let stream = TcpStream::connect(&addr, &core.handle()) + .and_then(|stream| { + io::write_all(stream, &auth_request) + }) + .and_then(|(stream, _)| { + io::read(stream, &mut buffer) + }) + .and_then(|(stream, _, _)| { + trace!(target: "stratum", "Received authorization confirmation"); + timeout1.join(future::ok(stream)) + }) + .and_then(|(_, stream)| { + trace!(target: "stratum", "Pusing work to peers"); + stratum.push_work_all(r#"{ "00040008", "100500" }"#.to_owned()) + .expect("Pushing work should produce no errors"); + timeout2.join(future::ok(stream)) + }) + .and_then(|(_, stream)| { + trace!(target: "stratum", "Ready to read work from server"); + io::read(stream, &mut buffer2) + }) + .and_then(|(_, read_buf, len)| { + trace!(target: "stratum", "Received work from server"); + future::ok(read_buf[0..len].to_vec()) + }); + let response = String::from_utf8( + core.run(stream).expect("Core should run with no errors") + ).expect("Response should be utf-8"); - assert_eq!(2, result.read().unwrap().len()); + assert_eq!( + "{ \"id\": 17, \"method\": \"mining.notify\", \"params\": { \"00040008\", \"100500\" } }\n", + response); } } diff --git a/stratum/src/traits.rs b/stratum/src/traits.rs index 8f172bef2..d057bfc43 100644 --- a/stratum/src/traits.rs +++ b/stratum/src/traits.rs @@ -18,6 +18,7 @@ use std; use std::error::Error as StdError; use util::H256; use ipc::IpcConfig; +use jsonrpc_tcp_server::PushMessageError; #[derive(Debug, Clone)] #[binary] @@ -25,6 +26,8 @@ pub enum Error { NoWork, NoWorkers, Io(String), + Tcp(String), + Dispatch(String), } impl From for Error { @@ -33,6 +36,12 @@ impl From for Error { } } +impl From for Error { + fn from(err: PushMessageError) -> Self { + Error::Tcp(format!("Push message error: {:?}", err)) + } +} + #[ipc(client_ident="RemoteJobDispatcher")] /// Interface that can provide pow/blockchain-specific responses for the clients pub trait JobDispatcher: Send + Sync { @@ -41,7 +50,9 @@ pub trait JobDispatcher: Send + Sync { // json for difficulty dispatch fn difficulty(&self) -> Option { None } // json for job update given worker_id (payload manager should split job!) - fn job(&self, _worker_id: String) -> Option { None } + fn job(&self) -> Option { None } + // miner job result + fn submit(&self, payload: Vec) -> Result<(), Error>; } #[ipc(client_ident="RemoteWorkHandler")] @@ -56,7 +67,9 @@ pub trait PushWorkHandler: Send + Sync { #[binary] pub struct ServiceConfiguration { + pub io_path: String, pub listen_addr: String, + pub port: u16, pub secret: Option, }