Merge branch 'master' into jg-signer-api-queries-2

This commit is contained in:
Jaco Greeff 2016-11-23 12:32:21 +01:00
commit dddb8752f2
14 changed files with 716 additions and 209 deletions

2
Cargo.lock generated
View File

@ -1263,7 +1263,7 @@ dependencies = [
[[package]] [[package]]
name = "parity-ui-precompiled" name = "parity-ui-precompiled"
version = "1.4.0" version = "1.4.0"
source = "git+https://github.com/ethcore/js-precompiled.git#f8dba85e17d32860d4070fb444fa5cf13bcc6b3c" source = "git+https://github.com/ethcore/js-precompiled.git#583b6ae5b132ade23f94c3be4717f8ee5418f38d"
dependencies = [ dependencies = [
"parity-dapps-glue 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-dapps-glue 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
] ]

View File

@ -580,29 +580,29 @@ impl Miner {
let gas_required = |tx: &SignedTransaction| tx.gas_required(&schedule).into(); let gas_required = |tx: &SignedTransaction| tx.gas_required(&schedule).into();
let best_block_header: Header = ::rlp::decode(&chain.best_block_header()); let best_block_header: Header = ::rlp::decode(&chain.best_block_header());
transactions.into_iter() transactions.into_iter()
.filter(|tx| match self.engine.verify_transaction_basic(tx, &best_block_header) { .map(|tx| {
Ok(()) => true, match self.engine.verify_transaction_basic(&tx, &best_block_header) {
Err(e) => { Err(e) => {
debug!(target: "miner", "Rejected tx {:?} with invalid signature: {:?}", tx.hash(), e); debug!(target: "miner", "Rejected tx {:?} with invalid signature: {:?}", tx.hash(), e);
false Err(e)
} },
} Ok(()) => {
) let origin = accounts.as_ref().and_then(|accounts| {
.map(|tx| { tx.sender().ok().and_then(|sender| match accounts.contains(&sender) {
let origin = accounts.as_ref().and_then(|accounts| { true => Some(TransactionOrigin::Local),
tx.sender().ok().and_then(|sender| match accounts.contains(&sender) { false => None,
true => Some(TransactionOrigin::Local), })
false => None, }).unwrap_or(default_origin);
})
}).unwrap_or(default_origin); match origin {
TransactionOrigin::Local | TransactionOrigin::RetractedBlock => {
match origin { transaction_queue.add(tx, origin, &fetch_account, &gas_required)
TransactionOrigin::Local | TransactionOrigin::RetractedBlock => { },
transaction_queue.add(tx, origin, &fetch_account, &gas_required) TransactionOrigin::External => {
transaction_queue.add_with_banlist(tx, &fetch_account, &gas_required)
}
}
}, },
TransactionOrigin::External => {
transaction_queue.add_with_banlist(tx, &fetch_account, &gas_required)
}
} }
}) })
.collect() .collect()

View File

@ -35,6 +35,9 @@ pub mod kind;
const MIN_MEM_LIMIT: usize = 16384; const MIN_MEM_LIMIT: usize = 16384;
const MIN_QUEUE_LIMIT: usize = 512; const MIN_QUEUE_LIMIT: usize = 512;
// maximum possible number of verification threads.
const MAX_VERIFIERS: usize = 8;
/// Type alias for block queue convenience. /// Type alias for block queue convenience.
pub type BlockQueue = VerificationQueue<self::kind::Blocks>; pub type BlockQueue = VerificationQueue<self::kind::Blocks>;
@ -61,6 +64,37 @@ impl Default for Config {
} }
} }
struct VerifierHandle {
deleting: Arc<AtomicBool>,
sleep: Arc<AtomicBool>,
thread: JoinHandle<()>,
}
impl VerifierHandle {
// signal to the verifier thread that it should sleep.
fn sleep(&self) {
self.sleep.store(true, AtomicOrdering::SeqCst);
}
// signal to the verifier thread that it should wake up.
fn wake_up(&self) {
self.sleep.store(false, AtomicOrdering::SeqCst);
self.thread.thread().unpark();
}
// signal to the verifier thread that it should conclude its
// operations.
fn conclude(&self) {
self.wake_up();
self.deleting.store(true, AtomicOrdering::Release);
}
// join the verifier thread.
fn join(self) {
self.thread.join().expect("Verifier thread panicked");
}
}
/// An item which is in the process of being verified. /// An item which is in the process of being verified.
pub struct Verifying<K: Kind> { pub struct Verifying<K: Kind> {
hash: H256, hash: H256,
@ -97,11 +131,12 @@ pub struct VerificationQueue<K: Kind> {
engine: Arc<Engine>, engine: Arc<Engine>,
more_to_verify: Arc<SCondvar>, more_to_verify: Arc<SCondvar>,
verification: Arc<Verification<K>>, verification: Arc<Verification<K>>,
verifiers: Vec<JoinHandle<()>>, verifiers: Mutex<(Vec<VerifierHandle>, usize)>,
deleting: Arc<AtomicBool>, deleting: Arc<AtomicBool>,
ready_signal: Arc<QueueSignal>, ready_signal: Arc<QueueSignal>,
empty: Arc<SCondvar>, empty: Arc<SCondvar>,
processing: RwLock<HashSet<H256>>, processing: RwLock<HashSet<H256>>,
ticks_since_adjustment: AtomicUsize,
max_queue_size: usize, max_queue_size: usize,
max_mem_use: usize, max_mem_use: usize,
} }
@ -157,6 +192,7 @@ struct Verification<K: Kind> {
more_to_verify: SMutex<()>, more_to_verify: SMutex<()>,
empty: SMutex<()>, empty: SMutex<()>,
sizes: Sizes, sizes: Sizes,
check_seal: bool,
} }
impl<K: Kind> VerificationQueue<K> { impl<K: Kind> VerificationQueue<K> {
@ -173,7 +209,8 @@ impl<K: Kind> VerificationQueue<K> {
unverified: AtomicUsize::new(0), unverified: AtomicUsize::new(0),
verifying: AtomicUsize::new(0), verifying: AtomicUsize::new(0),
verified: AtomicUsize::new(0), verified: AtomicUsize::new(0),
} },
check_seal: check_seal,
}); });
let more_to_verify = Arc::new(SCondvar::new()); let more_to_verify = Arc::new(SCondvar::new());
let deleting = Arc::new(AtomicBool::new(false)); let deleting = Arc::new(AtomicBool::new(false));
@ -185,44 +222,82 @@ impl<K: Kind> VerificationQueue<K> {
let empty = Arc::new(SCondvar::new()); let empty = Arc::new(SCondvar::new());
let panic_handler = PanicHandler::new_in_arc(); let panic_handler = PanicHandler::new_in_arc();
let mut verifiers: Vec<JoinHandle<()>> = Vec::new(); let max_verifiers = min(::num_cpus::get(), MAX_VERIFIERS);
let thread_count = max(::num_cpus::get(), 3) - 2; let default_amount = max(::num_cpus::get(), 3) - 2;
for i in 0..thread_count { let mut verifiers = Vec::with_capacity(max_verifiers);
let verification = verification.clone();
let engine = engine.clone(); debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount);
let more_to_verify = more_to_verify.clone();
let ready_signal = ready_signal.clone(); for i in 0..max_verifiers {
let empty = empty.clone(); debug!(target: "verification", "Adding verification thread #{}", i);
let deleting = deleting.clone(); let deleting = deleting.clone();
let panic_handler = panic_handler.clone(); let panic_handler = panic_handler.clone();
verifiers.push( let verification = verification.clone();
thread::Builder::new() let engine = engine.clone();
.name(format!("Verifier #{}", i)) let wait = more_to_verify.clone();
.spawn(move || { let ready = ready_signal.clone();
panic_handler.catch_panic(move || { let empty = empty.clone();
VerificationQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty, check_seal)
}).unwrap() // enable only the first few verifiers.
}) let sleep = if i < default_amount {
.expect("Error starting block verification thread") Arc::new(AtomicBool::new(false))
); } else {
Arc::new(AtomicBool::new(true))
};
verifiers.push(VerifierHandle {
deleting: deleting.clone(),
sleep: sleep.clone(),
thread: thread::Builder::new()
.name(format!("Verifier #{}", i))
.spawn(move || {
panic_handler.catch_panic(move || {
VerificationQueue::verify(verification, engine, wait, ready, deleting, empty, sleep)
}).unwrap()
})
.expect("Failed to create verifier thread.")
});
} }
VerificationQueue { VerificationQueue {
engine: engine, engine: engine,
panic_handler: panic_handler, panic_handler: panic_handler,
ready_signal: ready_signal.clone(), ready_signal: ready_signal,
more_to_verify: more_to_verify.clone(), more_to_verify: more_to_verify,
verification: verification.clone(), verification: verification,
verifiers: verifiers, verifiers: Mutex::new((verifiers, default_amount)),
deleting: deleting.clone(), deleting: deleting,
processing: RwLock::new(HashSet::new()), processing: RwLock::new(HashSet::new()),
empty: empty.clone(), empty: empty,
ticks_since_adjustment: AtomicUsize::new(0),
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT), max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
} }
} }
fn verify(verification: Arc<Verification<K>>, engine: Arc<Engine>, wait: Arc<SCondvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<SCondvar>, check_seal: bool) { fn verify(
verification: Arc<Verification<K>>,
engine: Arc<Engine>,
wait: Arc<SCondvar>,
ready: Arc<QueueSignal>,
deleting: Arc<AtomicBool>,
empty: Arc<SCondvar>,
sleep: Arc<AtomicBool>,
) {
while !deleting.load(AtomicOrdering::Acquire) { while !deleting.load(AtomicOrdering::Acquire) {
{
while sleep.load(AtomicOrdering::SeqCst) {
trace!(target: "verification", "Verifier sleeping");
::std::thread::park();
trace!(target: "verification", "Verifier waking up");
if deleting.load(AtomicOrdering::Acquire) {
return;
}
}
}
{ {
let mut more_to_verify = verification.more_to_verify.lock().unwrap(); let mut more_to_verify = verification.more_to_verify.lock().unwrap();
@ -255,7 +330,7 @@ impl<K: Kind> VerificationQueue<K> {
}; };
let hash = item.hash(); let hash = item.hash();
let is_ready = match K::verify(item, &*engine, check_seal) { let is_ready = match K::verify(item, &*engine, verification.check_seal) {
Ok(verified) => { Ok(verified) => {
let mut verifying = verification.verifying.lock(); let mut verifying = verification.verifying.lock();
let mut idx = None; let mut idx = None;
@ -302,9 +377,15 @@ impl<K: Kind> VerificationQueue<K> {
} }
} }
fn drain_verifying(verifying: &mut VecDeque<Verifying<K>>, verified: &mut VecDeque<K::Verified>, bad: &mut HashSet<H256>, sizes: &Sizes) { fn drain_verifying(
verifying: &mut VecDeque<Verifying<K>>,
verified: &mut VecDeque<K::Verified>,
bad: &mut HashSet<H256>,
sizes: &Sizes,
) {
let mut removed_size = 0; let mut removed_size = 0;
let mut inserted_size = 0; let mut inserted_size = 0;
while let Some(output) = verifying.front_mut().and_then(|x| x.output.take()) { while let Some(output) = verifying.front_mut().and_then(|x| x.output.take()) {
assert!(verifying.pop_front().is_some()); assert!(verifying.pop_front().is_some());
let size = output.heap_size_of_children(); let size = output.heap_size_of_children();
@ -487,14 +568,85 @@ impl<K: Kind> VerificationQueue<K> {
} }
} }
/// Optimise memory footprint of the heap fields. /// Optimise memory footprint of the heap fields, and adjust the number of threads
/// to better suit the workload.
pub fn collect_garbage(&self) { pub fn collect_garbage(&self) {
{ // number of ticks to average queue stats over
self.verification.unverified.lock().shrink_to_fit(); // when deciding whether to change the number of verifiers.
#[cfg(not(test))]
const READJUSTMENT_PERIOD: usize = 12;
#[cfg(test)]
const READJUSTMENT_PERIOD: usize = 1;
let (u_len, v_len) = {
let u_len = {
let mut q = self.verification.unverified.lock();
q.shrink_to_fit();
q.len()
};
self.verification.verifying.lock().shrink_to_fit(); self.verification.verifying.lock().shrink_to_fit();
self.verification.verified.lock().shrink_to_fit();
} let v_len = {
let mut q = self.verification.verified.lock();
q.shrink_to_fit();
q.len()
};
(u_len as isize, v_len as isize)
};
self.processing.write().shrink_to_fit(); self.processing.write().shrink_to_fit();
if self.ticks_since_adjustment.fetch_add(1, AtomicOrdering::SeqCst) + 1 >= READJUSTMENT_PERIOD {
self.ticks_since_adjustment.store(0, AtomicOrdering::SeqCst);
} else {
return;
}
let current = self.verifiers.lock().1;
let diff = (v_len - u_len).abs();
let total = v_len + u_len;
self.scale_verifiers(
if u_len < 20 {
1
} else if diff <= total / 10 {
current
} else if v_len > u_len {
current - 1
} else {
current + 1
}
);
}
// wake up or sleep verifiers to get as close to the target as
// possible, never going over the amount of initially allocated threads
// or below 1.
fn scale_verifiers(&self, target: usize) {
let mut verifiers = self.verifiers.lock();
let &mut (ref mut verifiers, ref mut verifier_count) = &mut *verifiers;
let target = min(verifiers.len(), target);
let target = max(1, target);
debug!(target: "verification", "Scaling from {} to {} verifiers", verifier_count, target);
// scaling up
for i in *verifier_count..target {
debug!(target: "verification", "Waking up verifier {}", i);
verifiers[i].wake_up();
}
// scaling down.
for i in target..*verifier_count {
debug!(target: "verification", "Putting verifier {} to sleep", i);
verifiers[i].sleep();
}
*verifier_count = target;
} }
} }
@ -509,10 +661,23 @@ impl<K: Kind> Drop for VerificationQueue<K> {
trace!(target: "shutdown", "[VerificationQueue] Closing..."); trace!(target: "shutdown", "[VerificationQueue] Closing...");
self.clear(); self.clear();
self.deleting.store(true, AtomicOrdering::Release); self.deleting.store(true, AtomicOrdering::Release);
self.more_to_verify.notify_all();
for t in self.verifiers.drain(..) { let mut verifiers = self.verifiers.get_mut();
t.join().unwrap(); let mut verifiers = &mut verifiers.0;
// first pass to signal conclusion. must be done before
// notify or deadlock possible.
for handle in verifiers.iter() {
handle.conclude();
} }
self.more_to_verify.notify_all();
// second pass to join.
for handle in verifiers.drain(..) {
handle.join();
}
trace!(target: "shutdown", "[VerificationQueue] Closed."); trace!(target: "shutdown", "[VerificationQueue] Closed.");
} }
} }
@ -611,4 +776,56 @@ mod tests {
} }
assert!(queue.queue_info().is_full()); assert!(queue.queue_info().is_full());
} }
#[test]
fn scaling_limits() {
use super::MAX_VERIFIERS;
let queue = get_test_queue();
queue.scale_verifiers(MAX_VERIFIERS + 1);
assert!(queue.verifiers.lock().1 < MAX_VERIFIERS + 1);
queue.scale_verifiers(0);
assert!(queue.verifiers.lock().1 == 1);
}
#[test]
fn readjust_verifiers() {
let queue = get_test_queue();
// put all the verifiers to sleep to ensure
// the test isn't timing sensitive.
let num_verifiers = {
let verifiers = queue.verifiers.lock();
for i in 0..verifiers.1 {
verifiers.0[i].sleep();
}
verifiers.1
};
for block in get_good_dummy_block_seq(5000) {
queue.import(Unverified::new(block)).expect("Block good by definition; qed");
}
// almost all unverified == bump verifier count.
queue.collect_garbage();
assert_eq!(queue.verifiers.lock().1, num_verifiers + 1);
// wake them up again and verify everything.
{
let verifiers = queue.verifiers.lock();
for i in 0..verifiers.1 {
verifiers.0[i].wake_up();
}
}
queue.flush();
// nothing to verify == use minimum number of verifiers.
queue.collect_garbage();
assert_eq!(queue.verifiers.lock().1, 1);
}
} }

