diff --git a/apps/cic-cache/.config/database.ini b/apps/cic-cache/.config/database.ini index 72175b9..e34a1ff 100644 --- a/apps/cic-cache/.config/database.ini +++ b/apps/cic-cache/.config/database.ini @@ -6,3 +6,4 @@ HOST=localhost PORT=5432 ENGINE=postgresql DRIVER=psycopg2 +DEBUG= diff --git a/apps/cic-cache/.config/test/database.ini b/apps/cic-cache/.config/test/database.ini index 83397aa..960863a 100644 --- a/apps/cic-cache/.config/test/database.ini +++ b/apps/cic-cache/.config/test/database.ini @@ -6,3 +6,4 @@ HOST=localhost PORT=5432 ENGINE=sqlite DRIVER=pysqlite +DEBUG= diff --git a/apps/cic-cache/cic_cache/db/__init__.py b/apps/cic-cache/cic_cache/db/__init__.py index 48381e5..b2bb01b 100644 --- a/apps/cic-cache/cic_cache/db/__init__.py +++ b/apps/cic-cache/cic_cache/db/__init__.py @@ -2,9 +2,14 @@ import logging # local imports -from .list import list_transactions_mined -from .list import list_transactions_account_mined -from .list import add_transaction +from .list import ( + list_transactions_mined, + list_transactions_account_mined, + add_transaction, + tag_transaction, + add_tag, + ) + logg = logging.getLogger() diff --git a/apps/cic-cache/cic_cache/db/list.py b/apps/cic-cache/cic_cache/db/list.py index 355626e..efcf1cf 100644 --- a/apps/cic-cache/cic_cache/db/list.py +++ b/apps/cic-cache/cic_cache/db/list.py @@ -2,8 +2,9 @@ import logging import datetime -# third-party imports +# external imports from cic_cache.db.models.base import SessionBase +from sqlalchemy import text logg = logging.getLogger() @@ -50,7 +51,8 @@ def list_transactions_account_mined( def add_transaction( - session, tx_hash, + session, + tx_hash, block_number, tx_index, sender, @@ -62,6 +64,33 @@ def add_transaction( success, timestamp, ): + """Adds a single transaction to the cache persistent storage. Sensible interpretation of all fields is the responsibility of the caller. + + :param session: Persistent storage session object + :type session: SQLAlchemy session + :param tx_hash: Transaction hash + :type tx_hash: str, 0x-hex + :param block_number: Block number + :type block_number: int + :param tx_index: Transaction index in block + :type tx_index: int + :param sender: Ethereum address of effective sender + :type sender: str, 0x-hex + :param receiver: Ethereum address of effective recipient + :type receiver: str, 0x-hex + :param source_token: Ethereum address of token used by sender + :type source_token: str, 0x-hex + :param destination_token: Ethereum address of token received by recipient + :type destination_token: str, 0x-hex + :param from_value: Source token value spent in transaction + :type from_value: int + :param to_value: Destination token value received in transaction + :type to_value: int + :param success: True if code execution on network was successful + :type success: bool + :param date_block: Block timestamp + :type date_block: datetime + """ date_block = datetime.datetime.fromtimestamp(timestamp) s = "INSERT INTO tx (tx_hash, block_number, tx_index, sender, recipient, source_token, destination_token, from_value, to_value, success, date_block) VALUES ('{}', {}, {}, '{}', '{}', '{}', '{}', {}, {}, {}, '{}')".format( tx_hash, @@ -77,3 +106,74 @@ def add_transaction( date_block, ) session.execute(s) + + + +def tag_transaction( + session, + tx_hash, + name, + domain=None, + ): + """Tag a single transaction with a single tag. + + Tag must already exist in storage. + + :param session: Persistent storage session object + :type session: SQLAlchemy session + :param tx_hash: Transaction hash + :type tx_hash: str, 0x-hex + :param name: Tag value + :type name: str + :param domain: Tag domain + :type domain: str + :raises ValueError: Unknown tag or transaction hash + + """ + + s = text("SELECT id from tx where tx_hash = :a") + r = session.execute(s, {'a': tx_hash}).fetchall() + tx_id = r[0].values()[0] + + if tx_id == None: + raise ValueError('unknown tx hash {}'.format(tx_hash)) + + #s = text("SELECT id from tag where value = :a and domain = :b") + if domain == None: + s = text("SELECT id from tag where value = :a") + else: + s = text("SELECT id from tag where value = :a and domain = :b") + r = session.execute(s, {'a': name, 'b': domain}).fetchall() + tag_id = r[0].values()[0] + + logg.debug('type {} {}'.format(type(tag_id), type(tx_id))) + + if tag_id == None: + raise ValueError('unknown tag name {} domain {}'.format(name, domain)) + + s = text("INSERT INTO tag_tx_link (tag_id, tx_id) VALUES (:a, :b)") + r = session.execute(s, {'a': int(tag_id), 'b': int(tx_id)}) + + +def add_tag( + session, + name, + domain=None, + ): + """Add a single tag to storage. + + :param session: Persistent storage session object + :type session: SQLAlchemy session + :param name: Tag value + :type name: str + :param domain: Tag domain + :type domain: str + :raises sqlalchemy.exc.IntegrityError: Tag already exists + """ + + s = None + if domain == None: + s = text("INSERT INTO tag (value) VALUES (:b)") + else: + s = text("INSERT INTO tag (domain, value) VALUES (:a, :b)") + session.execute(s, {'a': domain, 'b': name}) diff --git a/apps/cic-cache/cic_cache/db/migrations/default/versions/aaf2bdce7d6e_transaction_tags.py b/apps/cic-cache/cic_cache/db/migrations/default/versions/aaf2bdce7d6e_transaction_tags.py new file mode 100644 index 0000000..aa2ee78 --- /dev/null +++ b/apps/cic-cache/cic_cache/db/migrations/default/versions/aaf2bdce7d6e_transaction_tags.py @@ -0,0 +1,38 @@ +"""Transaction tags + +Revision ID: aaf2bdce7d6e +Revises: 6604de4203e2 +Create Date: 2021-05-01 09:20:20.775082 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'aaf2bdce7d6e' +down_revision = '6604de4203e2' +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + 'tag', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('domain', sa.String(), nullable=True), + sa.Column('value', sa.String(), nullable=False), + ) + op.create_index('idx_tag_domain_value', 'tag', ['domain', 'value'], unique=True) + + op.create_table( + 'tag_tx_link', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('tag_id', sa.Integer, sa.ForeignKey('tag.id'), nullable=False), + sa.Column('tx_id', sa.Integer, sa.ForeignKey('tx.id'), nullable=False), + ) + +def downgrade(): + op.drop_table('tag_tx_link') + op.drop_index('idx_tag_domain_value') + op.drop_table('tag') diff --git a/apps/cic-cache/cic_cache/runnable/daemons/filters/base.py b/apps/cic-cache/cic_cache/runnable/daemons/filters/base.py index 5a57a48..ca4021c 100644 --- a/apps/cic-cache/cic_cache/runnable/daemons/filters/base.py +++ b/apps/cic-cache/cic_cache/runnable/daemons/filters/base.py @@ -1,2 +1,27 @@ -class SyncFilter: - pass +class TagSyncFilter: + """Holds tag name and domain for an implementing filter. + + :param name: Tag value + :type name: str + :param domain: Tag domain + :type domain: str + """ + + def __init__(self, name, domain=None): + self.tag_name = name + self.tag_domain = domain + + + def tag(self): + """Return tag value/domain. + + :rtype: Tuple + :returns: tag value/domain. + """ + return (self.tag_name, self.tag_domain) + + + def __str__(self): + if self.tag_domain == None: + return self.tag_name + return '{}.{}'.format(self.tag_domain, self.tag_name) diff --git a/apps/cic-cache/cic_cache/runnable/daemons/filters/erc20.py b/apps/cic-cache/cic_cache/runnable/daemons/filters/erc20.py index 0ff8885..0dc618e 100644 --- a/apps/cic-cache/cic_cache/runnable/daemons/filters/erc20.py +++ b/apps/cic-cache/cic_cache/runnable/daemons/filters/erc20.py @@ -15,15 +15,16 @@ from cic_eth_registry.error import ( ) # local imports -from .base import SyncFilter +from .base import TagSyncFilter from cic_cache import db as cic_cache_db logg = logging.getLogger().getChild(__name__) -class ERC20TransferFilter(SyncFilter): +class ERC20TransferFilter(TagSyncFilter): def __init__(self, chain_spec): + super(ERC20TransferFilter, self).__init__('transfer', domain='erc20') self.chain_spec = chain_spec @@ -46,6 +47,9 @@ class ERC20TransferFilter(SyncFilter): except RequestMismatchException: logg.debug('erc20 match but not a transfer, skipping') return False + except ValueError: + logg.debug('erc20 match but bogus data, skipping') + return False token_sender = tx.outputs[0] token_recipient = transfer_data[0] @@ -67,7 +71,13 @@ class ERC20TransferFilter(SyncFilter): tx.status == Status.SUCCESS, block.timestamp, ) - #db_session.flush() + db_session.flush() + cic_cache_db.tag_transaction( + db_session, + tx.hash, + self.tag_name, + domain=self.tag_domain, + ) db_session.commit() return True diff --git a/apps/cic-cache/cic_cache/runnable/daemons/tracker.py b/apps/cic-cache/cic_cache/runnable/daemons/tracker.py index 270e1f6..ceac150 100644 --- a/apps/cic-cache/cic_cache/runnable/daemons/tracker.py +++ b/apps/cic-cache/cic_cache/runnable/daemons/tracker.py @@ -7,9 +7,10 @@ import argparse import sys import re -# third-party imports +# external imports import confini import celery +import sqlalchemy import rlp import cic_base.config import cic_base.log @@ -34,7 +35,10 @@ from chainsyncer.driver import ( from chainsyncer.db.models.base import SessionBase # local imports -from cic_cache.db import dsn_from_config +from cic_cache.db import ( + dsn_from_config, + add_tag, + ) from cic_cache.runnable.daemons.filters import ( ERC20TransferFilter, ) @@ -59,6 +63,17 @@ chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER')) +def register_filter_tags(filters, session): + for f in filters: + tag = f.tag() + try: + add_tag(session, tag[0], domain=tag[1]) + session.commit() + logg.info('added tag name "{}" domain "{}"'.format(tag[0], tag[1])) + except sqlalchemy.exc.IntegrityError: + logg.debug('already have tag name "{}" domain "{}"'.format(tag[0], tag[1])) + + def main(): # Connect to blockchain with chainlib rpc = RPCConnection.connect(chain_spec, 'default') @@ -98,10 +113,19 @@ def main(): erc20_transfer_filter = ERC20TransferFilter(chain_spec) + filters = [ + erc20_transfer_filter, + ] + + session = SessionBase.create_session() + register_filter_tags(filters, session) + session.close() + i = 0 for syncer in syncers: logg.debug('running syncer index {}'.format(i)) - syncer.add_filter(erc20_transfer_filter) + for f in filters: + syncer.add_filter(f) r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc) sys.stderr.write("sync {} done at block {}\n".format(syncer, r)) diff --git a/apps/cic-cache/config/database.ini b/apps/cic-cache/config/database.ini index e34a1ff..7c5836a 100644 --- a/apps/cic-cache/config/database.ini +++ b/apps/cic-cache/config/database.ini @@ -6,4 +6,4 @@ HOST=localhost PORT=5432 ENGINE=postgresql DRIVER=psycopg2 -DEBUG= +DEBUG=0 diff --git a/apps/cic-cache/config/test/cic.ini b/apps/cic-cache/config/test/cic.ini index f52b27b..d985ae3 100644 --- a/apps/cic-cache/config/test/cic.ini +++ b/apps/cic-cache/config/test/cic.ini @@ -1,2 +1,4 @@ [cic] registry_address = +chain_spec = +trust_address = diff --git a/apps/cic-cache/config/test/database.ini b/apps/cic-cache/config/test/database.ini index 960863a..113c1fa 100644 --- a/apps/cic-cache/config/test/database.ini +++ b/apps/cic-cache/config/test/database.ini @@ -6,4 +6,4 @@ HOST=localhost PORT=5432 ENGINE=sqlite DRIVER=pysqlite -DEBUG= +DEBUG=1 diff --git a/apps/cic-cache/requirements.txt b/apps/cic-cache/requirements.txt index b538691..82eb421 100644 --- a/apps/cic-cache/requirements.txt +++ b/apps/cic-cache/requirements.txt @@ -1,9 +1,9 @@ -cic-base~=0.1.2a77 +cic-base~=0.1.2b6 alembic==1.4.2 confini~=0.3.6rc3 uwsgi==2.0.19.1 moolb~=0.1.0 -cic-eth-registry~=0.5.4a16 +cic-eth-registry~=0.5.5a1 SQLAlchemy==1.3.20 semver==2.13.0 psycopg2==2.8.6 diff --git a/apps/cic-cache/test_requirements.txt b/apps/cic-cache/test_requirements.txt index b6ca05a..dce79ea 100644 --- a/apps/cic-cache/test_requirements.txt +++ b/apps/cic-cache/test_requirements.txt @@ -4,3 +4,10 @@ pytest-mock==3.3.1 pysqlite3==0.4.3 sqlparse==0.4.1 pytest-celery==0.0.0a1 +eth_tester==0.5.0b3 +py-evm==0.3.0a20 +web3==5.12.2 +cic-eth-registry~=0.5.5a3 +giftable-erc20-token~=0.0.8a10 +eth-address-index~=0.1.1a10 +sarafu-faucet~=0.0.3a1 diff --git a/apps/cic-cache/tests/conftest.py b/apps/cic-cache/tests/conftest.py index 207e808..00db9ae 100644 --- a/apps/cic-cache/tests/conftest.py +++ b/apps/cic-cache/tests/conftest.py @@ -3,7 +3,7 @@ import os import sys import datetime -# third-party imports +# external imports import pytest # local imports @@ -84,3 +84,7 @@ def txs( session.commit() + return [ + tx_hash_first, + tx_hash_second, + ] diff --git a/apps/cic-cache/tests/filters/conftest.py b/apps/cic-cache/tests/filters/conftest.py new file mode 100644 index 0000000..50fa9b6 --- /dev/null +++ b/apps/cic-cache/tests/filters/conftest.py @@ -0,0 +1,3 @@ +from chainlib.eth.pytest import * +from cic_eth_registry.pytest.fixtures_tokens import * + diff --git a/apps/cic-cache/tests/filters/test_erc20.py b/apps/cic-cache/tests/filters/test_erc20.py new file mode 100644 index 0000000..d7582ed --- /dev/null +++ b/apps/cic-cache/tests/filters/test_erc20.py @@ -0,0 +1,69 @@ +# standard imports +import os +import datetime +import logging +import json + +# external imports +import pytest +from sqlalchemy import text +from chainlib.eth.tx import Tx +from chainlib.eth.block import Block +from chainlib.chain import ChainSpec +from hexathon import ( + strip_0x, + add_0x, + ) + +# local imports +from cic_cache.db import add_tag +from cic_cache.runnable.daemons.filters.erc20 import ERC20TransferFilter + +logg = logging.getLogger() + + +def test_cache( + eth_rpc, + foo_token, + init_database, + list_defaults, + list_actors, + tags, + ): + + chain_spec = ChainSpec('foo', 'bar', 42, 'baz') + + fltr = ERC20TransferFilter(chain_spec) + + add_tag(init_database, fltr.tag_name, domain=fltr.tag_domain) + + data = 'a9059cbb' + data += strip_0x(list_actors['alice']) + data += '1000'.ljust(64, '0') + + block = Block({ + 'hash': os.urandom(32).hex(), + 'number': 42, + 'timestamp': datetime.datetime.utcnow().timestamp(), + 'transactions': [], + }) + + tx = Tx({ + 'to': foo_token, + 'from': list_actors['bob'], + 'data': data, + 'value': 0, + 'hash': os.urandom(32).hex(), + 'nonce': 13, + 'gasPrice': 10000000, + 'gas': 123456, + }) + block.txs.append(tx) + tx.block = block + + r = fltr.filter(eth_rpc, block, tx, db_session=init_database) + assert r + + s = text("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = :a AND a.value = :b") + r = init_database.execute(s, {'a': fltr.tag_domain, 'b': fltr.tag_name}).fetchone() + assert r[0] == tx.hash diff --git a/apps/cic-cache/tests/fixtures_config.py b/apps/cic-cache/tests/fixtures_config.py index 723fff1..17a41c9 100644 --- a/apps/cic-cache/tests/fixtures_config.py +++ b/apps/cic-cache/tests/fixtures_config.py @@ -2,7 +2,7 @@ import os import logging -# third-party imports +# external imports import pytest import confini @@ -13,7 +13,7 @@ logg = logging.getLogger(__file__) @pytest.fixture(scope='session') def load_config(): - config_dir = os.path.join(root_dir, '.config/test') + config_dir = os.path.join(root_dir, 'config/test') conf = confini.Config(config_dir, 'CICTEST') conf.process() logg.debug('config {}'.format(conf)) diff --git a/apps/cic-cache/tests/fixtures_database.py b/apps/cic-cache/tests/fixtures_database.py index 27067e6..f5ff610 100644 --- a/apps/cic-cache/tests/fixtures_database.py +++ b/apps/cic-cache/tests/fixtures_database.py @@ -3,13 +3,16 @@ import os import logging import re -# third-party imports +# external imports import pytest import sqlparse +import alembic +from alembic.config import Config as AlembicConfig # local imports from cic_cache.db.models.base import SessionBase from cic_cache.db import dsn_from_config +from cic_cache.db import add_tag logg = logging.getLogger(__file__) @@ -26,11 +29,10 @@ def database_engine( except FileNotFoundError: pass dsn = dsn_from_config(load_config) - SessionBase.connect(dsn) + SessionBase.connect(dsn, debug=load_config.true('DATABASE_DEBUG')) return dsn -# TODO: use alembic instead to migrate db, here we have to keep separate schema than migration script in script/migrate.py @pytest.fixture(scope='function') def init_database( load_config, @@ -38,52 +40,23 @@ def init_database( ): rootdir = os.path.dirname(os.path.dirname(__file__)) - schemadir = os.path.join(rootdir, 'db', load_config.get('DATABASE_DRIVER')) - - if load_config.get('DATABASE_ENGINE') == 'sqlite': - rconn = SessionBase.engine.raw_connection() - f = open(os.path.join(schemadir, 'db.sql')) - s = f.read() - f.close() - rconn.executescript(s) - - else: - rconn = SessionBase.engine.raw_connection() - rcursor = rconn.cursor() - - #rcursor.execute('DROP FUNCTION IF EXISTS public.transaction_list') - #rcursor.execute('DROP FUNCTION IF EXISTS public.balances') - - f = open(os.path.join(schemadir, 'db.sql')) - s = f.read() - f.close() - r = re.compile(r'^[A-Z]', re.MULTILINE) - for l in sqlparse.parse(s): - strl = str(l) - # we need to check for empty query lines, as sqlparse doesn't do that on its own (and psycopg complains when it gets them) - if not re.search(r, strl): - logg.warning('skipping parsed query line {}'.format(strl)) - continue - rcursor.execute(strl) - rconn.commit() - - rcursor.execute('SET search_path TO public') - -# this doesn't work when run separately, no idea why -# functions have been manually added to original schema from cic-eth -# f = open(os.path.join(schemadir, 'proc_transaction_list.sql')) -# s = f.read() -# f.close() -# rcursor.execute(s) -# -# f = open(os.path.join(schemadir, 'proc_balances.sql')) -# s = f.read() -# f.close() -# rcursor.execute(s) - - rcursor.close() + dbdir = os.path.join(rootdir, 'cic_cache', 'db') + migrationsdir = os.path.join(dbdir, 'migrations', load_config.get('DATABASE_ENGINE')) + if not os.path.isdir(migrationsdir): + migrationsdir = os.path.join(dbdir, 'migrations', 'default') + logg.info('using migrations directory {}'.format(migrationsdir)) session = SessionBase.create_session() + + ac = AlembicConfig(os.path.join(migrationsdir, 'alembic.ini')) + ac.set_main_option('sqlalchemy.url', database_engine) + ac.set_main_option('script_location', migrationsdir) + + alembic.command.downgrade(ac, 'base') + alembic.command.upgrade(ac, 'head') + + session.commit() + yield session session.commit() session.close() @@ -116,3 +89,14 @@ def list_defaults( return { 'block': 420000, } + + +@pytest.fixture(scope='function') +def tags( + init_database, + ): + + add_tag(init_database, 'foo') + add_tag(init_database, 'baz', domain='bar') + add_tag(init_database, 'xyzzy', domain='bar') + init_database.commit() diff --git a/apps/cic-cache/tests/test_cache.py b/apps/cic-cache/tests/test_cache.py index 0a9712c..3c77264 100644 --- a/apps/cic-cache/tests/test_cache.py +++ b/apps/cic-cache/tests/test_cache.py @@ -4,7 +4,7 @@ import datetime import logging import json -# third-party imports +# external imports import pytest # local imports diff --git a/apps/cic-cache/tests/test_tag.py b/apps/cic-cache/tests/test_tag.py new file mode 100644 index 0000000..4b97140 --- /dev/null +++ b/apps/cic-cache/tests/test_tag.py @@ -0,0 +1,37 @@ +import os +import datetime +import logging +import json + +# external imports +import pytest + +# local imports +from cic_cache.db import tag_transaction + +logg = logging.getLogger() + + +def test_cache( + init_database, + list_defaults, + list_actors, + list_tokens, + txs, + tags, + ): + + tag_transaction(init_database, txs[0], 'foo') + tag_transaction(init_database, txs[0], 'baz', domain='bar') + tag_transaction(init_database, txs[1], 'xyzzy', domain='bar') + + r = init_database.execute("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.value = 'foo'").fetchall() + assert r[0][0] == txs[0] + + + r = init_database.execute("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = 'bar' AND a.value = 'baz'").fetchall() + assert r[0][0] == txs[0] + + + r = init_database.execute("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = 'bar' AND a.value = 'xyzzy'").fetchall() + assert r[0][0] == txs[1] diff --git a/apps/cic-eth/tests/fixtures_database.py b/apps/cic-eth/tests/fixtures_database.py index 6e43a85..eb57eb2 100644 --- a/apps/cic-eth/tests/fixtures_database.py +++ b/apps/cic-eth/tests/fixtures_database.py @@ -2,7 +2,7 @@ import os import logging -# third-party imports +# external imports import pytest import alembic from alembic.config import Config as AlembicConfig