Make list bloom task work with updated cic cache filter task

This commit is contained in:
nolash
2021-07-25 15:07:28 +02:00
parent 72afbc0cab
commit a9073e1e23
10 changed files with 128 additions and 51 deletions

View File

@@ -55,13 +55,14 @@ class Api:
queue=callback_queue,
)
def list(self, offset=0, limit=100, address=None):
def list(self, offset=0, limit=100, address=None, oldest=False):
s = celery.signature(
'cic_cache.tasks.tx.tx_filter',
[
offset,
limit,
address,
oldest,
],
queue=self.queue,
)
@@ -73,7 +74,7 @@ class Api:
return t
def list_content(self, offset=0, limit=100, address=None, block_offset=None, block_limit=None):
def list_content(self, offset=0, limit=100, address=None, block_offset=None, block_limit=None, oldest=False):
s = celery.signature(
'cic_cache.tasks.tx.tx_filter_content',
[
@@ -82,6 +83,7 @@ class Api:
address,
block_offset,
block_limit,
oldest,
],
queue=self.queue,
)

View File

@@ -36,7 +36,7 @@ class BloomCache(Cache):
return n
def load_transactions(self, offset, limit):
def load_transactions(self, offset, limit, oldest=False):
"""Retrieves a list of transactions from cache and creates a bloom filter pointing to blocks and transactions.
Block and transaction numbers are serialized as 32-bit big-endian numbers. The input to the second bloom filter is the concatenation of the serialized block number and transaction index.
@@ -53,7 +53,7 @@ class BloomCache(Cache):
:return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx
:rtype: tuple
"""
rows = list_transactions_mined(self.session, offset, limit, None, None)
rows = list_transactions_mined(self.session, offset, limit, block_offset=None, block_limit=None, oldest=oldest)
f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
@@ -71,7 +71,7 @@ class BloomCache(Cache):
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)
def load_transactions_account(self, address, offset, limit):
def load_transactions_account(self, address, offset, limit, oldest=False):
"""Same as load_transactions(...), but only retrieves transactions where the specified account address is sender or recipient.
:param address: Address to retrieve transactions for.
@@ -83,7 +83,7 @@ class BloomCache(Cache):
:return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx
:rtype: tuple
"""
rows = list_transactions_account_mined(self.session, address, offset, limit, None, None)
rows = list_transactions_account_mined(self.session, address, offset, limit, block_offset=None, block_limit=None, oldest=oldest)
f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
@@ -103,31 +103,31 @@ class BloomCache(Cache):
class DataCache(Cache):
def load_transactions_with_data_index(self, offset, limit, block_offset=None, block_limit=None):
def load_transactions_with_data_index(self, offset, limit, block_offset=None, block_limit=None, oldest=False):
if limit == 0:
limit = DEFAULT_LIMIT
rows = list_transactions_mined_with_data_index(self.session, offset, limit, block_offset, block_limit)
rows = list_transactions_mined_with_data_index(self.session, offset, limit, block_offset, block_limit, oldest=oldest)
return self.__load_transactions(rows)
def load_transactions_with_data(self, offset, limit, block_offset=None, block_limit=None):
def load_transactions_with_data(self, offset, limit, block_offset=None, block_limit=None, oldest=False):
if limit == 0:
limit = DEFAULT_LIMIT
rows = list_transactions_mined_with_data(self.session, offset, limit, block_offset, block_limit)
rows = list_transactions_mined_with_data(self.session, offset, limit, block_offset, block_limit, oldest=oldest)
return self.__load_transactions(rows)
def load_transactions_account_with_data_index(self, address, offset, limit, block_offset=None, block_limit=None):
def load_transactions_account_with_data_index(self, address, offset, limit, block_offset=None, block_limit=None, oldest=False):
if limit == 0:
limit = DEFAULT_LIMIT
rows = list_transactions_account_mined_with_data_index(self.session, address, offset, limit, block_offset, block_limit)
rows = list_transactions_account_mined_with_data_index(self.session, address, offset, limit, block_offset, block_limit, oldest=oldest)
return self.__load_transactions(rows)
def load_transactions_account_with_data(self, address, offset, limit, block_offset=None, block_limit=None):
def load_transactions_account_with_data(self, address, offset, limit, block_offset=None, block_limit=None, oldest=False):
if limit == 0:
limit = DEFAULT_LIMIT
rows = list_transactions_account_mined_with_data(self.session, address, offset, limit, block_offset, block_limit)
rows = list_transactions_account_mined_with_data(self.session, address, offset, limit, block_offset, block_limit, oldest=oldest)
return self.__load_transactions(rows)

View File