View File

@ -1,6 +1,6 @@
{ {
"name": "parity.js", "name": "parity.js",
"version": "0.2.63", "version": "0.2.65",
"main": "release/index.js", "main": "release/index.js",
"jsnext:main": "src/index.js", "jsnext:main": "src/index.js",
"author": "Parity Team <admin@parity.io>", "author": "Parity Team <admin@parity.io>",

View File

@ -15,7 +15,8 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
export default function (rpc) { export default function (rpc) {
const subscriptions = []; let subscriptions = [];
let pollStatusIntervalId = null;
function getCoins () { function getCoins () {
return rpc.get('getcoins'); return rpc.get('getcoins');
@ -45,6 +46,24 @@ export default function (rpc) {
callback, callback,
idx idx
}); });
// Only poll if there are subscriptions...
if (!pollStatusIntervalId) {
pollStatusIntervalId = setInterval(_pollStatus, 2000);
}
}
function unsubscribe (depositAddress) {
const newSubscriptions = []
.concat(subscriptions)
.filter((sub) => sub.depositAddress !== depositAddress);
subscriptions = newSubscriptions;
if (subscriptions.length === 0) {
clearInterval(pollStatusIntervalId);
pollStatusIntervalId = null;
}
} }
function _getSubscriptionStatus (subscription) { function _getSubscriptionStatus (subscription) {
@ -81,13 +100,12 @@ export default function (rpc) {
subscriptions.forEach(_getSubscriptionStatus); subscriptions.forEach(_getSubscriptionStatus);
} }
setInterval(_pollStatus, 2000);
return { return {
getCoins, getCoins,
getMarketInfo, getMarketInfo,
getStatus, getStatus,
shift, shift,
subscribe subscribe,
unsubscribe
}; };
} }

