chain watcher results go in async queue now

This commit is contained in:
cubedro 2015-05-28 13:27:45 +03:00
parent 6fd4a908a6
commit f8f0eda8a8
1 changed files with 35 additions and 22 deletions

View File

@ -106,6 +106,7 @@ function Node ()
this._web3 = false; this._web3 = false;
this._socket = false; this._socket = false;
this._latestQueue = null;
this.pendingFilter = false; this.pendingFilter = false;
this.chainFilter = false; this.chainFilter = false;
this.updateInterval = false; this.updateInterval = false;
@ -337,7 +338,8 @@ Node.prototype.formatBlock = function (block)
block.difficulty = block.difficulty.toString(10); block.difficulty = block.difficulty.toString(10);
block.totalDifficulty = block.totalDifficulty.toString(10); block.totalDifficulty = block.totalDifficulty.toString(10);
if( !_.isUndefined(block.logsBloom) ) { if( !_.isUndefined(block.logsBloom) )
{
delete block.logsBloom; delete block.logsBloom;
} }
@ -356,16 +358,18 @@ Node.prototype.getLatestBlock = function ()
if(this._web3) if(this._web3)
{ {
this._startBlockFetch = _.now(); console.time('==>', 'Got block in');
web3.eth.getBlock('latest', false, function(error, result) { web3.eth.getBlock('latest', false, function(error, result) {
self.validateLatestBlock(error, result); self.validateLatestBlock(error, result, 'Got block in');
}); });
} }
} }
Node.prototype.validateLatestBlock = function (error, result) Node.prototype.validateLatestBlock = function (error, result, timeString)
{ {
console.time('==>', timeString);
if( error ) if( error )
{ {
console.error("xx>", "getLatestBlock couldn't fetch block..."); console.error("xx>", "getLatestBlock couldn't fetch block...");
@ -378,28 +382,29 @@ Node.prototype.validateLatestBlock = function (error, result)
if(block === false) if(block === false)
{ {
console.error("xx>", "Got bad block:", chalk.reset.cyan(result), 'in', chalk.reset.cyan(_.now() - this._startBlockFetch, 'ms')); console.error("xx>", "Got bad block:", chalk.reset.cyan(result));
return false; return false;
} }
if( this.stats.block.number === block.number ) if( this.stats.block.number === block.number )
{ {
console.warn("==>", "Got same block:", chalk.reset.cyan(block.number), 'in', chalk.reset.cyan(_.now() - this._startBlockFetch, 'ms')); console.warn("==>", "Got same block:", chalk.reset.cyan(block.number));
return false; return false;
} }
console.success("==>", "Got block:", chalk.reset.red(block.number));
this.stats.block = block; this.stats.block = block;
console.success("==>", "Got block:", chalk.reset.red(block.number), 'in', chalk.reset.cyan(_.now() - this._startBlockFetch, 'ms'));
this.sendUpdate(); this.sendUpdate();
if(this.stats.block.number - this._lastBlock > 1) if(this.stats.block.number - this._lastBlock > 1)
{ {
var range = _.range( Math.max(this.stats.block.number - MAX_BLOCKS_HISTORY, this._lastBlock + 1), Math.max(this.stats.block.number, 0), 1 ); var range = _.range( Math.max(this.stats.block.number - MAX_BLOCKS_HISTORY, this._lastBlock + 1), Math.max(this.stats.block.number, 0), 1 );
this.getHistory({ list: range }); if( this._latestQueue.idle() )
this.getHistory({ list: range });
} }
if(this.stats.block.number > this._lastBlock) if(this.stats.block.number > this._lastBlock)
@ -561,26 +566,34 @@ Node.prototype.setWatches = function()
{ {
var self = this; var self = this;
this._latestQueue = async.queue(function (hash, callback)
{
console.time('==>', 'Got block ' + hash + ' in');
web3.eth.getBlock(hash, false, function (error, result)
{
self.validateLatestBlock(error, result, 'Got block ' + hash + ' in');
callback();
});
}, 1);
this._latestQueue.drain = function()
{
console.success("Finished processing", 'latest', 'queue');
}
try { try {
this.chainFilter = web3.eth.filter('latest'); this.chainFilter = web3.eth.filter('latest');
this.chainFilter.watch( function (log) this.chainFilter.watch( function (err, hash)
{ {
var now = _.now(); var now = _.now();
var time = now - self._lastChainLog; var time = now - self._lastChainLog;
self._lastChainLog = now; self._lastChainLog = now;
console.info('>>>', 'Chain Filter triggered: ', chalk.reset.cyan(now), '- last trigger:', chalk.reset.cyan(time)); console.info('>>>', 'Chain Filter triggered: ', chalk.reset.red(hash), '- last trigger:', chalk.reset.cyan(time));
if(time > 50) self._latestQueue.push(hash);
{
self.getLatestBlock();
}
else
{
debounce(function() {
self.getLatestBlock();
}, 50);
}
}); });
} }
catch (err) catch (err)
@ -591,13 +604,13 @@ Node.prototype.setWatches = function()
try { try {
this.pendingFilter = web3.eth.filter('pending'); this.pendingFilter = web3.eth.filter('pending');
this.pendingFilter.watch( function (log) this.pendingFilter.watch( function (err, hash)
{ {
var now = _.now(); var now = _.now();
var time = now - self._lastPendingLog; var time = now - self._lastPendingLog;
self._lastPendingLog = now; self._lastPendingLog = now;
console.info('>>>', 'Pending Filter triggered', chalk.reset.cyan(now), '- last trigger:', chalk.reset.cyan(time)); console.info('>>>', 'Pending Filter triggered:', chalk.reset.red(hash), '- last trigger:', chalk.reset.cyan(time));
if(time > 50) if(time > 50)
{ {