From 60b36945df51fb0e99e7f19652e2aea482ae565f Mon Sep 17 00:00:00 2001 From: Louis Holbrook Date: Tue, 18 May 2021 17:13:57 +0000 Subject: [PATCH] cic-cache: Add data API --- apps/cic-cache/cic_cache/cache.py | 54 ++++++++- apps/cic-cache/cic_cache/db/list.py | 20 ++++ .../cic_cache/runnable/daemons/query.py | 110 ++++++++++++++++++ .../cic_cache/runnable/daemons/server.py | 84 +++---------- apps/cic-cache/tests/conftest.py | 13 +++ apps/cic-cache/tests/test_api.py | 31 +++++ apps/cic-cache/tests/test_cache.py | 21 ++++ 7 files changed, 258 insertions(+), 75 deletions(-) create mode 100644 apps/cic-cache/cic_cache/runnable/daemons/query.py create mode 100644 apps/cic-cache/tests/test_api.py diff --git a/apps/cic-cache/cic_cache/cache.py b/apps/cic-cache/cic_cache/cache.py index 341c7c9..f0cd2ce 100644 --- a/apps/cic-cache/cic_cache/cache.py +++ b/apps/cic-cache/cic_cache/cache.py @@ -1,22 +1,28 @@ # standard imports import logging +import datetime -# third-party imports +# external imports import moolb # local imports -from cic_cache.db import list_transactions_mined -from cic_cache.db import list_transactions_account_mined +from cic_cache.db.list import ( + list_transactions_mined, + list_transactions_account_mined, + list_transactions_mined_with_data, + ) logg = logging.getLogger() -class BloomCache: +class Cache: def __init__(self, session): self.session = session +class BloomCache(Cache): + @staticmethod def __get_filter_size(n): n = 8192 * 8 @@ -87,3 +93,43 @@ class BloomCache: f_blocktx.add(block + tx) logg.debug('added block {} tx {} lo {} hi {}'.format(r[0], r[1], lowest_block, highest_block)) return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),) + + +class DataCache(Cache): + + def load_transactions_with_data(self, offset, end): + rows = list_transactions_mined_with_data(self.session, offset, end) + tx_cache = [] + highest_block = -1; + lowest_block = -1; + date_is_str = None # stick this in startup + for r in rows: + if highest_block == -1: + highest_block = r['block_number'] + lowest_block = r['block_number'] + tx_type = 'unknown' + + if r['value'] != None: + tx_type = '{}.{}'.format(r['domain'], r['value']) + + if date_is_str == None: + date_is_str = type(r['date_block']).__name__ == 'str' + + o = { + 'block_number': r['block_number'], + 'tx_hash': r['tx_hash'], + 'date_block': r['date_block'], + 'sender': r['sender'], + 'recipient': r['recipient'], + 'from_value': int(r['from_value']), + 'to_value': int(r['to_value']), + 'source_token': r['source_token'], + 'destination_token': r['destination_token'], + 'tx_type': tx_type, + } + + if date_is_str: + o['date_block'] = datetime.datetime.fromisoformat(r['date_block']) + + tx_cache.append(o) + return (lowest_block, highest_block, tx_cache) diff --git a/apps/cic-cache/cic_cache/db/list.py b/apps/cic-cache/cic_cache/db/list.py index efcf1cf..e469d66 100644 --- a/apps/cic-cache/cic_cache/db/list.py +++ b/apps/cic-cache/cic_cache/db/list.py @@ -28,6 +28,26 @@ def list_transactions_mined( return r +def list_transactions_mined_with_data( + session, + offset, + end, + ): + """Executes db query to return all confirmed transactions according to the specified offset and limit. + + :param offset: Offset in data set to return transactions from + :type offset: int + :param limit: Max number of transactions to retrieve + :type limit: int + :result: Result set + :rtype: SQLAlchemy.ResultProxy + """ + s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number ASC, tx_index ASC".format(offset, end) + + r = session.execute(s) + return r + + def list_transactions_account_mined( session, address, diff --git a/apps/cic-cache/cic_cache/runnable/daemons/query.py b/apps/cic-cache/cic_cache/runnable/daemons/query.py new file mode 100644 index 0000000..bafde93 --- /dev/null +++ b/apps/cic-cache/cic_cache/runnable/daemons/query.py @@ -0,0 +1,110 @@ +# standard imports +import logging +import json +import re +import base64 + +# local imports +from cic_cache.cache import ( + BloomCache, + DataCache, + ) + +logg = logging.getLogger(__name__) + +re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?' +re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)/?(\d+)?/?(\d+)/?' +re_transactions_all_data = r'/txa/(\d+)/(\d+)/?' + +DEFAULT_LIMIT = 100 + + +def process_transactions_account_bloom(session, env): + r = re.match(re_transactions_account_bloom, env.get('PATH_INFO')) + if not r: + return None + + address = r[1] + if r[2] == None: + address = '0x' + address + offset = DEFAULT_LIMIT + if r.lastindex > 2: + offset = r[3] + limit = 0 + if r.lastindex > 3: + limit = r[4] + + c = BloomCache(session) + (lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit) + + o = { + 'alg': 'sha256', + 'low': lowest_block, + 'high': highest_block, + 'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'), + 'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'), + 'filter_rounds': 3, + } + + j = json.dumps(o) + + return ('application/json', j.encode('utf-8'),) + + +def process_transactions_all_bloom(session, env): + r = re.match(re_transactions_all_bloom, env.get('PATH_INFO')) + if not r: + return None + + offset = DEFAULT_LIMIT + if r.lastindex > 0: + offset = r[1] + limit = 0 + if r.lastindex > 1: + limit = r[2] + + c = BloomCache(session) + (lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit) + + o = { + 'alg': 'sha256', + 'low': lowest_block, + 'high': highest_block, + 'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'), + 'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'), + 'filter_rounds': 3, + } + + j = json.dumps(o) + + return ('application/json', j.encode('utf-8'),) + + +def process_transactions_all_data(session, env): + r = re.match(re_transactions_all_data, env.get('PATH_INFO')) + if not r: + return None + if env.get('HTTP_X_CIC_CACHE_MODE') != 'all': + return None + + offset = r[1] + end = r[2] + if r[2] < r[1]: + raise ValueError('cart before the horse, dude') + + c = DataCache(session) + (lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, end) + + for r in tx_cache: + r['date_block'] = r['date_block'].timestamp() + + o = { + 'low': lowest_block, + 'high': highest_block, + 'data': tx_cache, + } + + + j = json.dumps(o) + + return ('application/json', j.encode('utf-8'),) diff --git a/apps/cic-cache/cic_cache/runnable/daemons/server.py b/apps/cic-cache/cic_cache/runnable/daemons/server.py index e0e8465..d7a29ed 100644 --- a/apps/cic-cache/cic_cache/runnable/daemons/server.py +++ b/apps/cic-cache/cic_cache/runnable/daemons/server.py @@ -1,18 +1,20 @@ # standard imports import os -import re import logging import argparse -import json import base64 -# third-party imports +# external imports import confini # local imports -from cic_cache import BloomCache from cic_cache.db import dsn_from_config from cic_cache.db.models.base import SessionBase +from cic_cache.runnable.daemons.query import ( + process_transactions_account_bloom, + process_transactions_all_bloom, + process_transactions_all_data, + ) logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() @@ -44,72 +46,6 @@ logg.debug('config:\n{}'.format(config)) dsn = dsn_from_config(config) SessionBase.connect(dsn, config.true('DATABASE_DEBUG')) -re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?' -re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)/?(\d+)?/?(\d+)/?' - -DEFAULT_LIMIT = 100 - - -def process_transactions_account_bloom(session, env): - r = re.match(re_transactions_account_bloom, env.get('PATH_INFO')) - if not r: - return None - - address = r[1] - if r[2] == None: - address = '0x' + address - offset = DEFAULT_LIMIT - if r.lastindex > 2: - offset = r[3] - limit = 0 - if r.lastindex > 3: - limit = r[4] - - c = BloomCache(session) - (lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit) - - o = { - 'alg': 'sha256', - 'low': lowest_block, - 'high': highest_block, - 'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'), - 'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'), - 'filter_rounds': 3, - } - - j = json.dumps(o) - - return ('application/json', j.encode('utf-8'),) - - -def process_transactions_all_bloom(session, env): - r = re.match(re_transactions_all_bloom, env.get('PATH_INFO')) - if not r: - return None - - offset = DEFAULT_LIMIT - if r.lastindex > 0: - offset = r[1] - limit = 0 - if r.lastindex > 1: - limit = r[2] - - c = BloomCache(session) - (lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit) - - o = { - 'alg': 'sha256', - 'low': lowest_block, - 'high': highest_block, - 'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'), - 'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'), - 'filter_rounds': 3, - } - - j = json.dumps(o) - - return ('application/json', j.encode('utf-8'),) - # uwsgi application def application(env, start_response): @@ -119,10 +55,16 @@ def application(env, start_response): session = SessionBase.create_session() for handler in [ + process_transactions_all_data, process_transactions_all_bloom, process_transactions_account_bloom, ]: - r = handler(session, env) + r = None + try: + r = handler(session, env) + except ValueError as e: + start_response('400 {}'.format(str(e))) + return [] if r != None: (mime_type, content) = r break diff --git a/apps/cic-cache/tests/conftest.py b/apps/cic-cache/tests/conftest.py index 00db9ae..f5e8edd 100644 --- a/apps/cic-cache/tests/conftest.py +++ b/apps/cic-cache/tests/conftest.py @@ -88,3 +88,16 @@ def txs( tx_hash_first, tx_hash_second, ] + + +@pytest.fixture(scope='function') +def tag_txs( + init_database, + txs, + ): + + db.add_tag(init_database, 'taag', domain='test') + init_database.commit() + + db.tag_transaction(init_database, txs[1], 'taag', domain='test') + diff --git a/apps/cic-cache/tests/test_api.py b/apps/cic-cache/tests/test_api.py new file mode 100644 index 0000000..02561bc --- /dev/null +++ b/apps/cic-cache/tests/test_api.py @@ -0,0 +1,31 @@ +# standard imports +import json + +# external imports +import pytest + +# local imports +from cic_cache.runnable.daemons.query import process_transactions_all_data + + +def test_api_all_data( + init_database, + txs, + ): + + env = { + 'PATH_INFO': '/txa/410000/420000', + 'HTTP_X_CIC_CACHE_MODE': 'all', + } + j = process_transactions_all_data(init_database, env) + o = json.loads(j[1]) + + assert len(o['data']) == 2 + + env = { + 'PATH_INFO': '/txa/420000/410000', + 'HTTP_X_CIC_CACHE_MODE': 'all', + } + + with pytest.raises(ValueError): + j = process_transactions_all_data(init_database, env) diff --git a/apps/cic-cache/tests/test_cache.py b/apps/cic-cache/tests/test_cache.py index 3c77264..cdd038c 100644 --- a/apps/cic-cache/tests/test_cache.py +++ b/apps/cic-cache/tests/test_cache.py @@ -9,6 +9,7 @@ import pytest # local imports from cic_cache import BloomCache +from cic_cache.cache import DataCache logg = logging.getLogger() @@ -33,3 +34,23 @@ def test_cache( assert b[0] == list_defaults['block'] - 1 + +def test_cache_data( + init_database, + list_defaults, + list_actors, + list_tokens, + txs, + tag_txs, + ): + + session = init_database + + c = DataCache(session) + b = c.load_transactions_with_data(410000, 420000) + + assert len(b[2]) == 2 + assert b[2][0]['tx_hash'] == txs[1] + assert b[2][1]['tx_type'] == 'unknown' + assert b[2][0]['tx_type'] == 'test.taag' +