Merge pull request #183 from cubedro/develop

Non-blocking collection
This commit is contained in:
Marian OANCΞA 2015-06-09 02:43:28 +03:00
commit c670b03add
4 changed files with 447 additions and 280 deletions

258
app.js
View File

@ -2,7 +2,6 @@ var _ = require('lodash');
var logger = require('./lib/utils/logger');
var chalk = require('chalk');
var askedForHistory = false;
var askedForHistoryTime = 0;
var Primus = require('primus'),
@ -84,6 +83,21 @@ var client = new Primus(server, {
var clientLatency = 0;
Nodes.setChartsCallback(function (err, charts)
{
if(err !== null)
{
console.error('COL', 'CHR', 'Charts error:', err);
}
else
{
client.write({
action: 'charts',
data: charts
});
}
});
client.use('emit', require('primus-emit'));
api.on('connection', function (spark)
@ -109,25 +123,26 @@ api.on('connection', function (spark)
data.spark = spark.id;
data.latency = spark.latency || 0;
var info = Nodes.add( data );
spark.emit('ready');
Nodes.add( data, function (err, info)
{
if(err !== null)
{
console.error('API', 'CON', 'Connection error:', err);
return false;
}
console.success('API', 'CON', 'Connected', data.id);
if(info !== null)
{
spark.emit('ready');
client.write({
action: 'add',
data: info
console.success('API', 'CON', 'Connected', data.id);
client.write({
action: 'add',
data: info
});
}
});
var time = chalk.reset.cyan((new Date()).toJSON()) + " ";
console.time(time, 'COL', 'CHR', 'Got charts in');
client.write({
action: 'charts',
data: Nodes.getCharts()
});
console.timeEnd(time, 'COL', 'CHR', 'Got charts in');
}
});
@ -136,28 +151,27 @@ api.on('connection', function (spark)
{
if( !_.isUndefined(data.id) && !_.isUndefined(data.stats) )
{
var stats = Nodes.update(data.id, data.stats);
if(stats !== false)
Nodes.update(data.id, data.stats, function (err, stats)
{
client.write({
action: 'update',
data: stats
});
if(err !== null)
{
console.error('API', 'UPD', 'Update error:', err);
}
else
{
if(stats !== null)
{
client.write({
action: 'update',
data: stats
});
console.info('API', 'UPD', 'Update from:', data.id, 'for:', data.stats);
var time = chalk.reset.cyan((new Date()).toJSON()) + " ";
console.time(time, 'COL', 'CHR', 'Got charts in');
client.write({
action: 'charts',
data: Nodes.getCharts()
});
console.timeEnd(time, 'COL', 'CHR', 'Got charts in');
}
console.info('API', 'UPD', 'Update from:', data.id, 'for:', stats);
Nodes.getCharts();
}
}
});
}
else
{
@ -170,27 +184,27 @@ api.on('connection', function (spark)
{
if( !_.isUndefined(data.id) && !_.isUndefined(data.block) )
{
var stats = Nodes.addBlock(data.id, data.block);
if(stats !== false)
Nodes.addBlock(data.id, data.block, function (err, stats)
{
client.write({
action: 'block',
data: stats
});
if(err !== null)
{
console.error('API', 'BLK', 'Block error:', err);
}
else
{
if(stats !== null)
{
client.write({
action: 'block',
data: stats
});
console.success('API', 'BLK', 'Block:', data.block['number'], 'from:', data.id);
console.success('API', 'BLK', 'Block:', data.block['number'], 'from:', data.id);
var time = chalk.reset.cyan((new Date()).toJSON()) + " ";
console.time(time, 'COL', 'CHR', 'Got charts in');
client.write({
action: 'charts',
data: Nodes.getCharts()
});
console.timeEnd(time, 'COL', 'CHR', 'Got charts in');
}
Nodes.getCharts();
}
}
});
}
else
{
@ -203,17 +217,22 @@ api.on('connection', function (spark)
{
if( !_.isUndefined(data.id) && !_.isUndefined(data.stats) )
{
var stats = Nodes.updatePending(data.id, data.stats);
Nodes.updatePending(data.id, data.stats, function (err, stats) {
if(err !== null)
{
console.error('API', 'TXS', 'Pending error:', err);
}
if(stats !== false)
{
client.write({
action: 'pending',
data: stats
});
}
if(stats !== null)
{
client.write({
action: 'pending',
data: stats
});
console.success('API', 'TXS', 'Pending:', data.stats['pending'], 'from:', data.id);
console.success('API', 'TXS', 'Pending:', data.stats['pending'], 'from:', data.id);
}
});
}
else
{
@ -227,17 +246,25 @@ api.on('connection', function (spark)
if( !_.isUndefined(data.id) && !_.isUndefined(data.stats) )
{
var stats = Nodes.updateStats(data.id, data.stats);
if(stats !== false)
Nodes.updateStats(data.id, data.stats, function (err, stats)
{
client.write({
action: 'stats',
data: stats
});
}
if(err !== null)
{
console.error('API', 'STA', 'Stats error:', err);
}
else
{
if(stats !== null)
{
client.write({
action: 'stats',
data: stats
});
console.success('API', 'STA', 'Stats from:', data.id);
console.success('API', 'STA', 'Stats from:', data.id);
}
}
});
}
else
{
@ -248,20 +275,27 @@ api.on('connection', function (spark)
spark.on('history', function (data)
{
console.success('API', 'HIS', 'Got from:', data.id);
console.success('API', 'HIS', 'Got history from:', data.id);
var time = chalk.reset.cyan((new Date()).toJSON()) + " ";
console.time(time, 'COL', 'CHR', 'Got charts in');
var time = chalk.reset.cyan((new Date()).toJSON()) + " ";
console.time(time, 'COL', 'CHR', 'Got charts in');
client.write({
action: 'charts',
data: Nodes.addHistory(data.id, data.history)
Nodes.addHistory(data.id, data.history, function (err, history)
{
console.timeEnd(time, 'COL', 'CHR', 'Got charts in');
if(err !== null)
{
console.error('COL', 'CHR', 'History error:', err);
}
else
{
client.write({
action: 'charts',
data: history
});
}
});
console.timeEnd(time, 'COL', 'CHR', 'Got charts in');
askedForHistory = false;
});
@ -282,26 +316,31 @@ api.on('connection', function (spark)
{
if( !_.isUndefined(data.id) )
{
var latency = Nodes.updateLatency(data.id, data.latency);
Nodes.updateLatency(data.id, data.latency, function (err, latency) {
if(err !== null)
{
console.error('API', 'PIN', 'Latency error:', err);
}
if(latency)
{
client.write({
action: 'latency',
data: latency
});
}
if(latency !== null)
{
client.write({
action: 'latency',
data: latency
});
console.info('API', 'PIN', 'Latency:', latency, 'from:', data.id);
console.info('API', 'PIN', 'Latency:', latency, 'from:', data.id);
}
});
if( Nodes.requiresUpdate(data.id) && (!askedForHistory || _.now() - askedForHistoryTime > 200000) )
if( Nodes.requiresUpdate(data.id) && (!Nodes.askedForHistory() || _.now() - askedForHistoryTime > 200000) )
{
var range = Nodes.getHistory().getHistoryRequestRange();
spark.emit('history', range);
console.info('API', 'HIS', 'Asked:', data.id, 'for history:', range.min, '-', range.max);
askedForHistory = true;
Nodes.askedForHistory(true);
askedForHistoryTime = _.now();
}
}
@ -310,14 +349,22 @@ api.on('connection', function (spark)
spark.on('end', function (data)
{
var stats = Nodes.inactive(spark.id);
Nodes.inactive(spark.id, function (err, stats)
{
if(err !== null)
{
console.error('API', 'CON', 'Connection end error:', err);
}
else
{
client.write({
action: 'inactive',
data: stats
});
client.write({
action: 'inactive',
data: stats
console.warn('API', 'CON', 'Connection with:', spark.id, 'ended:', data);
}
});
console.warn('API', 'CON', 'Connection with:', spark.id, 'ended:', data);
});
});
@ -329,10 +376,7 @@ client.on('connection', function (clientSpark)
{
clientSpark.emit('init', { nodes: Nodes.all() });
clientSpark.write({
action: 'charts',
data: Nodes.getCharts()
});
Nodes.getCharts();
});
clientSpark.on('client-pong', function (data)
@ -365,10 +409,8 @@ var nodeCleanupTimeout = setInterval( function ()
data: Nodes.all()
});
client.write({
action: 'charts',
data: Nodes.getCharts()
});
Nodes.getCharts();
}, 1000*60*60);
server.listen(process.env.PORT || 3000);

View File

@ -6,117 +6,148 @@ var Collection = function Collection()
{
this._items = [];
this._blockchain = new Blockchain();
this._askedForHistory = false;
this._debounced = null;
return this;
}
Collection.prototype.add = function(data)
Collection.prototype.add = function(data, callback)
{
var node = this.getNodeOrNew({ id : data.id }, data);
node.setInfo(data);
return node.getInfo();
node.setInfo(data, callback);
}
Collection.prototype.update = function(id, stats)
Collection.prototype.update = function(id, stats, callback)
{
var node = this.getNode({ id: id });
if (!node)
return false;
var block = this._blockchain.add(stats.block, id);
if (!block)
return false;
var propagationHistory = this._blockchain.getNodePropagation(id);
stats.block.arrived = block.arrived;
stats.block.received = block.received;
stats.block.propagation = block.propagation;
return node.setStats(stats, propagationHistory);
}
Collection.prototype.addBlock = function(id, block)
{
var node = this.getNode({ id: id });
if (!node)
return false;
var block = this._blockchain.add(block, id);
if (!block)
return false;
var propagationHistory = this._blockchain.getNodePropagation(id);
block.arrived = block.arrived;
block.received = block.received;
block.propagation = block.propagation;
return node.setBlock(block, propagationHistory);
}
Collection.prototype.updatePending = function(id, stats)
{
var node = this.getNode({ id: id });
if (!node)
return false;
return node.setPending(stats);
}
Collection.prototype.updateStats = function(id, stats)
{
var node = this.getNode({ id: id });
if (!node)
return false;
return node.setBasicStats(stats);
}
Collection.prototype.addHistory = function(id, blocks)
{
var node = this.getNode({ id: id });
if (!node)
return false;
blocks = blocks.reverse();
for (var i = 0; i <= blocks.length - 1; i++)
{
this._blockchain.add(blocks[i], id);
};
callback('Node not found', null);
}
else
{
var block = this._blockchain.add(stats.block, id);
return this.getCharts();
if (!block)
{
callback('Block data wrong', null);
}
else
{
var propagationHistory = this._blockchain.getNodePropagation(id);
stats.block.arrived = block.block.arrived;
stats.block.received = block.block.received;
stats.block.propagation = block.block.propagation;
node.setStats(stats, propagationHistory, callback);
}
}
}
Collection.prototype.updateLatency = function(id, latency)
Collection.prototype.addBlock = function(id, stats, callback)
{
var node = this.getNode({ id: id });
if (!node)
{
callback('Node not found', null);
}
else
{
var block = this._blockchain.add(stats, id);
if (!block)
{
console.log(block);
callback('Block undefined', null);
}
else
{
var propagationHistory = this._blockchain.getNodePropagation(id);
stats.arrived = block.block.arrived;
stats.received = block.block.received;
stats.propagation = block.block.propagation;
node.setBlock(stats, propagationHistory, callback);
}
}
}
Collection.prototype.updatePending = function(id, stats, callback)
{
var node = this.getNode({ id: id });
if (!node)
return false;
return node.setLatency(latency);
node.setPending(stats, callback);
}
Collection.prototype.inactive = function(id)
Collection.prototype.updateStats = function(id, stats, callback)
{
var node = this.getNode({ id: id });
if (!node)
{
callback('Node not found', null);
}
else
{
node.setBasicStats(stats, callback);
}
}
// TODO: Async series
Collection.prototype.addHistory = function(id, blocks, callback)
{
var node = this.getNode({ id: id });
if (!node)
{
callback('Node not found', null)
}
else
{
blocks = blocks.reverse();
for (var i = 0; i <= blocks.length - 1; i++)
{
this._blockchain.add(blocks[i], id);
};
this.getCharts(callback);
}
this.askedForHistory(false);
}
Collection.prototype.updateLatency = function(id, latency, callback)
{
var node = this.getNode({ id: id });
if (!node)
return false;
node.setLatency(latency, callback);
}
Collection.prototype.inactive = function(id, callback)
{
var node = this.getNode({ spark: id });
if (!node)
return false;
node.setState(false);
return node.getStats();
{
callback('Node not found', null);
}
else
{
node.setState(false);
callback(null, node.getStats());
}
}
Collection.prototype.getIndex = function(search)
@ -192,9 +223,31 @@ Collection.prototype.getUncleCount = function()
return this._blockchain.getUncleCount();
}
Collection.prototype.setChartsCallback = function(callback)
{
this._blockchain.setCallback(callback);
}
Collection.prototype.getCharts = function()
{
return this._blockchain.getCharts();
this.getChartsDebounced();
}
Collection.prototype.getChartsDebounced = function()
{
var self = this;
if( this._debounced === null) {
this._debounced = _.debounce(function(){
self._blockchain.getCharts();
}, 1000, {
leading: false,
maxWait: 5000,
trailing: true
});
}
this._debounced();
}
Collection.prototype.getHistory = function()
@ -224,4 +277,14 @@ Collection.prototype.requiresUpdate = function(id)
return ( this.canNodeUpdate(id) && this._blockchain.requiresUpdate() );
}
Collection.prototype.askedForHistory = function(set)
{
if( !_.isUndefined(set) )
{
this._askedForHistory = set;
}
return this._askedForHistory;
}
module.exports = Collection;

View File

@ -3,7 +3,7 @@ var d3 = require('d3');
var MAX_HISTORY = 1000;
var MAX_PEER_PROPAGATION = 36;
var MAX_PEER_PROPAGATION = 40;
var MIN_PROPAGATION_RANGE = 0;
var MAX_PROPAGATION_RANGE = 10000;
@ -13,6 +13,7 @@ var MAX_BINS = 40;
var History = function History(data)
{
this._items = [];
this._callback = null;
var item = {
height: 0,
@ -33,6 +34,8 @@ var History = function History(data)
History.prototype.add = function(block, id)
{
var changed = false;
if( !_.isUndefined(block) && !_.isUndefined(block.number) && !_.isUndefined(block.uncles) && !_.isUndefined(block.transactions) && !_.isUndefined(block.difficulty) && block.number > 0 )
{
var historyBlock = this.search(block.number);
@ -57,6 +60,8 @@ History.prototype.add = function(block, id)
received: now,
propagation: block.propagation
});
changed = true;
}
else
{
@ -84,6 +89,8 @@ History.prototype.add = function(block, id)
this._items[index].timestamp = block.timestamp;
this._items[index].transactions = block.transactions;
this._items[index].uncles = block.uncles;
changed = true;
}
}
}
@ -103,14 +110,14 @@ History.prototype.add = function(block, id)
block.time = 0;
}
var item = {
height: block.number,
block: block,
propagTimes: []
}
if( this._items.length === 0 || block.number >= (this.bestBlockNumber() - MAX_HISTORY + 1) )
{
var item = {
height: block.number,
block: block,
propagTimes: []
}
item.propagTimes.push({
node: id,
received: now,
@ -118,10 +125,15 @@ History.prototype.add = function(block, id)
});
this._save(item);
changed = true;
}
}
return block;
return {
block: block,
changed: changed
};
}
return false;
@ -386,39 +398,47 @@ History.prototype.getMinersCount = function()
.value();
}
History.prototype.getCharts = function(callback)
History.prototype.setCallback = function(callback)
{
var chartHistory = _( this._items )
.sortByOrder( 'height', false )
.slice(0, MAX_BINS)
.reverse()
.map(function (item)
{
return {
height: item.height,
blocktime: item.block.time / 1000,
difficulty: item.block.difficulty,
uncles: item.block.uncles.length,
transactions: item.block.transactions.length,
gasSpending: item.block.gasUsed,
miner: item.block.miner
};
})
.value();
this._callback = callback;
}
return {
height : _.pluck( chartHistory, 'height' ),
blocktime : _.pluck( chartHistory, 'blocktime' ),
avgBlocktime : _.sum(_.pluck( chartHistory, 'blocktime' )) / (chartHistory.length === 0 ? 1 : chartHistory.length),
difficulty : _.pluck( chartHistory, 'difficulty' ),
uncles : _.pluck( chartHistory, 'uncles' ),
transactions : _.pluck( chartHistory, 'transactions' ),
gasSpending : _.pluck( chartHistory, 'gasSpending' ),
miners : this.getMinersCount(),
propagation : this.getBlockPropagation(),
uncleCount : this.getUncleCount(),
avgHashrate : this.getAvgHashrate()
};
History.prototype.getCharts = function()
{
if(this._callback !== null)
{
var chartHistory = _( this._items )
.sortByOrder( 'height', false )
.slice(0, MAX_BINS)
.reverse()
.map(function (item)
{
return {
height: item.height,
blocktime: item.block.time / 1000,
difficulty: item.block.difficulty,
uncles: item.block.uncles.length,
transactions: item.block.transactions.length,
gasSpending: item.block.gasUsed,
miner: item.block.miner
};
})
.value();
this._callback(null, {
height : _.pluck( chartHistory, 'height' ),
blocktime : _.pluck( chartHistory, 'blocktime' ),
avgBlocktime : _.sum(_.pluck( chartHistory, 'blocktime' )) / (chartHistory.length === 0 ? 1 : chartHistory.length),
difficulty : _.pluck( chartHistory, 'difficulty' ),
uncles : _.pluck( chartHistory, 'uncles' ),
transactions : _.pluck( chartHistory, 'transactions' ),
gasSpending : _.pluck( chartHistory, 'gasSpending' ),
miners : this.getMinersCount(),
propagation : this.getBlockPropagation(),
uncleCount : this.getUncleCount(),
avgHashrate : this.getAvgHashrate()
});
}
}
History.prototype.requiresUpdate = function()

View File

@ -62,10 +62,10 @@ Node.prototype.init = function(data)
if( !_.isUndefined(data.latency) )
this.stats.latency = data.latency;
this.setInfo(data);
this.setInfo(data, null);
}
Node.prototype.setInfo = function(data)
Node.prototype.setInfo = function(data, callback)
{
if( !_.isUndefined(data.info) )
{
@ -85,6 +85,11 @@ Node.prototype.setInfo = function(data)
this.spark = _.result(data, 'spark', null);
this.setState(true);
if(callback !== null)
{
callback(null, this.getInfo());
}
}
Node.prototype.setGeo = function(ip)
@ -93,7 +98,7 @@ Node.prototype.setGeo = function(ip)
this.geo = geoip.lookup(ip);
}
Node.prototype.getInfo = function()
Node.prototype.getInfo = function(callback)
{
return {
id: this.id,
@ -115,37 +120,47 @@ Node.prototype.getInfo = function()
};
}
Node.prototype.setStats = function(stats, history)
Node.prototype.setStats = function(stats, history, callback)
{
if( !_.isUndefined(stats) )
{
this.setBlock( _.result(stats, 'block', this.stats.block), history );
this.setBlock( _.result(stats, 'block', this.stats.block), history, function (err, block) {} );
this.setBasicStats(stats);
this.setBasicStats(stats, function (err, stats) {});
this.setPending( _.result(stats, 'pending', this.stats.pending) );
this.setPending( _.result(stats, 'pending', this.stats.pending), function (err, stats) {} );
return this.getStats();
callback(null, this.getStats());
}
return false;
callback('Stats undefined', null);
}
Node.prototype.setBlock = function(block, history)
Node.prototype.setBlock = function(block, history, callback)
{
if( !_.isUndefined(block) && !_.isUndefined(block.number) && ( !_.isEqual(history, this.history) || !_.isEqual(block, this.stats.block) ))
if( !_.isUndefined(block) && !_.isUndefined(block.number) )
{
if(block.number !== this.stats.block.number && block.hash !== this.stats.block.hash)
if ( !_.isEqual(history, this.history) || !_.isEqual(block, this.stats.block) )
{
this.stats.block = block;
if(block.number !== this.stats.block.number || block.hash !== this.stats.block.hash)
{
this.stats.block = block;
}
this.setHistory(history);
callback(null, this.getBlockStats());
}
else
{
callback(null, null);
}
this.setHistory(history);
return this.getBlockStats();
}
return false;
else
{
console.log(block);
callback('Block undefined', null);
}
}
Node.prototype.setHistory = function(history)
@ -175,24 +190,35 @@ Node.prototype.setHistory = function(history)
return true;
}
Node.prototype.setPending = function(stats)
Node.prototype.setPending = function(stats, callback)
{
if( !_.isUndefined(stats) && !_.isUndefined(stats.pending) && !_.isEqual(stats.pending, this.stats.pending))
if( !_.isUndefined(stats) && !_.isUndefined(stats.pending))
{
this.stats.pending = stats.pending;
if(!_.isEqual(stats.pending, this.stats.pending))
{
this.stats.pending = stats.pending;
return {
id: this.id,
pending: this.stats.pending
};
callback(null, {
id: this.id,
pending: this.stats.pending
});
}
else
{
callback(null, null);
}
}
else
{
callback('Stats undefined', null);
}
return false;
}
Node.prototype.setBasicStats = function(stats)
Node.prototype.setBasicStats = function(stats, callback)
{
if( !_.isUndefined(stats) && !_.isEqual(stats, {
if( !_.isUndefined(stats) )
{
if( !_.isEqual(stats, {
active: this.stats.active,
mining: this.stats.mining,
hashrate: this.stats.hashrate,
@ -200,33 +226,49 @@ Node.prototype.setBasicStats = function(stats)
gasPrice: this.stats.gasPrice,
uptime: this.stats.uptime
}) )
{
this.stats.active = stats.active;
this.stats.mining = stats.mining;
this.stats.hashrate = stats.hashrate;
this.stats.peers = stats.peers;
this.stats.gasPrice = stats.gasPrice;
this.stats.uptime = stats.uptime;
{
this.stats.active = stats.active;
this.stats.mining = stats.mining;
this.stats.hashrate = stats.hashrate;
this.stats.peers = stats.peers;
this.stats.gasPrice = stats.gasPrice;
this.stats.uptime = stats.uptime;
return this.getBasicStats();
callback(null, this.getBasicStats());
}
else
{
callback(null, null);
}
}
else
{
callback('Stats undefined', null);
}
return false;
}
Node.prototype.setLatency = function(latency)
Node.prototype.setLatency = function(latency, callback)
{
if( !_.isUndefined(latency) && !_.isEqual(latency, this.stats.latency) )
if( !_.isUndefined(latency) )
{
this.stats.latency = latency;
if( !_.isEqual(latency, this.stats.latency) )
{
this.stats.latency = latency;
return {
id: this.id,
latency: latency
};
callback(null, {
id: this.id,
latency: latency
});
}
else
{
callback(null, null);
}
}
else
{
callback('Latency undefined', null);
}
return false;
}
Node.prototype.getStats = function()