Allow simultaneous block ranges and index ranges

This commit is contained in:
nolash 2021-07-25 12:19:22 +02:00
parent 9e1f5c2daa
commit 019f84088c
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
7 changed files with 192 additions and 14 deletions

View File

@ -55,15 +55,35 @@ class Api:
queue=callback_queue, queue=callback_queue,
) )
def list(self, offset, limit, address=None): def list(self, offset=0, limit=100, address=None):
s = celery.signature( s = celery.signature(
'cic_cache.tasks.tx.tx_filter', 'cic_cache.tasks.tx.tx_filter',
[ [
0, offset,
100, limit,
address, address,
], ],
queue=None queue=self.queue,
)
if self.callback_param != None:
s.link(self.callback_success).on_error(self.callback_error)
t = s.apply_async()
return t
def list_content(self, offset=0, limit=100, address=None, block_offset=None, block_limit=None):
s = celery.signature(
'cic_cache.tasks.tx.tx_filter_content',
[
offset,
limit,
address,
block_offset,
block_limit,
],
queue=self.queue,
) )
if self.callback_param != None: if self.callback_param != None:
s.link(self.callback_success).on_error(self.callback_error) s.link(self.callback_success).on_error(self.callback_error)

View File

@ -10,12 +10,16 @@ from cic_cache.db.list import (
list_transactions_mined, list_transactions_mined,
list_transactions_account_mined, list_transactions_account_mined,
list_transactions_mined_with_data, list_transactions_mined_with_data,
list_transactions_mined_with_data_index,
list_transactions_account_mined_with_data_index,
list_transactions_account_mined_with_data,
) )
logg = logging.getLogger() logg = logging.getLogger()
DEFAULT_FILTER_SIZE = 8192 * 8 DEFAULT_FILTER_SIZE = 8192 * 8
DEFAULT_LIMIT = 100
class Cache: class Cache:
@ -99,8 +103,35 @@ class BloomCache(Cache):
class DataCache(Cache): class DataCache(Cache):
def load_transactions_with_data(self, offset, end): def load_transactions_with_data_index(self, offset, limit, block_offset=None, block_limit=None):
rows = list_transactions_mined_with_data(self.session, offset, end) if limit == 0:
limit = DEFAULT_LIMIT
rows = list_transactions_mined_with_data_index(self.session, offset, limit, block_offset, block_limit)
return self.__load_transactions(rows)
def load_transactions_with_data(self, offset, limit, block_offset=None, block_limit=None):
if limit == 0:
limit = DEFAULT_LIMIT
rows = list_transactions_mined_with_data(self.session, offset, limit, block_offset, block_limit)
return self.__load_transactions(rows)
def load_transactions_account_with_data_index(self, address, offset, limit, block_offset=None, block_limit=None):
if limit == 0:
limit = DEFAULT_LIMIT
rows = list_transactions_account_mined_with_data_index(self.session, address, offset, limit, block_offset, block_limit)
return self.__load_transactions(rows)
def load_transactions_account_with_data(self, address, offset, limit, block_offset=None, block_limit=None):
if limit == 0:
limit = DEFAULT_LIMIT
rows = list_transactions_account_mined_with_data(self.session, address, offset, limit, block_offset, block_limit)
return self.__load_transactions(rows)
def __load_transactions(self, rows):
tx_cache = [] tx_cache = []
highest_block = -1; highest_block = -1;
lowest_block = -1; lowest_block = -1;

View File

@ -12,7 +12,7 @@ class ArgumentParser(BaseArgumentParser):
def process_local_flags(self, local_arg_flags): def process_local_flags(self, local_arg_flags):
if local_arg_flags & CICFlag.CELERY: if local_arg_flags & CICFlag.CELERY:
self.add_argument('-q', '--celery-queue', dest='celery_queue', type=str, default='cic-eth', help='Task queue') self.add_argument('-q', '--celery-queue', dest='celery_queue', type=str, default='cic-cache', help='Task queue')
if local_arg_flags & CICFlag.SYNCER: if local_arg_flags & CICFlag.SYNCER:
self.add_argument('--offset', type=int, default=0, help='Start block height for initial history sync') self.add_argument('--offset', type=int, default=0, help='Start block height for initial history sync')
self.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync') self.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')

View File

@ -1,5 +1,5 @@
[celery] [celery]
broker_url = redis://localhost:6379 broker_url = redis://localhost:6379
result_url = result_url =
queue = cic-eth queue = cic-cache
debug = 0 debug = 0

View File

