From a9073e1e23a5cd3d96f62cd95775e544d8b3abaa Mon Sep 17 00:00:00 2001 From: nolash Date: Sun, 25 Jul 2021 15:07:28 +0200 Subject: [PATCH] Make list bloom task work with updated cic cache filter task --- apps/cic-cache/cic_cache/api.py | 6 +- apps/cic-cache/cic_cache/cache.py | 24 +++--- apps/cic-cache/cic_cache/db/list.py | 74 +++++++++++++++---- .../cic_cache/runnable/daemons/query.py | 2 +- apps/cic-cache/cic_cache/tasks/tx.py | 12 +-- apps/cic-cache/cic_cache/version.py | 4 +- apps/cic-cache/setup.cfg | 1 + apps/cic-eth/cic_eth/api/api_task.py | 2 +- apps/cic-eth/cic_eth/ext/tx.py | 52 ++++++++++--- docker-compose.yml | 2 +- 10 files changed, 128 insertions(+), 51 deletions(-) diff --git a/apps/cic-cache/cic_cache/api.py b/apps/cic-cache/cic_cache/api.py index 464ceb6e..74083ba5 100644 --- a/apps/cic-cache/cic_cache/api.py +++ b/apps/cic-cache/cic_cache/api.py @@ -55,13 +55,14 @@ class Api: queue=callback_queue, ) - def list(self, offset=0, limit=100, address=None): + def list(self, offset=0, limit=100, address=None, oldest=False): s = celery.signature( 'cic_cache.tasks.tx.tx_filter', [ offset, limit, address, + oldest, ], queue=self.queue, ) @@ -73,7 +74,7 @@ class Api: return t - def list_content(self, offset=0, limit=100, address=None, block_offset=None, block_limit=None): + def list_content(self, offset=0, limit=100, address=None, block_offset=None, block_limit=None, oldest=False): s = celery.signature( 'cic_cache.tasks.tx.tx_filter_content', [ @@ -82,6 +83,7 @@ class Api: address, block_offset, block_limit, + oldest, ], queue=self.queue, ) diff --git a/apps/cic-cache/cic_cache/cache.py b/apps/cic-cache/cic_cache/cache.py index 23bb07cc..06397b31 100644 --- a/apps/cic-cache/cic_cache/cache.py +++ b/apps/cic-cache/cic_cache/cache.py @@ -36,7 +36,7 @@ class BloomCache(Cache): return n - def load_transactions(self, offset, limit): + def load_transactions(self, offset, limit, oldest=False): """Retrieves a list of transactions from cache and creates a bloom filter pointing to blocks and transactions. Block and transaction numbers are serialized as 32-bit big-endian numbers. The input to the second bloom filter is the concatenation of the serialized block number and transaction index. @@ -53,7 +53,7 @@ class BloomCache(Cache): :return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx :rtype: tuple """ - rows = list_transactions_mined(self.session, offset, limit, None, None) + rows = list_transactions_mined(self.session, offset, limit, block_offset=None, block_limit=None, oldest=oldest) f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3) f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3) @@ -71,7 +71,7 @@ class BloomCache(Cache): return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),) - def load_transactions_account(self, address, offset, limit): + def load_transactions_account(self, address, offset, limit, oldest=False): """Same as load_transactions(...), but only retrieves transactions where the specified account address is sender or recipient. :param address: Address to retrieve transactions for. @@ -83,7 +83,7 @@ class BloomCache(Cache): :return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx :rtype: tuple """ - rows = list_transactions_account_mined(self.session, address, offset, limit, None, None) + rows = list_transactions_account_mined(self.session, address, offset, limit, block_offset=None, block_limit=None, oldest=oldest) f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3) f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3) @@ -103,31 +103,31 @@ class BloomCache(Cache): class DataCache(Cache): - def load_transactions_with_data_index(self, offset, limit, block_offset=None, block_limit=None): + def load_transactions_with_data_index(self, offset, limit, block_offset=None, block_limit=None, oldest=False): if limit == 0: limit = DEFAULT_LIMIT - rows = list_transactions_mined_with_data_index(self.session, offset, limit, block_offset, block_limit) + rows = list_transactions_mined_with_data_index(self.session, offset, limit, block_offset, block_limit, oldest=oldest) return self.__load_transactions(rows) - def load_transactions_with_data(self, offset, limit, block_offset=None, block_limit=None): + def load_transactions_with_data(self, offset, limit, block_offset=None, block_limit=None, oldest=False): if limit == 0: limit = DEFAULT_LIMIT - rows = list_transactions_mined_with_data(self.session, offset, limit, block_offset, block_limit) + rows = list_transactions_mined_with_data(self.session, offset, limit, block_offset, block_limit, oldest=oldest) return self.__load_transactions(rows) - def load_transactions_account_with_data_index(self, address, offset, limit, block_offset=None, block_limit=None): + def load_transactions_account_with_data_index(self, address, offset, limit, block_offset=None, block_limit=None, oldest=False): if limit == 0: limit = DEFAULT_LIMIT - rows = list_transactions_account_mined_with_data_index(self.session, address, offset, limit, block_offset, block_limit) + rows = list_transactions_account_mined_with_data_index(self.session, address, offset, limit, block_offset, block_limit, oldest=oldest) return self.__load_transactions(rows) - def load_transactions_account_with_data(self, address, offset, limit, block_offset=None, block_limit=None): + def load_transactions_account_with_data(self, address, offset, limit, block_offset=None, block_limit=None, oldest=False): if limit == 0: limit = DEFAULT_LIMIT - rows = list_transactions_account_mined_with_data(self.session, address, offset, limit, block_offset, block_limit) + rows = list_transactions_account_mined_with_data(self.session, address, offset, limit, block_offset, block_limit, oldest=oldest) return self.__load_transactions(rows) diff --git a/apps/cic-cache/cic_cache/db/list.py b/apps/cic-cache/cic_cache/db/list.py index 1c77d2e6..6bf29b96 100644 --- a/apps/cic-cache/cic_cache/db/list.py +++ b/apps/cic-cache/cic_cache/db/list.py @@ -15,6 +15,7 @@ def list_transactions_mined( limit, block_offset, block_limit, + oldest=False, ): """Executes db query to return all confirmed transactions according to the specified offset and limit. @@ -25,13 +26,17 @@ def list_transactions_mined( :result: Result set :rtype: SQLAlchemy.ResultProxy """ + order_by = 'DESC' + if oldest: + order_by = 'ASC' + if block_offset: if block_limit: - s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC WHERE block_number >= {} and block_number <= {} LIMIT {} OFFSET {}".format(limit, offset, block_offset, block_limit) + s = "SELECT block_number, tx_index FROM tx ORDER BY block_number {}, tx_index {} WHERE block_number >= {} and block_number <= {} LIMIT {} OFFSET {}".format(order_by, order_by, limit, offset, block_offset, block_limit) else: - s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC WHERE block_number >= {} LIMIT {} OFFSET {}".format(limit, offset, block_offset) + s = "SELECT block_number, tx_index FROM tx ORDER BY block_number {}, tx_index {} WHERE block_number >= {} LIMIT {} OFFSET {}".format(order_by, order_by, limit, offset, block_offset) else: - s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(limit, offset) + s = "SELECT block_number, tx_index FROM tx ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(order_by, order_by, limit, offset) r = session.execute(s) return r @@ -42,6 +47,7 @@ def list_transactions_mined_with_data( limit, block_offset, block_limit, + oldest=False, ): """Executes db query to return all confirmed transactions according to the specified offset and limit. @@ -52,13 +58,17 @@ def list_transactions_mined_with_data( :result: Result set :rtype: SQLAlchemy.ResultProxy """ + order_by = 'DESC' + if oldest: + order_by = 'ASC' + if block_offset: if block_limit: - s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, block_limit, offset, limit) + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(block_offset, block_limit, order_by, order_by, offset, limit) else: - s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, offset, limit) + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(block_offset, order_by, order_by, offset, limit) else: - s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(offset, limit) + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(order_by, order_by, offset, limit) r = session.execute(s) @@ -71,6 +81,7 @@ def list_transactions_mined_with_data_index( end, block_offset, block_limit, + oldest=False, ): """Executes db query to return all confirmed transactions according to the specified offset and limit. @@ -81,13 +92,18 @@ def list_transactions_mined_with_data_index( :result: Result set :rtype: SQLAlchemy.ResultProxy """ + + order_by = 'DESC' + if oldest: + order_by = 'ASC' + if block_offset: if block_limit: - s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} and block_number <= {} ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, block_limit, offset, end) + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} and block_number <= {} ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(block_offset, block_limit, order_by, order_by, offset, end) else: - s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, offset, end) + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(block_offset, order_by, order_by, offset, end) else: - s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(offset, end) + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(order_by, order_by, offset, end) r = session.execute(s) return r @@ -100,6 +116,7 @@ def list_transactions_account_mined_with_data_index( limit, block_offset, block_limit, + oldest=False, ): """Executes db query to return all confirmed transactions according to the specified offset and limit, filtered by address @@ -110,13 +127,18 @@ def list_transactions_account_mined_with_data_index( :result: Result set :rtype: SQLAlchemy.ResultProxy """ + + order_by = 'DESC' + if oldest: + order_by = 'ASC' + if block_offset: if block_limit: - s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, block_limit, address, address, offset, limit) + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(block_offset, block_limit, address, address, order_by, order_by, offset, limit) else: - s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, address, address, offset, limit) + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(block_offset, address, address, order_by, order_by, offset, limit) else: - s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(address, address, offset, limit) + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(address, address, order_by, order_by, offset, limit) r = session.execute(s) return r @@ -128,6 +150,7 @@ def list_transactions_account_mined_with_data( limit, block_offset, block_limit, + oldest=False, ): """Executes db query to return all confirmed transactions according to the specified offset and limit. @@ -138,13 +161,18 @@ def list_transactions_account_mined_with_data( :result: Result set :rtype: SQLAlchemy.ResultProxy """ + + order_by = 'DESC' + if oldest: + order_by = 'ASC' + if block_offset: if block_limit: - s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, block_limit, address, address, offset, limit) + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(block_offset, block_limit, address, address, order_by, order_by, offset, limit) else: - s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, address, address, offset, limit) + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(block_offset, address, address, order_by, order_by, offset, limit) else: - s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number ASC, tx_index ASC".format(address, address, offset, limit) + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number , tx_index {} OFFSET {} LIMIT {}".format(address, address, order_by, order_by, offset, limit) r = session.execute(s) return r @@ -157,6 +185,7 @@ def list_transactions_account_mined( limit, block_offset, block_limit, + oldest=False, ): """Same as list_transactions_mined(...), but only retrieves transaction where the specified account address is sender or recipient. @@ -169,7 +198,20 @@ def list_transactions_account_mined( :result: Result set :rtype: SQLAlchemy.ResultProxy """ - s = "SELECT block_number, tx_index FROM tx WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(address, address, limit, offset) + + order_by = 'DESC' + if oldest: + order_by = 'ASC' + + if block_offset: + if block_limit: + s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, address, address, order_by, order_by, limit, offset) + else: + s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, address, address, order_by, order_by, limit, offset) + + else: + s = "SELECT block_number, tx_index FROM tx WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(address, address, order_by, order_by, limit, offset) + r = session.execute(s) return r diff --git a/apps/cic-cache/cic_cache/runnable/daemons/query.py b/apps/cic-cache/cic_cache/runnable/daemons/query.py index 7d82605e..d7edf6ab 100644 --- a/apps/cic-cache/cic_cache/runnable/daemons/query.py +++ b/apps/cic-cache/cic_cache/runnable/daemons/query.py @@ -98,7 +98,7 @@ def process_transactions_all_data(session, env): raise ValueError('cart before the horse, dude') c = DataCache(session) - (lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(0, 0, block_offset, block_end) + (lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(0, 0, block_offset, block_end, oldest=True) # oldest needs to be settable for r in tx_cache: r['date_block'] = r['date_block'].timestamp() diff --git a/apps/cic-cache/cic_cache/tasks/tx.py b/apps/cic-cache/cic_cache/tasks/tx.py index 7bc85cd6..525dac84 100644 --- a/apps/cic-cache/cic_cache/tasks/tx.py +++ b/apps/cic-cache/cic_cache/tasks/tx.py @@ -12,7 +12,7 @@ celery_app = celery.current_app @celery_app.task(bind=True) -def tx_filter(self, offset, limit, address=None, encoding='hex'): +def tx_filter(self, offset, limit, address=None, oldest=False, encoding='hex'): queue = self.request.delivery_info.get('routing_key') session = SessionBase.create_session() @@ -20,9 +20,9 @@ def tx_filter(self, offset, limit, address=None, encoding='hex'): c = BloomCache(session) b = None if address == None: - (lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit) + (lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit, oldest=oldest) else: - (lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit) + (lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit, oldest=oldest) session.close() @@ -39,21 +39,21 @@ def tx_filter(self, offset, limit, address=None, encoding='hex'): @celery_app.task(bind=True) -def tx_filter_content(self, offset, limit, address=None, block_offset=None, block_limit=None, encoding='hex'): +def tx_filter_content(self, offset, limit, address=None, block_offset=None, block_limit=None, oldest=False, encoding='hex'): session = SessionBase.create_session() c = DataCache(session) b = None if address == None: if block_offset: - (lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, limit, block_offset=block_offset, block_limit=block_limit) + (lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, limit, block_offset=block_offset, block_limit=block_limit, oldest=oldest) else: (lowest_block, highest_block, tx_cache) = c.load_transactions_with_data_index(offset, limit, block_offset=block_offset, block_limit=block_limit) else: if block_offset: (lowest_block, highest_block, tx_cache) = c.load_transactions_account_with_data(address, offset, limit, block_offset=block_offset, block_limit=block_limit) else: - (lowest_block, highest_block, tx_cache) = c.load_transactions_account_with_data_index(address, offset, limit, block_number=block_number, block_limit=block_limit) + (lowest_block, highest_block, tx_cache) = c.load_transactions_account_with_data_index(address, offset, limit, block_offset=block_offset, block_limit=block_limit) session.close() diff --git a/apps/cic-cache/cic_cache/version.py b/apps/cic-cache/cic_cache/version.py index 338bb831..b5293a59 100644 --- a/apps/cic-cache/cic_cache/version.py +++ b/apps/cic-cache/cic_cache/version.py @@ -4,8 +4,8 @@ import semver version = ( 0, 2, - 0, - 'alpha.2', + 1, + 'alpha.1', ) version_object = semver.VersionInfo( diff --git a/apps/cic-cache/setup.cfg b/apps/cic-cache/setup.cfg index b6de6a9c..f8393c92 100644 --- a/apps/cic-cache/setup.cfg +++ b/apps/cic-cache/setup.cfg @@ -41,3 +41,4 @@ console_scripts = cic-cache-trackerd = cic_cache.runnable.daemons.tracker:main cic-cache-serverd = cic_cache.runnable.daemons.server:main cic-cache-taskerd = cic_cache.runnable.daemons.tasker:main + cic-cache-list = cic_cache.runable.list:main diff --git a/apps/cic-eth/cic_eth/api/api_task.py b/apps/cic-eth/cic_eth/api/api_task.py index 64448bfb..79501d30 100644 --- a/apps/cic-eth/cic_eth/api/api_task.py +++ b/apps/cic-eth/cic_eth/api/api_task.py @@ -520,9 +520,9 @@ class Api(ApiBase): s_external_get = celery.signature( external_task, [ - address, offset, limit, + address, ], queue=external_queue, ) diff --git a/apps/cic-eth/cic_eth/ext/tx.py b/apps/cic-eth/cic_eth/ext/tx.py index 77083874..4769d2aa 100644 --- a/apps/cic-eth/cic_eth/ext/tx.py +++ b/apps/cic-eth/cic_eth/ext/tx.py @@ -12,6 +12,7 @@ from chainlib.eth.tx import ( transaction_by_block, receipt, ) +from chainlib.eth.error import RequestMismatchException from chainlib.eth.block import block_by_number from chainlib.eth.contract import abi_decode_single from chainlib.eth.constant import ZERO_ADDRESS @@ -23,6 +24,7 @@ from chainqueue.db.models.otx import Otx from chainqueue.db.enum import StatusEnum from chainqueue.sql.query import get_tx_cache from eth_erc20 import ERC20 +from erc20_faucet import Faucet # local imports from cic_eth.queue.time import tx_times @@ -35,6 +37,33 @@ logg = logging.getLogger() MAX_BLOCK_TX = 250 +def parse_transaction(chain_spec, rpc, tx, sender_address=None): + try: + transfer_data = ERC20.parse_transfer_request(tx['input']) + tx_address = transfer_data[0] + tx_token_value = transfer_data[1] + logg.debug('matched transfer transaction {} sender {} recipient {} value {}'.format(tx['hash'], tx['from'], tx_address, tx_token_value)) + return (tx_address, tx_token_value) + except RequestMismatchException: + pass + + try: + transfer_data = Faucet.parse_give_to_request(tx['input']) + tx_address = transfer_data[0] + logg.warning('looking up current faucet value, historic faucet value not implemented yet. use with care!!') + c = Faucet(chain_spec) + o = c.token_amount(tx['to'], sender_address=sender_address) + r = rpc.do(o) + tx_token_value = Faucet.parse_token_amount(r) + logg.debug('matched giveto transaction {} sender {} recipient {} value {}'.format(tx['hash'], tx['from'], tx_address, tx_token_value)) + return (tx_address, tx_token_value) + + except RequestMismatchException: + pass + + return None + + # TODO: Make this method easier to read @celery_app.task(bind=True, base=BaseTask) def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict): @@ -71,6 +100,7 @@ def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict): tx_filter = moolb.Bloom(databitlen, bloomspec['filter_rounds'], default_data=tx_filter_data) txs = {} + logg.debug('processing filter with span low {} to high {}'.format(bloomspec['low'], bloomspec['high'])) for block_height in range(bloomspec['low'], bloomspec['high']): block_height_bytes = block_height.to_bytes(4, 'big') if block_filter.check(block_height_bytes): @@ -80,9 +110,9 @@ def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict): logg.debug('block {}'.format(block)) for tx_index in range(0, len(block['transactions'])): - composite = tx_index + block_height - tx_index_bytes = composite.to_bytes(4, 'big') - if tx_filter.check(tx_index_bytes): + tx_index_bytes = tx_index.to_bytes(4, 'big') + composite = block_height_bytes + tx_index_bytes + if tx_filter.check(composite): logg.debug('filter matched block {} tx {}'.format(block_height, tx_index)) try: @@ -91,16 +121,17 @@ def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict): tx = rpc.do(o) except Exception as e: logg.debug('false positive on block {} tx {} ({})'.format(block_height, tx_index, e)) - continue + + logg.debug('got tx {}'.format(tx)) tx_address = None tx_token_value = 0 - try: - transfer_data = ERC20.parse_transfer_request(tx['data']) - tx_address = transfer_data[0] - tx_token_value = transfer_data[1] - except ValueError: - logg.debug('not a transfer transaction, skipping {}'.format(tx)) + + transfer_data = parse_transaction(chain_spec, rpc, tx, sender_address=BaseTask.call_address) + if transfer_data == None: continue + tx_address = transfer_data[0] + tx_token_value = transfer_data[1] + if address == tx_address: status = StatusEnum.SENT try: @@ -136,6 +167,7 @@ def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict): return txs + # TODO: Surely it must be possible to optimize this # TODO: DRY this with callback filter in cic_eth/runnable/manager # TODO: Remove redundant fields from end representation (timestamp, tx_hash) diff --git a/docker-compose.yml b/docker-compose.yml index 00fddb4d..dd99574f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -238,7 +238,7 @@ services: - | if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi "/usr/local/bin/uwsgi" \ - --wsgi-file /usr/src/cic-cache/cic_cache/runnable/daemons/server.py \ + --wsgi-file /root/cic_cache/runnable/daemons/server.py \ --http :8000 \ --pyargv "-vv"