From 019f84088cd01448e669823d5cd9aa89a3f8198e Mon Sep 17 00:00:00 2001 From: nolash Date: Sun, 25 Jul 2021 12:19:22 +0200 Subject: [PATCH] Allow simultaneous block ranges and index ranges --- apps/cic-cache/cic_cache/api.py | 28 ++++- apps/cic-cache/cic_cache/cache.py | 35 +++++- apps/cic-cache/cic_cache/cli/arg.py | 2 +- .../cic_cache/data/config/celery.ini | 2 +- apps/cic-cache/cic_cache/db/list.py | 108 +++++++++++++++++- .../cic_cache/runnable/daemons/query.py | 7 +- apps/cic-cache/cic_cache/tasks/tx.py | 24 +++- 7 files changed, 192 insertions(+), 14 deletions(-) diff --git a/apps/cic-cache/cic_cache/api.py b/apps/cic-cache/cic_cache/api.py index 0340e65c..464ceb6e 100644 --- a/apps/cic-cache/cic_cache/api.py +++ b/apps/cic-cache/cic_cache/api.py @@ -55,15 +55,35 @@ class Api: queue=callback_queue, ) - def list(self, offset, limit, address=None): + def list(self, offset=0, limit=100, address=None): s = celery.signature( 'cic_cache.tasks.tx.tx_filter', [ - 0, - 100, + offset, + limit, address, ], - queue=None + queue=self.queue, + ) + if self.callback_param != None: + s.link(self.callback_success).on_error(self.callback_error) + + t = s.apply_async() + + return t + + + def list_content(self, offset=0, limit=100, address=None, block_offset=None, block_limit=None): + s = celery.signature( + 'cic_cache.tasks.tx.tx_filter_content', + [ + offset, + limit, + address, + block_offset, + block_limit, + ], + queue=self.queue, ) if self.callback_param != None: s.link(self.callback_success).on_error(self.callback_error) diff --git a/apps/cic-cache/cic_cache/cache.py b/apps/cic-cache/cic_cache/cache.py index 7c4d1040..971a4b76 100644 --- a/apps/cic-cache/cic_cache/cache.py +++ b/apps/cic-cache/cic_cache/cache.py @@ -10,12 +10,16 @@ from cic_cache.db.list import ( list_transactions_mined, list_transactions_account_mined, list_transactions_mined_with_data, + list_transactions_mined_with_data_index, + list_transactions_account_mined_with_data_index, + list_transactions_account_mined_with_data, ) logg = logging.getLogger() DEFAULT_FILTER_SIZE = 8192 * 8 +DEFAULT_LIMIT = 100 class Cache: @@ -99,8 +103,35 @@ class BloomCache(Cache): class DataCache(Cache): - def load_transactions_with_data(self, offset, end): - rows = list_transactions_mined_with_data(self.session, offset, end) + def load_transactions_with_data_index(self, offset, limit, block_offset=None, block_limit=None): + if limit == 0: + limit = DEFAULT_LIMIT + rows = list_transactions_mined_with_data_index(self.session, offset, limit, block_offset, block_limit) + return self.__load_transactions(rows) + + + def load_transactions_with_data(self, offset, limit, block_offset=None, block_limit=None): + if limit == 0: + limit = DEFAULT_LIMIT + rows = list_transactions_mined_with_data(self.session, offset, limit, block_offset, block_limit) + return self.__load_transactions(rows) + + + def load_transactions_account_with_data_index(self, address, offset, limit, block_offset=None, block_limit=None): + if limit == 0: + limit = DEFAULT_LIMIT + rows = list_transactions_account_mined_with_data_index(self.session, address, offset, limit, block_offset, block_limit) + return self.__load_transactions(rows) + + + def load_transactions_account_with_data(self, address, offset, limit, block_offset=None, block_limit=None): + if limit == 0: + limit = DEFAULT_LIMIT + rows = list_transactions_account_mined_with_data(self.session, address, offset, limit, block_offset, block_limit) + return self.__load_transactions(rows) + + + def __load_transactions(self, rows): tx_cache = [] highest_block = -1; lowest_block = -1; diff --git a/apps/cic-cache/cic_cache/cli/arg.py b/apps/cic-cache/cic_cache/cli/arg.py index 43428cb1..2d4e6e8a 100644 --- a/apps/cic-cache/cic_cache/cli/arg.py +++ b/apps/cic-cache/cic_cache/cli/arg.py @@ -12,7 +12,7 @@ class ArgumentParser(BaseArgumentParser): def process_local_flags(self, local_arg_flags): if local_arg_flags & CICFlag.CELERY: - self.add_argument('-q', '--celery-queue', dest='celery_queue', type=str, default='cic-eth', help='Task queue') + self.add_argument('-q', '--celery-queue', dest='celery_queue', type=str, default='cic-cache', help='Task queue') if local_arg_flags & CICFlag.SYNCER: self.add_argument('--offset', type=int, default=0, help='Start block height for initial history sync') self.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync') diff --git a/apps/cic-cache/cic_cache/data/config/celery.ini b/apps/cic-cache/cic_cache/data/config/celery.ini index f2ad10ab..a799cb4a 100644 --- a/apps/cic-cache/cic_cache/data/config/celery.ini +++ b/apps/cic-cache/cic_cache/data/config/celery.ini @@ -1,5 +1,5 @@ [celery] broker_url = redis://localhost:6379 result_url = -queue = cic-eth +queue = cic-cache debug = 0 diff --git a/apps/cic-cache/cic_cache/db/list.py b/apps/cic-cache/cic_cache/db/list.py index 62e0a2e0..1c77d2e6 100644 --- a/apps/cic-cache/cic_cache/db/list.py +++ b/apps/cic-cache/cic_cache/db/list.py @@ -13,6 +13,8 @@ def list_transactions_mined( session, offset, limit, + block_offset, + block_limit, ): """Executes db query to return all confirmed transactions according to the specified offset and limit. @@ -23,15 +25,52 @@ def list_transactions_mined( :result: Result set :rtype: SQLAlchemy.ResultProxy """ - s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(limit, offset) + if block_offset: + if block_limit: + s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC WHERE block_number >= {} and block_number <= {} LIMIT {} OFFSET {}".format(limit, offset, block_offset, block_limit) + else: + s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC WHERE block_number >= {} LIMIT {} OFFSET {}".format(limit, offset, block_offset) + else: + s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(limit, offset) r = session.execute(s) return r def list_transactions_mined_with_data( + session, + offset, + limit, + block_offset, + block_limit, + ): + """Executes db query to return all confirmed transactions according to the specified offset and limit. + + :param block_offset: First block to include in search + :type block_offset: int + :param block_limit: Last block to include in search + :type block_limit: int + :result: Result set + :rtype: SQLAlchemy.ResultProxy + """ + if block_offset: + if block_limit: + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, block_limit, offset, limit) + else: + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, offset, limit) + else: + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(offset, limit) + + + r = session.execute(s) + return r + + +def list_transactions_mined_with_data_index( session, offset, end, + block_offset, + block_limit, ): """Executes db query to return all confirmed transactions according to the specified offset and limit. @@ -42,7 +81,70 @@ def list_transactions_mined_with_data( :result: Result set :rtype: SQLAlchemy.ResultProxy """ - s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number ASC, tx_index ASC".format(offset, end) + if block_offset: + if block_limit: + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} and block_number <= {} ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, block_limit, offset, end) + else: + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, offset, end) + else: + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(offset, end) + + r = session.execute(s) + return r + + +def list_transactions_account_mined_with_data_index( + session, + address, + offset, + limit, + block_offset, + block_limit, + ): + """Executes db query to return all confirmed transactions according to the specified offset and limit, filtered by address + + :param offset: Offset in data set to return transactions from + :type offset: int + :param limit: Max number of transactions to retrieve + :type limit: int + :result: Result set + :rtype: SQLAlchemy.ResultProxy + """ + if block_offset: + if block_limit: + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, block_limit, address, address, offset, limit) + else: + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, address, address, offset, limit) + else: + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(address, address, offset, limit) + + r = session.execute(s) + return r + +def list_transactions_account_mined_with_data( + session, + address, + offset, + limit, + block_offset, + block_limit, + ): + """Executes db query to return all confirmed transactions according to the specified offset and limit. + + :param block_offset: First block to include in search + :type block_offset: int + :param block_limit: Last block to include in search + :type block_limit: int + :result: Result set + :rtype: SQLAlchemy.ResultProxy + """ + if block_offset: + if block_limit: + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, block_limit, address, address, offset, limit) + else: + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, address, address, offset, limit) + else: + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number ASC, tx_index ASC".format(address, address, offset, limit) r = session.execute(s) return r @@ -53,6 +155,8 @@ def list_transactions_account_mined( address, offset, limit, + block_offset, + block_limit, ): """Same as list_transactions_mined(...), but only retrieves transaction where the specified account address is sender or recipient. diff --git a/apps/cic-cache/cic_cache/runnable/daemons/query.py b/apps/cic-cache/cic_cache/runnable/daemons/query.py index a698a692..7d82605e 100644 --- a/apps/cic-cache/cic_cache/runnable/daemons/query.py +++ b/apps/cic-cache/cic_cache/runnable/daemons/query.py @@ -91,13 +91,14 @@ def process_transactions_all_data(session, env): if env.get('HTTP_X_CIC_CACHE_MODE') != 'all': return None - offset = r[1] - end = r[2] + logg.debug('got data request {}'.format(env)) + block_offset = r[1] + block_end = r[2] if int(r[2]) < int(r[1]): raise ValueError('cart before the horse, dude') c = DataCache(session) - (lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, end) + (lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(0, 0, block_offset, block_end) for r in tx_cache: r['date_block'] = r['date_block'].timestamp() diff --git a/apps/cic-cache/cic_cache/tasks/tx.py b/apps/cic-cache/cic_cache/tasks/tx.py index 0e2f7435..7bc85cd6 100644 --- a/apps/cic-cache/cic_cache/tasks/tx.py +++ b/apps/cic-cache/cic_cache/tasks/tx.py @@ -2,7 +2,10 @@ import celery # local imports -from cic_cache.cache import BloomCache +from cic_cache.cache import ( + BloomCache, + DataCache, + ) from cic_cache.db.models.base import SessionBase celery_app = celery.current_app @@ -35,4 +38,23 @@ def tx_filter(self, offset, limit, address=None, encoding='hex'): return o +@celery_app.task(bind=True) +def tx_filter_content(self, offset, limit, address=None, block_offset=None, block_limit=None, encoding='hex'): + session = SessionBase.create_session() + c = DataCache(session) + b = None + if address == None: + if block_offset: + (lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, limit, block_offset=block_offset, block_limit=block_limit) + else: + (lowest_block, highest_block, tx_cache) = c.load_transactions_with_data_index(offset, limit, block_offset=block_offset, block_limit=block_limit) + else: + if block_offset: + (lowest_block, highest_block, tx_cache) = c.load_transactions_account_with_data(address, offset, limit, block_offset=block_offset, block_limit=block_limit) + else: + (lowest_block, highest_block, tx_cache) = c.load_transactions_account_with_data_index(address, offset, limit, block_number=block_number, block_limit=block_limit) + + session.close() + + return (lowest_block, highest_block, tx_cache,)