View File

@ -48,7 +48,11 @@ export default class Contract {
this._instance[fn.signature] = fn; this._instance[fn.signature] = fn;
}); });
this._sendSubscriptionChanges(); this._subscribedToPendings = false;
this._pendingsSubscriptionId = null;
this._subscribedToBlock = false;
this._blockSubscriptionId = null;
} }
get address () { get address () {
@ -239,44 +243,71 @@ export default class Contract {
return event; return event;
} }
subscribe (eventName = null, options = {}, callback) { _findEvent (eventName = null) {
return new Promise((resolve, reject) => { const event = eventName
let event = null; ? this._events.find((evt) => evt.name === eventName)
: null;
if (eventName) { if (eventName && !event) {
event = this._events.find((evt) => evt.name === eventName); const events = this._events.map((evt) => evt.name).join(', ');
throw new Error(`${eventName} is not a valid eventName, subscribe using one of ${events} (or null to include all)`);
}
if (!event) { return event;
const events = this._events.map((evt) => evt.name).join(', '); }
reject(new Error(`${eventName} is not a valid eventName, subscribe using one of ${events} (or null to include all)`));
return;
}
}
return this._subscribe(event, options, callback).then(resolve).catch(reject); _createEthFilter (event = null, _options) {
const optionTopics = _options.topics || [];
const signature = event && event.signature || null;
// If event provided, remove the potential event signature
// as the first element of the topics
const topics = signature
? [ signature ].concat(optionTopics.filter((t, idx) => idx > 0 || t !== signature))
: optionTopics;
const options = Object.assign({}, _options, {
address: this._address,
topics
}); });
return this._api.eth.newFilter(options);
}
subscribe (eventName = null, options = {}, callback) {
try {
const event = this._findEvent(eventName);
return this._subscribe(event, options, callback);
} catch (e) {
return Promise.reject(e);
}
} }
_subscribe (event = null, _options, callback) { _subscribe (event = null, _options, callback) {
const subscriptionId = nextSubscriptionId++; const subscriptionId = nextSubscriptionId++;
const options = Object.assign({}, _options, { const { skipInitFetch } = _options;
address: this._address, delete _options['skipInitFetch'];
topics: [event ? event.signature : null]
});
return this._api.eth return this
.newFilter(options) ._createEthFilter(event, _options)
.then((filterId) => { .then((filterId) => {
this._subscriptions[subscriptionId] = {
options: _options,
callback,
filterId
};
if (skipInitFetch) {
this._subscribeToChanges();
return subscriptionId;
}
return this._api.eth return this._api.eth
.getFilterLogs(filterId) .getFilterLogs(filterId)
.then((logs) => { .then((logs) => {
callback(null, this.parseEventLogs(logs)); callback(null, this.parseEventLogs(logs));
this._subscriptions[subscriptionId] = {
options,
callback,
filterId
};
this._subscribeToChanges();
return subscriptionId; return subscriptionId;
}); });
}); });
@ -285,19 +316,89 @@ export default class Contract {
unsubscribe (subscriptionId) { unsubscribe (subscriptionId) {
return this._api.eth return this._api.eth
.uninstallFilter(this._subscriptions[subscriptionId].filterId) .uninstallFilter(this._subscriptions[subscriptionId].filterId)
.then(() => {
delete this._subscriptions[subscriptionId];
})
.catch((error) => { .catch((error) => {
console.error('unsubscribe', error); console.error('unsubscribe', error);
})
.then(() => {
delete this._subscriptions[subscriptionId];
this._unsubscribeFromChanges();
}); });
} }
_sendSubscriptionChanges = () => { _subscribeToChanges = () => {
const subscriptions = Object.values(this._subscriptions); const subscriptions = Object.values(this._subscriptions);
const timeout = () => setTimeout(this._sendSubscriptionChanges, 1000);
Promise const pendingSubscriptions = subscriptions
.filter((s) => s.options.toBlock && s.options.toBlock === 'pending');
const otherSubscriptions = subscriptions
.filter((s) => !(s.options.toBlock && s.options.toBlock === 'pending'));
if (pendingSubscriptions.length > 0 && !this._subscribedToPendings) {
this._subscribedToPendings = true;
this._subscribeToPendings();
}
if (otherSubscriptions.length > 0 && !this._subscribedToBlock) {
this._subscribedToBlock = true;
this._subscribeToBlock();
}
}
_unsubscribeFromChanges = () => {
const subscriptions = Object.values(this._subscriptions);
const pendingSubscriptions = subscriptions
.filter((s) => s.options.toBlock && s.options.toBlock === 'pending');
const otherSubscriptions = subscriptions
.filter((s) => !(s.options.toBlock && s.options.toBlock === 'pending'));
if (pendingSubscriptions.length === 0 && this._subscribedToPendings) {
this._subscribedToPendings = false;
clearTimeout(this._pendingsSubscriptionId);
}
if (otherSubscriptions.length === 0 && this._subscribedToBlock) {
this._subscribedToBlock = false;
this._api.unsubscribe(this._blockSubscriptionId);
}
}
_subscribeToBlock = () => {
this._api
.subscribe('eth_blockNumber', (error) => {
if (error) {
console.error('::_subscribeToBlock', error, error && error.stack);
}
const subscriptions = Object.values(this._subscriptions)
.filter((s) => !(s.options.toBlock && s.options.toBlock === 'pending'));
this._sendSubscriptionChanges(subscriptions);
})
.then((blockSubId) => {
this._blockSubscriptionId = blockSubId;
})
.catch((e) => {
console.error('::_subscribeToBlock', e, e && e.stack);
});
}
_subscribeToPendings = () => {
const subscriptions = Object.values(this._subscriptions)
.filter((s) => s.options.toBlock && s.options.toBlock === 'pending');
const timeout = () => setTimeout(() => this._subscribeFromPendings(), 1000);
this._sendSubscriptionChanges(subscriptions)
.then(() => {
this._pendingsSubscriptionId = timeout();
});
}
_sendSubscriptionChanges = (subscriptions) => {
return Promise
.all( .all(
subscriptions.map((subscription) => { subscriptions.map((subscription) => {
return this._api.eth.getFilterChanges(subscription.filterId); return this._api.eth.getFilterChanges(subscription.filterId);
@ -315,12 +416,9 @@ export default class Contract {
console.error('_sendSubscriptionChanges', error); console.error('_sendSubscriptionChanges', error);
} }
}); });
timeout();
}) })
.catch((error) => { .catch((error) => {
console.error('_sendSubscriptionChanges', error); console.error('_sendSubscriptionChanges', error);
timeout();
}); });
} }
} }

