cic-cache: Add tx tags in persistent storage

This commit is contained in:
Louis Holbrook 2021-05-01 13:20:14 +00:00
parent 8db76dc0a8
commit 75bf8f15be
21 changed files with 379 additions and 69 deletions

View File

@ -6,3 +6,4 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=

View File

@ -6,3 +6,4 @@ HOST=localhost
PORT=5432
ENGINE=sqlite
DRIVER=pysqlite
DEBUG=

View File

@ -2,9 +2,14 @@
import logging
# local imports
from .list import list_transactions_mined
from .list import list_transactions_account_mined
from .list import add_transaction
from .list import (
list_transactions_mined,
list_transactions_account_mined,
add_transaction,
tag_transaction,
add_tag,
)
logg = logging.getLogger()

View File

@ -2,8 +2,9 @@
import logging
import datetime
# third-party imports
# external imports
from cic_cache.db.models.base import SessionBase
from sqlalchemy import text
logg = logging.getLogger()
@ -50,7 +51,8 @@ def list_transactions_account_mined(
def add_transaction(
session, tx_hash,
session,
tx_hash,
block_number,
tx_index,
sender,
@ -62,6 +64,33 @@ def add_transaction(
success,
timestamp,
):
"""Adds a single transaction to the cache persistent storage. Sensible interpretation of all fields is the responsibility of the caller.
:param session: Persistent storage session object
:type session: SQLAlchemy session
:param tx_hash: Transaction hash
:type tx_hash: str, 0x-hex
:param block_number: Block number
:type block_number: int
:param tx_index: Transaction index in block
:type tx_index: int
:param sender: Ethereum address of effective sender
:type sender: str, 0x-hex
:param receiver: Ethereum address of effective recipient
:type receiver: str, 0x-hex
:param source_token: Ethereum address of token used by sender
:type source_token: str, 0x-hex
:param destination_token: Ethereum address of token received by recipient
:type destination_token: str, 0x-hex
:param from_value: Source token value spent in transaction
:type from_value: int
:param to_value: Destination token value received in transaction
:type to_value: int
:param success: True if code execution on network was successful
:type success: bool
:param date_block: Block timestamp
:type date_block: datetime
"""
date_block = datetime.datetime.fromtimestamp(timestamp)
s = "INSERT INTO tx (tx_hash, block_number, tx_index, sender, recipient, source_token, destination_token, from_value, to_value, success, date_block) VALUES ('{}', {}, {}, '{}', '{}', '{}', '{}', {}, {}, {}, '{}')".format(
tx_hash,
@ -77,3 +106,74 @@ def add_transaction(
date_block,
)
session.execute(s)
def tag_transaction(
session,
tx_hash,
name,
domain=None,
):
"""Tag a single transaction with a single tag.
Tag must already exist in storage.
:param session: Persistent storage session object
:type session: SQLAlchemy session
:param tx_hash: Transaction hash
:type tx_hash: str, 0x-hex
:param name: Tag value
:type name: str
:param domain: Tag domain
:type domain: str
:raises ValueError: Unknown tag or transaction hash
"""
s = text("SELECT id from tx where tx_hash = :a")
r = session.execute(s, {'a': tx_hash}).fetchall()
tx_id = r[0].values()[0]
if tx_id == None:
raise ValueError('unknown tx hash {}'.format(tx_hash))
#s = text("SELECT id from tag where value = :a and domain = :b")
if domain == None:
s = text("SELECT id from tag where value = :a")
else:
s = text("SELECT id from tag where value = :a and domain = :b")
r = session.execute(s, {'a': name, 'b': domain}).fetchall()
tag_id = r[0].values()[0]
logg.debug('type {} {}'.format(type(tag_id), type(tx_id)))
if tag_id == None:
raise ValueError('unknown tag name {} domain {}'.format(name, domain))
s = text("INSERT INTO tag_tx_link (tag_id, tx_id) VALUES (:a, :b)")
r = session.execute(s, {'a': int(tag_id), 'b': int(tx_id)})
def add_tag(
session,
name,
domain=None,
):
"""Add a single tag to storage.
:param session: Persistent storage session object
:type session: SQLAlchemy session
:param name: Tag value
:type name: str
:param domain: Tag domain
:type domain: str
:raises sqlalchemy.exc.IntegrityError: Tag already exists
"""
s = None
if domain == None:
s = text("INSERT INTO tag (value) VALUES (:b)")
else:
s = text("INSERT INTO tag (domain, value) VALUES (:a, :b)")
session.execute(s, {'a': domain, 'b': name})

View File

@ -0,0 +1,38 @@
"""Transaction tags
Revision ID: aaf2bdce7d6e
Revises: 6604de4203e2
Create Date: 2021-05-01 09:20:20.775082
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'aaf2bdce7d6e'
down_revision = '6604de4203e2'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tag',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('domain', sa.String(), nullable=True),
sa.Column('value', sa.String(), nullable=False),
)
op.create_index('idx_tag_domain_value', 'tag', ['domain', 'value'], unique=True)
op.create_table(
'tag_tx_link',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag_id', sa.Integer, sa.ForeignKey('tag.id'), nullable=False),
sa.Column('tx_id', sa.Integer, sa.ForeignKey('tx.id'), nullable=False),
)
def downgrade():
op.drop_table('tag_tx_link')
op.drop_index('idx_tag_domain_value')
op.drop_table('tag')

View File

@ -1,2 +1,27 @@
class SyncFilter:
pass
class TagSyncFilter:
"""Holds tag name and domain for an implementing filter.
:param name: Tag value
:type name: str
:param domain: Tag domain
:type domain: str
"""
def __init__(self, name, domain=None):
self.tag_name = name
self.tag_domain = domain
def tag(self):
"""Return tag value/domain.
:rtype: Tuple
:returns: tag value/domain.
"""
return (self.tag_name, self.tag_domain)
def __str__(self):
if self.tag_domain == None:
return self.tag_name
return '{}.{}'.format(self.tag_domain, self.tag_name)

View File

@ -15,15 +15,16 @@ from cic_eth_registry.error import (
)
# local imports
from .base import SyncFilter
from .base import TagSyncFilter
from cic_cache import db as cic_cache_db
logg = logging.getLogger().getChild(__name__)
class ERC20TransferFilter(SyncFilter):
class ERC20TransferFilter(TagSyncFilter):
def __init__(self, chain_spec):
super(ERC20TransferFilter, self).__init__('transfer', domain='erc20')
self.chain_spec = chain_spec
@ -46,6 +47,9 @@ class ERC20TransferFilter(SyncFilter):
except RequestMismatchException:
logg.debug('erc20 match but not a transfer, skipping')
return False
except ValueError:
logg.debug('erc20 match but bogus data, skipping')
return False
token_sender = tx.outputs[0]
token_recipient = transfer_data[0]
@ -67,7 +71,13 @@ class ERC20TransferFilter(SyncFilter):
tx.status == Status.SUCCESS,
block.timestamp,
)
#db_session.flush()
db_session.flush()
cic_cache_db.tag_transaction(
db_session,
tx.hash,
self.tag_name,
domain=self.tag_domain,
)
db_session.commit()
return True

View File

@ -7,9 +7,10 @@ import argparse
import sys
import re
# third-party imports
# external imports
import confini
import celery
import sqlalchemy
import rlp
import cic_base.config
import cic_base.log
@ -34,7 +35,10 @@ from chainsyncer.driver import (
from chainsyncer.db.models.base import SessionBase
# local imports
from cic_cache.db import dsn_from_config
from cic_cache.db import (
dsn_from_config,
add_tag,
)
from cic_cache.runnable.daemons.filters import (
ERC20TransferFilter,
)
@ -59,6 +63,17 @@ chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER'))
def register_filter_tags(filters, session):
for f in filters:
tag = f.tag()
try:
add_tag(session, tag[0], domain=tag[1])
session.commit()
logg.info('added tag name "{}" domain "{}"'.format(tag[0], tag[1]))
except sqlalchemy.exc.IntegrityError:
logg.debug('already have tag name "{}" domain "{}"'.format(tag[0], tag[1]))
def main():
# Connect to blockchain with chainlib
rpc = RPCConnection.connect(chain_spec, 'default')
@ -98,10 +113,19 @@ def main():
erc20_transfer_filter = ERC20TransferFilter(chain_spec)
filters = [
erc20_transfer_filter,
]
session = SessionBase.create_session()
register_filter_tags(filters, session)
session.close()
i = 0
for syncer in syncers:
logg.debug('running syncer index {}'.format(i))
syncer.add_filter(erc20_transfer_filter)
for f in filters:
syncer.add_filter(f)
r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
sys.stderr.write("sync {} done at block {}\n".format(syncer, r))

View File

@ -6,4 +6,4 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=
DEBUG=0

View File

@ -1,2 +1,4 @@
[cic]
registry_address =
chain_spec =
trust_address =

View File

@ -6,4 +6,4 @@ HOST=localhost
PORT=5432
ENGINE=sqlite
DRIVER=pysqlite
DEBUG=
DEBUG=1

View File

@ -1,9 +1,9 @@
cic-base~=0.1.2a77
cic-base~=0.1.2b6
alembic==1.4.2
confini~=0.3.6rc3
uwsgi==2.0.19.1
moolb~=0.1.0
cic-eth-registry~=0.5.4a16
cic-eth-registry~=0.5.5a1
SQLAlchemy==1.3.20
semver==2.13.0
psycopg2==2.8.6

View File

@ -4,3 +4,10 @@ pytest-mock==3.3.1
pysqlite3==0.4.3
sqlparse==0.4.1
pytest-celery==0.0.0a1
eth_tester==0.5.0b3
py-evm==0.3.0a20
web3==5.12.2
cic-eth-registry~=0.5.5a3
giftable-erc20-token~=0.0.8a10
eth-address-index~=0.1.1a10
sarafu-faucet~=0.0.3a1

View File

@ -3,7 +3,7 @@ import os
import sys
import datetime
# third-party imports
# external imports
import pytest
# local imports
@ -84,3 +84,7 @@ def txs(
session.commit()
return [
tx_hash_first,
tx_hash_second,
]

View File

@ -0,0 +1,3 @@
from chainlib.eth.pytest import *
from cic_eth_registry.pytest.fixtures_tokens import *

View File

@ -0,0 +1,69 @@
# standard imports
import os
import datetime
import logging
import json
# external imports
import pytest
from sqlalchemy import text
from chainlib.eth.tx import Tx
from chainlib.eth.block import Block
from chainlib.chain import ChainSpec
from hexathon import (
strip_0x,
add_0x,
)
# local imports
from cic_cache.db import add_tag
from cic_cache.runnable.daemons.filters.erc20 import ERC20TransferFilter
logg = logging.getLogger()
def test_cache(
eth_rpc,
foo_token,
init_database,
list_defaults,
list_actors,
tags,
):
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
fltr = ERC20TransferFilter(chain_spec)
add_tag(init_database, fltr.tag_name, domain=fltr.tag_domain)
data = 'a9059cbb'
data += strip_0x(list_actors['alice'])
data += '1000'.ljust(64, '0')
block = Block({
'hash': os.urandom(32).hex(),
'number': 42,
'timestamp': datetime.datetime.utcnow().timestamp(),
'transactions': [],
})
tx = Tx({
'to': foo_token,
'from': list_actors['bob'],
'data': data,
'value': 0,
'hash': os.urandom(32).hex(),
'nonce': 13,
'gasPrice': 10000000,
'gas': 123456,
})
block.txs.append(tx)
tx.block = block
r = fltr.filter(eth_rpc, block, tx, db_session=init_database)
assert r
s = text("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = :a AND a.value = :b")
r = init_database.execute(s, {'a': fltr.tag_domain, 'b': fltr.tag_name}).fetchone()
assert r[0] == tx.hash

View File

@ -2,7 +2,7 @@
import os
import logging
# third-party imports
# external imports
import pytest
import confini
@ -13,7 +13,7 @@ logg = logging.getLogger(__file__)
@pytest.fixture(scope='session')
def load_config():
config_dir = os.path.join(root_dir, '.config/test')
config_dir = os.path.join(root_dir, 'config/test')
conf = confini.Config(config_dir, 'CICTEST')
conf.process()
logg.debug('config {}'.format(conf))

View File

@ -3,13 +3,16 @@ import os
import logging
import re
# third-party imports
# external imports
import pytest
import sqlparse
import alembic
from alembic.config import Config as AlembicConfig
# local imports
from cic_cache.db.models.base import SessionBase
from cic_cache.db import dsn_from_config
from cic_cache.db import add_tag
logg = logging.getLogger(__file__)
@ -26,11 +29,10 @@ def database_engine(
except FileNotFoundError:
pass
dsn = dsn_from_config(load_config)
SessionBase.connect(dsn)
SessionBase.connect(dsn, debug=load_config.true('DATABASE_DEBUG'))
return dsn
# TODO: use alembic instead to migrate db, here we have to keep separate schema than migration script in script/migrate.py
@pytest.fixture(scope='function')
def init_database(
load_config,
@ -38,52 +40,23 @@ def init_database(
):
rootdir = os.path.dirname(os.path.dirname(__file__))
schemadir = os.path.join(rootdir, 'db', load_config.get('DATABASE_DRIVER'))
if load_config.get('DATABASE_ENGINE') == 'sqlite':
rconn = SessionBase.engine.raw_connection()
f = open(os.path.join(schemadir, 'db.sql'))
s = f.read()
f.close()
rconn.executescript(s)
else:
rconn = SessionBase.engine.raw_connection()
rcursor = rconn.cursor()
#rcursor.execute('DROP FUNCTION IF EXISTS public.transaction_list')
#rcursor.execute('DROP FUNCTION IF EXISTS public.balances')
f = open(os.path.join(schemadir, 'db.sql'))
s = f.read()
f.close()
r = re.compile(r'^[A-Z]', re.MULTILINE)
for l in sqlparse.parse(s):
strl = str(l)
# we need to check for empty query lines, as sqlparse doesn't do that on its own (and psycopg complains when it gets them)
if not re.search(r, strl):
logg.warning('skipping parsed query line {}'.format(strl))
continue
rcursor.execute(strl)
rconn.commit()
rcursor.execute('SET search_path TO public')
# this doesn't work when run separately, no idea why
# functions have been manually added to original schema from cic-eth
# f = open(os.path.join(schemadir, 'proc_transaction_list.sql'))
# s = f.read()
# f.close()
# rcursor.execute(s)
#
# f = open(os.path.join(schemadir, 'proc_balances.sql'))
# s = f.read()
# f.close()
# rcursor.execute(s)
rcursor.close()
dbdir = os.path.join(rootdir, 'cic_cache', 'db')
migrationsdir = os.path.join(dbdir, 'migrations', load_config.get('DATABASE_ENGINE'))
if not os.path.isdir(migrationsdir):
migrationsdir = os.path.join(dbdir, 'migrations', 'default')
logg.info('using migrations directory {}'.format(migrationsdir))
session = SessionBase.create_session()
ac = AlembicConfig(os.path.join(migrationsdir, 'alembic.ini'))
ac.set_main_option('sqlalchemy.url', database_engine)
ac.set_main_option('script_location', migrationsdir)
alembic.command.downgrade(ac, 'base')
alembic.command.upgrade(ac, 'head')
session.commit()
yield session
session.commit()
session.close()
@ -116,3 +89,14 @@ def list_defaults(
return {
'block': 420000,
}
@pytest.fixture(scope='function')
def tags(
init_database,
):
add_tag(init_database, 'foo')
add_tag(init_database, 'baz', domain='bar')
add_tag(init_database, 'xyzzy', domain='bar')
init_database.commit()

View File

@ -4,7 +4,7 @@ import datetime
import logging
import json
# third-party imports
# external imports
import pytest
# local imports

View File

@ -0,0 +1,37 @@
import os
import datetime
import logging
import json
# external imports
import pytest
# local imports
from cic_cache.db import tag_transaction
logg = logging.getLogger()
def test_cache(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
tags,
):
tag_transaction(init_database, txs[0], 'foo')
tag_transaction(init_database, txs[0], 'baz', domain='bar')
tag_transaction(init_database, txs[1], 'xyzzy', domain='bar')
r = init_database.execute("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.value = 'foo'").fetchall()
assert r[0][0] == txs[0]
r = init_database.execute("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = 'bar' AND a.value = 'baz'").fetchall()
assert r[0][0] == txs[0]
r = init_database.execute("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = 'bar' AND a.value = 'xyzzy'").fetchall()
assert r[0][0] == txs[1]

View File

@ -2,7 +2,7 @@
import os
import logging
# third-party imports
# external imports
import pytest
import alembic
from alembic.config import Config as AlembicConfig