@@ -15,6 +15,7 @@ def list_transactions_mined(
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
@@ -25,13 +26,17 @@ def list_transactions_mined(
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC WHERE block_number >= {} and block_number <= {} LIMIT {} OFFSET {}".format(limit, offset, block_offset, block_limit)
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number {}, tx_index {} WHERE block_number >= {} and block_number <= {} LIMIT {} OFFSET {}".format(order_by, order_by, limit, offset, block_offset, block_limit)
else:
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC WHERE block_number >= {} LIMIT {} OFFSET {}".format(limit, offset, block_offset)
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number {}, tx_index {} WHERE block_number >= {} LIMIT {} OFFSET {}".format(order_by, order_by, limit, offset, block_offset)
else:
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(limit, offset)
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(order_by, order_by, limit, offset)
r = session.execute(s)
return r
@@ -42,6 +47,7 @@ def list_transactions_mined_with_data(
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
@@ -52,13 +58,17 @@ def list_transactions_mined_with_data(
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, block_limit, offset, limit)
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(block_offset, block_limit, order_by, order_by, offset, limit)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, offset, limit)
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(block_offset, order_by, order_by, offset, limit)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(offset, limit)
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(order_by, order_by, offset, limit)
r = session.execute(s)
@@ -71,6 +81,7 @@ def list_transactions_mined_with_data_index(
end,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
@@ -81,13 +92,18 @@ def list_transactions_mined_with_data_index(
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} and block_number <= {} ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, block_limit, offset, end)
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} and block_number <= {} ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(block_offset, block_limit, order_by, order_by, offset, end)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, offset, end)
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(block_offset, order_by, order_by, offset, end)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(offset, end)
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(order_by, order_by, offset, end)
r = session.execute(s)
return r
@@ -100,6 +116,7 @@ def list_transactions_account_mined_with_data_index(
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit, filtered by address
@@ -110,13 +127,18 @@ def list_transactions_account_mined_with_data_index(
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, block_limit, address, address, offset, limit)
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(block_offset, block_limit, address, address, order_by, order_by, offset, limit)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, address, address, offset, limit)
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(block_offset, address, address, order_by, order_by, offset, limit)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(address, address, offset, limit)
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(address, address, order_by, order_by, offset, limit)
r = session.execute(s)
return r
@@ -128,6 +150,7 @@ def list_transactions_account_mined_with_data(
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
@@ -138,13 +161,18 @@ def list_transactions_account_mined_with_data(
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, block_limit, address, address, offset, limit)
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(block_offset, block_limit, address, address, order_by, order_by, offset, limit)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number ASC, tx_index ASC OFFSET {} LIMIT {}".format(block_offset, address, address, offset, limit)
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} OFFSET {} LIMIT {}".format(block_offset, address, address, order_by, order_by, offset, limit)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number ASC, tx_index ASC".format(address, address, offset, limit)
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number , tx_index {} OFFSET {} LIMIT {}".format(address, address, order_by, order_by, offset, limit)
r = session.execute(s)
return r
@@ -157,6 +185,7 @@ def list_transactions_account_mined(
limit,
block_offset,
block_limit,
oldest=False,
):
"""Same as list_transactions_mined(...), but only retrieves transaction where the specified account address is sender or recipient.
@@ -169,7 +198,20 @@ def list_transactions_account_mined(
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
s = "SELECT block_number, tx_index FROM tx WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(address, address, limit, offset)
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT block_number, tx_index FROM tx WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(address, address, order_by, order_by, limit, offset)
r = session.execute(s)
return r

View File

@@ -98,7 +98,7 @@ def process_transactions_all_data(session, env):
raise ValueError('cart before the horse, dude')
c = DataCache(session)
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(0, 0, block_offset, block_end)
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(0, 0, block_offset, block_end, oldest=True) # oldest needs to be settable
for r in tx_cache:
r['date_block'] = r['date_block'].timestamp()

View File

@@ -12,7 +12,7 @@ celery_app = celery.current_app
@celery_app.task(bind=True)
def tx_filter(self, offset, limit, address=None, encoding='hex'):
def tx_filter(self, offset, limit, address=None, oldest=False, encoding='hex'):
queue = self.request.delivery_info.get('routing_key')
session = SessionBase.create_session()
@@ -20,9 +20,9 @@ def tx_filter(self, offset, limit, address=None, encoding='hex'):
c = BloomCache(session)
b = None
if address == None:
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit, oldest=oldest)
else:
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit, oldest=oldest)
session.close()
@@ -39,21 +39,21 @@ def tx_filter(self, offset, limit, address=None, encoding='hex'):
@celery_app.task(bind=True)
def tx_filter_content(self, offset, limit, address=None, block_offset=None, block_limit=None, encoding='hex'):
def tx_filter_content(self, offset, limit, address=None, block_offset=None, block_limit=None, oldest=False, encoding='hex'):
session = SessionBase.create_session()
c = DataCache(session)
b = None
if address == None:
if block_offset:
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, limit, block_offset=block_offset, block_limit=block_limit)
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, limit, block_offset=block_offset, block_limit=block_limit, oldest=oldest)
else:
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data_index(offset, limit, block_offset=block_offset, block_limit=block_limit)
else:
if block_offset:
(lowest_block, highest_block, tx_cache) = c.load_transactions_account_with_data(address, offset, limit, block_offset=block_offset, block_limit=block_limit)
else:
(lowest_block, highest_block, tx_cache) = c.load_transactions_account_with_data_index(address, offset, limit, block_number=block_number, block_limit=block_limit)
(lowest_block, highest_block, tx_cache) = c.load_transactions_account_with_data_index(address, offset, limit, block_offset=block_offset, block_limit=block_limit)
session.close()

View File

@@ -4,8 +4,8 @@ import semver
version = (
0,
2,
0,
'alpha.2',
1,
'alpha.1',
)
version_object = semver.VersionInfo(