@ -13,6 +13,8 @@ def list_transactions_mined(
session, session,
offset, offset,
limit, limit,
block_offset,
block_limit,
): ):
"""Executes db query to return all confirmed transactions according to the specified offset and limit. """Executes db query to return all confirmed transactions according to the specified offset and limit.
@ -23,15 +25,52 @@ def list_transactions_mined(
:result: Result set :result: Result set
:rtype: SQLAlchemy.ResultProxy :rtype: SQLAlchemy.ResultProxy
""" """
if block_offset:
if block_limit:
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC WHERE block_number >= {} and block_number <= {} LIMIT {} OFFSET {}".format(limit, offset, block_offset, block_limit)
else:
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC WHERE block_number >= {} LIMIT {} OFFSET {}".format(limit, offset, block_offset)
else:
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(limit, offset) s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(limit, offset)
r = session.execute(s) r = session.execute(s)
return r return r
def list_transactions_mined_with_data( def list_transactions_mined_with_data(
session,
offset,
limit,
block_offset,
block_limit,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
:param block_offset: First block to include in search
:type block_offset: int
:param block_limit: Last block to include in search
:type block_limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, block_limit, offset, limit)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, offset, limit)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(offset, limit)
r = session.execute(s)
return r
def list_transactions_mined_with_data_index(
session, session,
offset, offset,
end, end,
block_offset,
block_limit,
): ):
"""Executes db query to return all confirmed transactions according to the specified offset and limit. """Executes db query to return all confirmed transactions according to the specified offset and limit.
@ -42,7 +81,70 @@ def list_transactions_mined_with_data(
:result: Result set :result: Result set
:rtype: SQLAlchemy.ResultProxy :rtype: SQLAlchemy.ResultProxy
""" """
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number ASC, tx_index ASC".format(offset, end) if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} and block_number <= {} ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, block_limit, offset, end)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, offset, end)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(offset, end)
r = session.execute(s)
return r
def list_transactions_account_mined_with_data_index(
session,
address,
offset,
limit,
block_offset,
block_limit,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit, filtered by address
:param offset: Offset in data set to return transactions from
:type offset: int
:param limit: Max number of transactions to retrieve
:type limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, block_limit, address, address, offset, limit)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, address, address, offset, limit)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(address, address, offset, limit)
r = session.execute(s)
return r
def list_transactions_account_mined_with_data(
session,
address,
offset,
limit,
block_offset,
block_limit,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
:param block_offset: First block to include in search
:type block_offset: int
:param block_limit: Last block to include in search
:type block_limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, block_limit, address, address, offset, limit)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, address, address, offset, limit)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number ASC, tx_index ASC".format(address, address, offset, limit)
r = session.execute(s) r = session.execute(s)
return r return r
@ -53,6 +155,8 @@ def list_transactions_account_mined(
address, address,
offset, offset,
limit, limit,
block_offset,
block_limit,
): ):
"""Same as list_transactions_mined(...), but only retrieves transaction where the specified account address is sender or recipient. """Same as list_transactions_mined(...), but only retrieves transaction where the specified account address is sender or recipient.

View File

@ -91,13 +91,14 @@ def process_transactions_all_data(session, env):
if env.get('HTTP_X_CIC_CACHE_MODE') != 'all': if env.get('HTTP_X_CIC_CACHE_MODE') != 'all':
return None return None
offset = r[1] logg.debug('got data request {}'.format(env))
end = r[2] block_offset = r[1]
block_end = r[2]
if int(r[2]) < int(r[1]): if int(r[2]) < int(r[1]):
raise ValueError('cart before the horse, dude') raise ValueError('cart before the horse, dude')
c = DataCache(session) c = DataCache(session)
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, end) (lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(0, 0, block_offset, block_end)
for r in tx_cache: for r in tx_cache:
r['date_block'] = r['date_block'].timestamp() r['date_block'] = r['date_block'].timestamp()

View File

@ -2,7 +2,10 @@
import celery import celery
# local imports # local imports
from cic_cache.cache import BloomCache from cic_cache.cache import (
BloomCache,
DataCache,
)
from cic_cache.db.models.base import SessionBase from cic_cache.db.models.base import SessionBase
celery_app = celery.current_app celery_app = celery.current_app
@ -35,4 +38,23 @@ def tx_filter(self, offset, limit, address=None, encoding='hex'):
return o return o
@celery_app.task(bind=True)
def tx_filter_content(self, offset, limit, address=None, block_offset=None, block_limit=None, encoding='hex'):
session = SessionBase.create_session()
c = DataCache(session)
b = None
if address == None:
if block_offset:
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, limit, block_offset=block_offset, block_limit=block_limit)
else:
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data_index(offset, limit, block_offset=block_offset, block_limit=block_limit)
else:
if block_offset:
(lowest_block, highest_block, tx_cache) = c.load_transactions_account_with_data(address, offset, limit, block_offset=block_offset, block_limit=block_limit)
else:
(lowest_block, highest_block, tx_cache) = c.load_transactions_account_with_data_index(address, offset, limit, block_number=block_number, block_limit=block_limit)
session.close()
return (lowest_block, highest_block, tx_cache,)