cic-cache: Add data API
This commit is contained in:
parent
dae6526677
commit
60b36945df
@ -1,22 +1,28 @@
|
|||||||
# standard imports
|
# standard imports
|
||||||
import logging
|
import logging
|
||||||
|
import datetime
|
||||||
|
|
||||||
# third-party imports
|
# external imports
|
||||||
import moolb
|
import moolb
|
||||||
|
|
||||||
# local imports
|
# local imports
|
||||||
from cic_cache.db import list_transactions_mined
|
from cic_cache.db.list import (
|
||||||
from cic_cache.db import list_transactions_account_mined
|
list_transactions_mined,
|
||||||
|
list_transactions_account_mined,
|
||||||
|
list_transactions_mined_with_data,
|
||||||
|
)
|
||||||
|
|
||||||
logg = logging.getLogger()
|
logg = logging.getLogger()
|
||||||
|
|
||||||
|
|
||||||
class BloomCache:
|
class Cache:
|
||||||
|
|
||||||
def __init__(self, session):
|
def __init__(self, session):
|
||||||
self.session = session
|
self.session = session
|
||||||
|
|
||||||
|
|
||||||
|
class BloomCache(Cache):
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def __get_filter_size(n):
|
def __get_filter_size(n):
|
||||||
n = 8192 * 8
|
n = 8192 * 8
|
||||||
@ -87,3 +93,43 @@ class BloomCache:
|
|||||||
f_blocktx.add(block + tx)
|
f_blocktx.add(block + tx)
|
||||||
logg.debug('added block {} tx {} lo {} hi {}'.format(r[0], r[1], lowest_block, highest_block))
|
logg.debug('added block {} tx {} lo {} hi {}'.format(r[0], r[1], lowest_block, highest_block))
|
||||||
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)
|
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)
|
||||||
|
|
||||||
|
|
||||||
|
class DataCache(Cache):
|
||||||
|
|
||||||
|
def load_transactions_with_data(self, offset, end):
|
||||||
|
rows = list_transactions_mined_with_data(self.session, offset, end)
|
||||||
|
tx_cache = []
|
||||||
|
highest_block = -1;
|
||||||
|
lowest_block = -1;
|
||||||
|
date_is_str = None # stick this in startup
|
||||||
|
for r in rows:
|
||||||
|
if highest_block == -1:
|
||||||
|
highest_block = r['block_number']
|
||||||
|
lowest_block = r['block_number']
|
||||||
|
tx_type = 'unknown'
|
||||||
|
|
||||||
|
if r['value'] != None:
|
||||||
|
tx_type = '{}.{}'.format(r['domain'], r['value'])
|
||||||
|
|
||||||
|
if date_is_str == None:
|
||||||
|
date_is_str = type(r['date_block']).__name__ == 'str'
|
||||||
|
|
||||||
|
o = {
|
||||||
|
'block_number': r['block_number'],
|
||||||
|
'tx_hash': r['tx_hash'],
|
||||||
|
'date_block': r['date_block'],
|
||||||
|
'sender': r['sender'],
|
||||||
|
'recipient': r['recipient'],
|
||||||
|
'from_value': int(r['from_value']),
|
||||||
|
'to_value': int(r['to_value']),
|
||||||
|
'source_token': r['source_token'],
|
||||||
|
'destination_token': r['destination_token'],
|
||||||
|
'tx_type': tx_type,
|
||||||
|
}
|
||||||
|
|
||||||
|
if date_is_str:
|
||||||
|
o['date_block'] = datetime.datetime.fromisoformat(r['date_block'])
|
||||||
|
|
||||||
|
tx_cache.append(o)
|
||||||
|
return (lowest_block, highest_block, tx_cache)
|
||||||
|
@ -28,6 +28,26 @@ def list_transactions_mined(
|
|||||||
return r
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
def list_transactions_mined_with_data(
|
||||||
|
session,
|
||||||
|
offset,
|
||||||
|
end,
|
||||||
|
):
|
||||||
|
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
|
||||||
|
|
||||||
|
:param offset: Offset in data set to return transactions from
|
||||||
|
:type offset: int
|
||||||
|
:param limit: Max number of transactions to retrieve
|
||||||
|
:type limit: int
|
||||||
|
:result: Result set
|
||||||
|
:rtype: SQLAlchemy.ResultProxy
|
||||||
|
"""
|
||||||
|
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number ASC, tx_index ASC".format(offset, end)
|
||||||
|
|
||||||
|
r = session.execute(s)
|
||||||
|
return r
|
||||||
|
|
||||||
|
|
||||||
def list_transactions_account_mined(
|
def list_transactions_account_mined(
|
||||||
session,
|
session,
|
||||||
address,
|
address,
|
||||||
|
110
apps/cic-cache/cic_cache/runnable/daemons/query.py
Normal file
110
apps/cic-cache/cic_cache/runnable/daemons/query.py
Normal file
@ -0,0 +1,110 @@
|
|||||||
|
# standard imports
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
import base64
|
||||||
|
|
||||||
|
# local imports
|
||||||
|
from cic_cache.cache import (
|
||||||
|
BloomCache,
|
||||||
|
DataCache,
|
||||||
|
)
|
||||||
|
|
||||||
|
logg = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
|
||||||
|
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)/?(\d+)?/?(\d+)/?'
|
||||||
|
re_transactions_all_data = r'/txa/(\d+)/(\d+)/?'
|
||||||
|
|
||||||
|
DEFAULT_LIMIT = 100
|
||||||
|
|
||||||
|
|
||||||
|
def process_transactions_account_bloom(session, env):
|
||||||
|
r = re.match(re_transactions_account_bloom, env.get('PATH_INFO'))
|
||||||
|
if not r:
|
||||||
|
return None
|
||||||
|
|
||||||
|
address = r[1]
|
||||||
|
if r[2] == None:
|
||||||
|
address = '0x' + address
|
||||||
|
offset = DEFAULT_LIMIT
|
||||||
|
if r.lastindex > 2:
|
||||||
|
offset = r[3]
|
||||||
|
limit = 0
|
||||||
|
if r.lastindex > 3:
|
||||||
|
limit = r[4]
|
||||||
|
|
||||||
|
c = BloomCache(session)
|
||||||
|
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
|
||||||
|
|
||||||
|
o = {
|
||||||
|
'alg': 'sha256',
|
||||||
|
'low': lowest_block,
|
||||||
|
'high': highest_block,
|
||||||
|
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
|
||||||
|
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
|
||||||
|
'filter_rounds': 3,
|
||||||
|
}
|
||||||
|
|
||||||
|
j = json.dumps(o)
|
||||||
|
|
||||||
|
return ('application/json', j.encode('utf-8'),)
|
||||||
|
|
||||||
|
|
||||||
|
def process_transactions_all_bloom(session, env):
|
||||||
|
r = re.match(re_transactions_all_bloom, env.get('PATH_INFO'))
|
||||||
|
if not r:
|
||||||
|
return None
|
||||||
|
|
||||||
|
offset = DEFAULT_LIMIT
|
||||||
|
if r.lastindex > 0:
|
||||||
|
offset = r[1]
|
||||||
|
limit = 0
|
||||||
|
if r.lastindex > 1:
|
||||||
|
limit = r[2]
|
||||||
|
|
||||||
|
c = BloomCache(session)
|
||||||
|
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
|
||||||
|
|
||||||
|
o = {
|
||||||
|
'alg': 'sha256',
|
||||||
|
'low': lowest_block,
|
||||||
|
'high': highest_block,
|
||||||
|
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
|
||||||
|
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
|
||||||
|
'filter_rounds': 3,
|
||||||
|
}
|
||||||
|
|
||||||
|
j = json.dumps(o)
|
||||||
|
|
||||||
|
return ('application/json', j.encode('utf-8'),)
|
||||||
|
|
||||||
|
|
||||||
|
def process_transactions_all_data(session, env):
|
||||||
|
r = re.match(re_transactions_all_data, env.get('PATH_INFO'))
|
||||||
|
if not r:
|
||||||
|
return None
|
||||||
|
if env.get('HTTP_X_CIC_CACHE_MODE') != 'all':
|
||||||
|
return None
|
||||||
|
|
||||||
|
offset = r[1]
|
||||||
|
end = r[2]
|
||||||
|
if r[2] < r[1]:
|
||||||
|
raise ValueError('cart before the horse, dude')
|
||||||
|
|
||||||
|
c = DataCache(session)
|
||||||
|
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, end)
|
||||||
|
|
||||||
|
for r in tx_cache:
|
||||||
|
r['date_block'] = r['date_block'].timestamp()
|
||||||
|
|
||||||
|
o = {
|
||||||
|
'low': lowest_block,
|
||||||
|
'high': highest_block,
|
||||||
|
'data': tx_cache,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
j = json.dumps(o)
|
||||||
|
|
||||||
|
return ('application/json', j.encode('utf-8'),)
|
@ -1,18 +1,20 @@
|
|||||||
# standard imports
|
# standard imports
|
||||||
import os
|
import os
|
||||||
import re
|
|
||||||
import logging
|
import logging
|
||||||
import argparse
|
import argparse
|
||||||
import json
|
|
||||||
import base64
|
import base64
|
||||||
|
|
||||||
# third-party imports
|
# external imports
|
||||||
import confini
|
import confini
|
||||||
|
|
||||||
# local imports
|
# local imports
|
||||||
from cic_cache import BloomCache
|
|
||||||
from cic_cache.db import dsn_from_config
|
from cic_cache.db import dsn_from_config
|
||||||
from cic_cache.db.models.base import SessionBase
|
from cic_cache.db.models.base import SessionBase
|
||||||
|
from cic_cache.runnable.daemons.query import (
|
||||||
|
process_transactions_account_bloom,
|
||||||
|
process_transactions_all_bloom,
|
||||||
|
process_transactions_all_data,
|
||||||
|
)
|
||||||
|
|
||||||
logging.basicConfig(level=logging.WARNING)
|
logging.basicConfig(level=logging.WARNING)
|
||||||
logg = logging.getLogger()
|
logg = logging.getLogger()
|
||||||
@ -44,72 +46,6 @@ logg.debug('config:\n{}'.format(config))
|
|||||||
dsn = dsn_from_config(config)
|
dsn = dsn_from_config(config)
|
||||||
SessionBase.connect(dsn, config.true('DATABASE_DEBUG'))
|
SessionBase.connect(dsn, config.true('DATABASE_DEBUG'))
|
||||||
|
|
||||||
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
|
|
||||||
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)/?(\d+)?/?(\d+)/?'
|
|
||||||
|
|
||||||
DEFAULT_LIMIT = 100
|
|
||||||
|
|
||||||
|
|
||||||
def process_transactions_account_bloom(session, env):
|
|
||||||
r = re.match(re_transactions_account_bloom, env.get('PATH_INFO'))
|
|
||||||
if not r:
|
|
||||||
return None
|
|
||||||
|
|
||||||
address = r[1]
|
|
||||||
if r[2] == None:
|
|
||||||
address = '0x' + address
|
|
||||||
offset = DEFAULT_LIMIT
|
|
||||||
if r.lastindex > 2:
|
|
||||||
offset = r[3]
|
|
||||||
limit = 0
|
|
||||||
if r.lastindex > 3:
|
|
||||||
limit = r[4]
|
|
||||||
|
|
||||||
c = BloomCache(session)
|
|
||||||
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
|
|
||||||
|
|
||||||
o = {
|
|
||||||
'alg': 'sha256',
|
|
||||||
'low': lowest_block,
|
|
||||||
'high': highest_block,
|
|
||||||
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
|
|
||||||
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
|
|
||||||
'filter_rounds': 3,
|
|
||||||
}
|
|
||||||
|
|
||||||
j = json.dumps(o)
|
|
||||||
|
|
||||||
return ('application/json', j.encode('utf-8'),)
|
|
||||||
|
|
||||||
|
|
||||||
def process_transactions_all_bloom(session, env):
|
|
||||||
r = re.match(re_transactions_all_bloom, env.get('PATH_INFO'))
|
|
||||||
if not r:
|
|
||||||
return None
|
|
||||||
|
|
||||||
offset = DEFAULT_LIMIT
|
|
||||||
if r.lastindex > 0:
|
|
||||||
offset = r[1]
|
|
||||||
limit = 0
|
|
||||||
if r.lastindex > 1:
|
|
||||||
limit = r[2]
|
|
||||||
|
|
||||||
c = BloomCache(session)
|
|
||||||
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
|
|
||||||
|
|
||||||
o = {
|
|
||||||
'alg': 'sha256',
|
|
||||||
'low': lowest_block,
|
|
||||||
'high': highest_block,
|
|
||||||
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
|
|
||||||
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
|
|
||||||
'filter_rounds': 3,
|
|
||||||
}
|
|
||||||
|
|
||||||
j = json.dumps(o)
|
|
||||||
|
|
||||||
return ('application/json', j.encode('utf-8'),)
|
|
||||||
|
|
||||||
|
|
||||||
# uwsgi application
|
# uwsgi application
|
||||||
def application(env, start_response):
|
def application(env, start_response):
|
||||||
@ -119,10 +55,16 @@ def application(env, start_response):
|
|||||||
|
|
||||||
session = SessionBase.create_session()
|
session = SessionBase.create_session()
|
||||||
for handler in [
|
for handler in [
|
||||||
|
process_transactions_all_data,
|
||||||
process_transactions_all_bloom,
|
process_transactions_all_bloom,
|
||||||
process_transactions_account_bloom,
|
process_transactions_account_bloom,
|
||||||
]:
|
]:
|
||||||
r = handler(session, env)
|
r = None
|
||||||
|
try:
|
||||||
|
r = handler(session, env)
|
||||||
|
except ValueError as e:
|
||||||
|
start_response('400 {}'.format(str(e)))
|
||||||
|
return []
|
||||||
if r != None:
|
if r != None:
|
||||||
(mime_type, content) = r
|
(mime_type, content) = r
|
||||||
break
|
break
|
||||||
|
@ -88,3 +88,16 @@ def txs(
|
|||||||
tx_hash_first,
|
tx_hash_first,
|
||||||
tx_hash_second,
|
tx_hash_second,
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope='function')
|
||||||
|
def tag_txs(
|
||||||
|
init_database,
|
||||||
|
txs,
|
||||||
|
):
|
||||||
|
|
||||||
|
db.add_tag(init_database, 'taag', domain='test')
|
||||||
|
init_database.commit()
|
||||||
|
|
||||||
|
db.tag_transaction(init_database, txs[1], 'taag', domain='test')
|
||||||
|
|
||||||
|
31
apps/cic-cache/tests/test_api.py
Normal file
31
apps/cic-cache/tests/test_api.py
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
# standard imports
|
||||||
|
import json
|
||||||
|
|
||||||
|
# external imports
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# local imports
|
||||||
|
from cic_cache.runnable.daemons.query import process_transactions_all_data
|
||||||
|
|
||||||
|
|
||||||
|
def test_api_all_data(
|
||||||
|
init_database,
|
||||||
|
txs,
|
||||||
|
):
|
||||||
|
|
||||||
|
env = {
|
||||||
|
'PATH_INFO': '/txa/410000/420000',
|
||||||
|
'HTTP_X_CIC_CACHE_MODE': 'all',
|
||||||
|
}
|
||||||
|
j = process_transactions_all_data(init_database, env)
|
||||||
|
o = json.loads(j[1])
|
||||||
|
|
||||||
|
assert len(o['data']) == 2
|
||||||
|
|
||||||
|
env = {
|
||||||
|
'PATH_INFO': '/txa/420000/410000',
|
||||||
|
'HTTP_X_CIC_CACHE_MODE': 'all',
|
||||||
|
}
|
||||||
|
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
j = process_transactions_all_data(init_database, env)
|
@ -9,6 +9,7 @@ import pytest
|
|||||||
|
|
||||||
# local imports
|
# local imports
|
||||||
from cic_cache import BloomCache
|
from cic_cache import BloomCache
|
||||||
|
from cic_cache.cache import DataCache
|
||||||
|
|
||||||
logg = logging.getLogger()
|
logg = logging.getLogger()
|
||||||
|
|
||||||
@ -33,3 +34,23 @@ def test_cache(
|
|||||||
|
|
||||||
assert b[0] == list_defaults['block'] - 1
|
assert b[0] == list_defaults['block'] - 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_cache_data(
|
||||||
|
init_database,
|
||||||
|
list_defaults,
|
||||||
|
list_actors,
|
||||||
|
list_tokens,
|
||||||
|
txs,
|
||||||
|
tag_txs,
|
||||||
|
):
|
||||||
|
|
||||||
|
session = init_database
|
||||||
|
|
||||||
|
c = DataCache(session)
|
||||||
|
b = c.load_transactions_with_data(410000, 420000)
|
||||||
|
|
||||||
|
assert len(b[2]) == 2
|
||||||
|
assert b[2][0]['tx_hash'] == txs[1]
|
||||||
|
assert b[2][1]['tx_type'] == 'unknown'
|
||||||
|
assert b[2][0]['tx_type'] == 'test.taag'
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user