View File

@ -437,6 +437,7 @@ describe('api/contract/Contract', () => {
] ]
} }
]; ];
const logs = [{ const logs = [{
address: '0x22bff18ec62281850546a664bb63a5c06ac5f76c', address: '0x22bff18ec62281850546a664bb63a5c06ac5f76c',
blockHash: '0xa9280530a3b47bee2fc80f2862fd56502ae075350571d724d6442ea4c597347b', blockHash: '0xa9280530a3b47bee2fc80f2862fd56502ae075350571d724d6442ea4c597347b',
@ -450,6 +451,7 @@ describe('api/contract/Contract', () => {
transactionHash: '0xca16f537d761d13e4e80953b754e2b15541f267d6cad9381f750af1bae1e4917', transactionHash: '0xca16f537d761d13e4e80953b754e2b15541f267d6cad9381f750af1bae1e4917',
transactionIndex: '0x0' transactionIndex: '0x0'
}]; }];
const parsed = [{ const parsed = [{
address: '0x22bfF18ec62281850546a664bb63a5C06AC5F76C', address: '0x22bfF18ec62281850546a664bb63a5C06AC5F76C',
blockHash: '0xa9280530a3b47bee2fc80f2862fd56502ae075350571d724d6442ea4c597347b', blockHash: '0xa9280530a3b47bee2fc80f2862fd56502ae075350571d724d6442ea4c597347b',
@ -466,11 +468,13 @@ describe('api/contract/Contract', () => {
sender: { type: 'address', value: '0x63Cf90D3f0410092FC0fca41846f596223979195' } sender: { type: 'address', value: '0x63Cf90D3f0410092FC0fca41846f596223979195' }
}, },
topics: [ topics: [
'0x954ba6c157daf8a26539574ffa64203c044691aa57251af95f4b48d85ec00dd5', '0x0000000000000000000000000000000000000000000000000001000000004fe0' '0x954ba6c157daf8a26539574ffa64203c044691aa57251af95f4b48d85ec00dd5',
'0x0000000000000000000000000000000000000000000000000001000000004fe0'
], ],
transactionHash: '0xca16f537d761d13e4e80953b754e2b15541f267d6cad9381f750af1bae1e4917', transactionHash: '0xca16f537d761d13e4e80953b754e2b15541f267d6cad9381f750af1bae1e4917',
transactionIndex: new BigNumber(0) transactionIndex: new BigNumber(0)
}]; }];
let contract; let contract;
beforeEach(() => { beforeEach(() => {
@ -496,18 +500,19 @@ describe('api/contract/Contract', () => {
scope = mockHttp([ scope = mockHttp([
{ method: 'eth_newFilter', reply: { result: '0x123' } }, { method: 'eth_newFilter', reply: { result: '0x123' } },
{ method: 'eth_getFilterLogs', reply: { result: logs } }, { method: 'eth_getFilterLogs', reply: { result: logs } },
{ method: 'eth_getFilterChanges', reply: { result: logs } },
{ method: 'eth_newFilter', reply: { result: '0x123' } }, { method: 'eth_newFilter', reply: { result: '0x123' } },
{ method: 'eth_getFilterLogs', reply: { result: logs } } { method: 'eth_getFilterLogs', reply: { result: logs } }
]); ]);
cbb = sinon.stub(); cbb = sinon.stub();
cbe = sinon.stub(); cbe = sinon.stub();
return contract.subscribe('Message', {}, cbb); return contract.subscribe('Message', { toBlock: 'pending' }, cbb);
}); });
it('sets the subscriptionId returned', () => { it('sets the subscriptionId returned', () => {
return contract return contract
.subscribe('Message', {}, cbe) .subscribe('Message', { toBlock: 'pending' }, cbe)
.then((subscriptionId) => { .then((subscriptionId) => {
expect(subscriptionId).to.equal(1); expect(subscriptionId).to.equal(1);
}); });
@ -515,7 +520,7 @@ describe('api/contract/Contract', () => {
it('creates a new filter and retrieves the logs on it', () => { it('creates a new filter and retrieves the logs on it', () => {
return contract return contract
.subscribe('Message', {}, cbe) .subscribe('Message', { toBlock: 'pending' }, cbe)
.then((subscriptionId) => { .then((subscriptionId) => {
expect(scope.isDone()).to.be.true; expect(scope.isDone()).to.be.true;
}); });
@ -523,7 +528,7 @@ describe('api/contract/Contract', () => {
it('returns the logs to the callback', () => { it('returns the logs to the callback', () => {
return contract return contract
.subscribe('Message', {}, cbe) .subscribe('Message', { toBlock: 'pending' }, cbe)
.then((subscriptionId) => { .then((subscriptionId) => {
expect(cbe).to.have.been.calledWith(null, parsed); expect(cbe).to.have.been.calledWith(null, parsed);
}); });

View File

@ -15,6 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
import BigNumber from 'bignumber.js'; import BigNumber from 'bignumber.js';
import { range } from 'lodash';
import { isArray, isHex, isInstanceOf, isString } from '../util/types'; import { isArray, isHex, isInstanceOf, isString } from '../util/types';
@ -50,14 +51,19 @@ export function inHash (hash) {
return inHex(hash); return inHex(hash);
} }
export function pad (input, length) {
const value = inHex(input).substr(2, length * 2);
return '0x' + value + range(length * 2 - value.length).map(() => '0').join('');
}
export function inTopics (_topics) { export function inTopics (_topics) {
let topics = (_topics || []) let topics = (_topics || [])
.filter((topic) => topic) .filter((topic) => topic === null || topic)
.map(inHex); .map((topic) => topic === null ? null : pad(topic, 32));
while (topics.length < 4) { // while (topics.length < 4) {
topics.push(null); // topics.push(null);
} // }
return topics; return topics;
} }

View File

@ -29,3 +29,7 @@ export function hex2Ascii (_hex) {
return str; return str;
} }
export function asciiToHex (string) {
return '0x' + string.split('').map((s) => s.charCodeAt(0).toString(16)).join('');
}

View File

@ -16,7 +16,7 @@
import { isAddress as isAddressValid, toChecksumAddress } from '../../abi/util/address'; import { isAddress as isAddressValid, toChecksumAddress } from '../../abi/util/address';
import { decodeCallData, decodeMethodInput, methodToAbi } from './decode'; import { decodeCallData, decodeMethodInput, methodToAbi } from './decode';
import { bytesToHex, hex2Ascii } from './format'; import { bytesToHex, hex2Ascii, asciiToHex } from './format';
import { fromWei, toWei } from './wei'; import { fromWei, toWei } from './wei';
import { sha3 } from './sha3'; import { sha3 } from './sha3';
import { isArray, isFunction, isHex, isInstanceOf, isString } from './types'; import { isArray, isFunction, isHex, isInstanceOf, isString } from './types';
@ -31,6 +31,7 @@ export default {
isString, isString,
bytesToHex, bytesToHex,
hex2Ascii, hex2Ascii,
asciiToHex,
createIdentityImg, createIdentityImg,
decodeCallData, decodeCallData,
decodeMethodInput, decodeMethodInput,

View File

@ -19,17 +19,34 @@ export const checkIfVerified = (contract, account) => {
}; };
export const checkIfRequested = (contract, account) => { export const checkIfRequested = (contract, account) => {
let subId = null;
let resolved = false;
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
contract.subscribe('Requested', { contract
fromBlock: 0, toBlock: 'pending' .subscribe('Requested', {
}, (err, logs) => { fromBlock: 0, toBlock: 'pending'
if (err) { }, (err, logs) => {
return reject(err); if (err) {
} return reject(err);
const e = logs.find((l) => { }
return l.type === 'mined' && l.params.who && l.params.who.value === account; const e = logs.find((l) => {
return l.type === 'mined' && l.params.who && l.params.who.value === account;
});
resolve(e ? e.transactionHash : false);
resolved = true;
if (subId) {
contract.unsubscribe(subId);
}
})
.then((_subId) => {
subId = _subId;
if (resolved) {
contract.unsubscribe(subId);
}
}); });
resolve(e ? e.transactionHash : false);
});
}); });
}; };

View File

@ -63,6 +63,16 @@ export default class Shapeshift extends Component {
this.retrieveCoins(); this.retrieveCoins();
} }
componentWillUnmount () {
this.unsubscribe();
}
unsubscribe () {
// Unsubscribe from Shapeshit
const { depositAddress } = this.state;
shapeshift.unsubscribe(depositAddress);
}
render () { render () {
const { error, stage } = this.state; const { error, stage } = this.state;
@ -205,6 +215,10 @@ export default class Shapeshift extends Component {
console.log('onShift', result); console.log('onShift', result);
const depositAddress = result.deposit; const depositAddress = result.deposit;
if (this.state.depositAddress) {
this.unsubscribe();
}
shapeshift.subscribe(depositAddress, this.onExchangeInfo); shapeshift.subscribe(depositAddress, this.onExchangeInfo);
this.setState({ depositAddress }); this.setState({ depositAddress });
}) })

View File

@ -31,13 +31,23 @@ export default class Balances {
constructor (store, api) { constructor (store, api) {
this._api = api; this._api = api;
this._store = store; this._store = store;
this._tokens = {};
this._images = {};
this._accountsInfo = null; this._accountsInfo = null;
this._tokens = []; this._tokenreg = null;
this._fetchingTokens = false;
this._fetchedTokens = false;
this._tokenregSubId = null;
this._tokenregMetaSubId = null;
} }
start () { start () {
this._subscribeBlockNumber(); this._subscribeBlockNumber();
this._subscribeAccountsInfo(); this._subscribeAccountsInfo();
this._retrieveTokens();
} }
_subscribeAccountsInfo () { _subscribeAccountsInfo () {
@ -48,10 +58,7 @@ export default class Balances {
} }
this._accountsInfo = accountsInfo; this._accountsInfo = accountsInfo;
this._retrieveBalances(); this._retrieveTokens();
})
.then((subscriptionId) => {
console.log('_subscribeAccountsInfo', 'subscriptionId', subscriptionId);
}) })
.catch((error) => { .catch((error) => {
console.warn('_subscribeAccountsInfo', error); console.warn('_subscribeAccountsInfo', error);
@ -62,21 +69,22 @@ export default class Balances {
this._api this._api
.subscribe('eth_blockNumber', (error) => { .subscribe('eth_blockNumber', (error) => {
if (error) { if (error) {
return; return console.warn('_subscribeBlockNumber', error);
} }
this._retrieveTokens(); this._retrieveTokens();
}) })
.then((subscriptionId) => {
console.log('_subscribeBlockNumber', 'subscriptionId', subscriptionId);
})
.catch((error) => { .catch((error) => {
console.warn('_subscribeBlockNumber', error); console.warn('_subscribeBlockNumber', error);
}); });
} }
_retrieveTokens () { getTokenRegistry () {
this._api.parity if (this._tokenreg) {
return Promise.resolve(this._tokenreg);
}
return this._api.parity
.registryAddress() .registryAddress()
.then((registryAddress) => { .then((registryAddress) => {
const registry = this._api.newContract(abis.registry, registryAddress); const registry = this._api.newContract(abis.registry, registryAddress);
@ -85,60 +93,49 @@ export default class Balances {
}) })
.then((tokenregAddress) => { .then((tokenregAddress) => {
const tokenreg = this._api.newContract(abis.tokenreg, tokenregAddress); const tokenreg = this._api.newContract(abis.tokenreg, tokenregAddress);
this._tokenreg = tokenreg;
this.attachToTokens();
return tokenreg;
});
}
_retrieveTokens () {
if (this._fetchingTokens) {
return;
}
if (this._fetchedTokens) {
return this._retrieveBalances();
}
this._fetchingTokens = true;
this._fetchedTokens = false;
this
.getTokenRegistry()
.then((tokenreg) => {
return tokenreg.instance.tokenCount return tokenreg.instance.tokenCount
.call() .call()
.then((numTokens) => { .then((numTokens) => {
const promisesTokens = []; const promises = [];
const promisesImages = [];
while (promisesTokens.length < numTokens.toNumber()) { for (let i = 0; i < numTokens.toNumber(); i++) {
const index = promisesTokens.length; promises.push(this.fetchTokenInfo(tokenreg, i));
promisesTokens.push(tokenreg.instance.token.call({}, [index]));
promisesImages.push(tokenreg.instance.meta.call({}, [index, 'IMG']));
} }
return Promise.all([ return Promise.all(promises);
Promise.all(promisesTokens),
Promise.all(promisesImages)
]);
}); });
}) })
.then(([_tokens, images]) => { .then(() => {
const tokens = {}; this._fetchingTokens = false;
this._tokens = _tokens this._fetchedTokens = true;
.map((_token, index) => {
const [address, tag, format, name] = _token;
const token = { this._store.dispatch(getTokens(this._tokens));
address,
name,
tag,
format: format.toString(),
contract: this._api.newContract(abis.eip20, address)
};
tokens[address] = token;
this._store.dispatch(setAddressImage(address, images[index]));
return token;
})
.sort((a, b) => {
if (a.tag < b.tag) {
return -1;
} else if (a.tag > b.tag) {
return 1;
}
return 0;
});
this._store.dispatch(getTokens(tokens));
this._retrieveBalances(); this._retrieveBalances();
}) })
.catch((error) => { .catch((error) => {
console.warn('_retrieveTokens', error); console.warn('balances::_retrieveTokens', error);
this._retrieveBalances();
}); });
} }
@ -147,48 +144,20 @@ export default class Balances {
return; return;
} }
const addresses = Object.keys(this._accountsInfo); const addresses = Object
.keys(this._accountsInfo)
.filter((address) => {
const account = this._accountsInfo[address];
return !account.meta || !account.meta.deleted;
});
this._balances = {}; this._balances = {};
Promise Promise
.all( .all(addresses.map((a) => this.fetchAccountBalance(a)))
addresses.map((address) => Promise.all([ .then((balances) => {
this._api.eth.getBalance(address), addresses.forEach((a, idx) => {
this._api.eth.getTransactionCount(address) this._balances[a] = balances[idx];
]))
)
.then((balanceTxCount) => {
return Promise.all(
balanceTxCount.map(([value, txCount], idx) => {
const address = addresses[idx];
this._balances[address] = {
txCount,
tokens: [{
token: ETH,
value
}]
};
return Promise.all(
this._tokens.map((token) => {
return token.contract.instance.balanceOf.call({}, [address]);
})
);
})
);
})
.then((tokenBalances) => {
addresses.forEach((address, idx) => {
const balanceOf = tokenBalances[idx];
const balance = this._balances[address];
this._tokens.forEach((token, tidx) => {
balance.tokens.push({
token,
value: balanceOf[tidx]
});
});
}); });
this._store.dispatch(getBalances(this._balances)); this._store.dispatch(getBalances(this._balances));
@ -197,4 +166,161 @@ export default class Balances {
console.warn('_retrieveBalances', error); console.warn('_retrieveBalances', error);
}); });
} }
attachToTokens () {
this.attachToTokenMetaChange();
this.attachToNewToken();
}
attachToNewToken () {
if (this._tokenregSubId) {
return;
}
this._tokenreg
.instance
.Registered
.subscribe({
fromBlock: 0,
toBlock: 'latest',
skipInitFetch: true
}, (error, logs) => {
if (error) {
return console.error('balances::attachToNewToken', 'failed to attach to tokenreg Registered', error.toString(), error.stack);
}
const promises = logs.map((log) => {
const id = log.params.id.value.toNumber();
return this.fetchTokenInfo(this._tokenreg, id);
});
return Promise.all(promises);
})
.then((tokenregSubId) => {
this._tokenregSubId = tokenregSubId;
})
.catch((e) => {
console.warn('balances::attachToNewToken', e);
});
}
attachToTokenMetaChange () {
if (this._tokenregMetaSubId) {
return;
}
this._tokenreg
.instance
.MetaChanged
.subscribe({
fromBlock: 0,
toBlock: 'latest',
topics: [ null, this._api.util.asciiToHex('IMG') ],
skipInitFetch: true
}, (error, logs) => {
if (error) {
return console.error('balances::attachToTokenMetaChange', 'failed to attach to tokenreg MetaChanged', error.toString(), error.stack);
}
// In case multiple logs for same token
// in one block. Take the last value.
const tokens = logs
.filter((log) => log.type === 'mined')
.reduce((_tokens, log) => {
const id = log.params.id.value.toNumber();
const image = log.params.value.value;
const token = Object.values(this._tokens).find((c) => c.id === id);
const { address } = token;
_tokens[address] = { address, id, image };
return _tokens;
}, {});
Object
.values(tokens)
.forEach((token) => {
const { address, image } = token;
if (this._images[address] !== image.toString()) {
this._store.dispatch(setAddressImage(address, image));
this._images[address] = image.toString();
}
});
})
.then((tokenregMetaSubId) => {
this._tokenregMetaSubId = tokenregMetaSubId;
})
.catch((e) => {
console.warn('balances::attachToTokenMetaChange', e);
});
}
fetchTokenInfo (tokenreg, tokenId) {
return Promise
.all([
tokenreg.instance.token.call({}, [tokenId]),
tokenreg.instance.meta.call({}, [tokenId, 'IMG'])
])
.then(([ tokenData, image ]) => {
const [ address, tag, format, name ] = tokenData;
const contract = this._api.newContract(abis.eip20, address);
if (this._images[address] !== image.toString()) {
this._store.dispatch(setAddressImage(address, image));
this._images[address] = image.toString();
}
const token = {
format: format.toString(),
id: tokenId,
address,
tag,
name,
contract
};
this._tokens[address] = token;
return token;
})
.catch((e) => {
console.warn('balances::fetchTokenInfo', `couldn't fetch token #${tokenId}`, e);
});
}
/**
* TODO?: txCount is only shown on an address page, so we
* might not need to fetch it for each address for each block,
* but only for one address when the user is on the account
* view.
*/
fetchAccountBalance (address) {
const _tokens = Object.values(this._tokens);
const tokensPromises = _tokens
.map((token) => {
return token.contract.instance.balanceOf.call({}, [ address ]);
});
return Promise
.all([
this._api.eth.getTransactionCount(address),
this._api.eth.getBalance(address)
].concat(tokensPromises))
.then(([ txCount, ethBalance, ...tokensBalance ]) => {
const tokens = []
.concat(
{ token: ETH, value: ethBalance },
_tokens
.map((token, index) => ({
token,
value: tokensBalance[index]
}))
);
const balance = { txCount, tokens };
return balance;
});
}
} }

View File

@ -23,9 +23,10 @@ export const TEST_HTTP_URL = 'http://localhost:6688';
export const TEST_WS_URL = 'ws://localhost:8866'; export const TEST_WS_URL = 'ws://localhost:8866';
export function mockHttp (requests) { export function mockHttp (requests) {
nock.cleanAll();
let scope = nock(TEST_HTTP_URL); let scope = nock(TEST_HTTP_URL);
requests.forEach((request) => { requests.forEach((request, index) => {
scope = scope scope = scope
.post('/') .post('/')
.reply(request.code || 200, (uri, body) => { .reply(request.code || 200, (uri, body) => {