diff --git a/.gitignore b/.gitignore index 5f5c740..558f498 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ __pycache__ gmon.out build/ dist/ +*.sqlite diff --git a/MANIFEST.in b/MANIFEST.in index 54f05fe..6855854 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1 +1 @@ -include *requirements.txt LICENSE.txt sql/**/* +include *requirements.txt LICENSE.txt chainsyncer/db/migrations/default/* chainsyncer/db/migrations/default/versions/* chainsyncer/db/migrations/default/versions/src/* diff --git a/chainsyncer/backend/base.py b/chainsyncer/backend/base.py index 4b6cd63..ccf36f1 100644 --- a/chainsyncer/backend/base.py +++ b/chainsyncer/backend/base.py @@ -9,7 +9,17 @@ class Backend: def __init__(self, flags_reversed=False): self.filter_count = 0 self.flags_reversed = flags_reversed - + + self.block_height_offset = 0 + self.tx_index_offset = 0 + + self.block_height_cursor = 0 + self.tx_index_cursor = 0 + + self.block_height_target = 0 + self.tx_index_target = 0 + + def check_filter(self, n, flags): if self.flags_reversed: @@ -20,3 +30,16 @@ class Backend: pass return False return flags & (1 << n) > 0 + + + + def chain(self): + """Returns chain spec for syncer + + :returns: Chain spec + :rtype chain_spec: cic_registry.chain.ChainSpec + """ + return self.chain_spec + + def __str__(self): + return "syncerbackend chain {} start {} target {}".format(self.chain(), self.start(), self.target()) diff --git a/chainsyncer/backend/file.py b/chainsyncer/backend/file.py index 0f9f840..9df52f5 100644 --- a/chainsyncer/backend/file.py +++ b/chainsyncer/backend/file.py @@ -28,15 +28,6 @@ class FileBackend(Backend): super(FileBackend, self).__init__(flags_reversed=True) self.object_data_dir = data_dir_for(chain_spec, object_id, base_dir=base_dir) - self.block_height_offset = 0 - self.tx_index_offset = 0 - - self.block_height_cursor = 0 - self.tx_index_cursor = 0 - - self.block_height_target = 0 - self.tx_index_target = 0 - self.object_id = object_id self.db_object = None self.db_object_filter = None @@ -206,8 +197,18 @@ class FileBackend(Backend): o = FileBackend(chain_spec, uu, base_dir=base_dir) o.__set(target_block_height, 0, 'target') o.__set(start_block_height, 0, 'offset') + o.__set(start_block_height, 0, 'cursor') + + return o + + + @staticmethod + def live(chain_spec, block_height, base_dir=base_dir): + uu = FileBackend.create_object(chain_spec, base_dir=base_dir) + o = FileBackend(chain_spec, uu, base_dir=base_dir) + o.__set(block_height, 0, 'offset') + o.__set(block_height, 0, 'cursor') - #return uu return o @@ -245,15 +246,20 @@ class FileBackend(Backend): @staticmethod - def resume(chain_spec, base_dir=base_dir): - return FileBackend.__sorted_entries(chain_spec, base_dir=base_dir) + def resume(chain_spec, block_height, base_dir=base_dir): + try: + return FileBackend.__sorted_entries(chain_spec, base_dir=base_dir) + except FileNotFoundError: + return [] @staticmethod def first(chain_spec, base_dir=base_dir): - - entries = FileBackend.__sorted_entries(chain_spec, base_dir=base_dir) - + entries = [] + try: + entries = FileBackend.__sorted_entries(chain_spec, base_dir=base_dir) + except FileNotFoundError: + return entries return entries[len(entries)-1] diff --git a/chainsyncer/backend/memory.py b/chainsyncer/backend/memory.py index 8bcaf20..a2b451c 100644 --- a/chainsyncer/backend/memory.py +++ b/chainsyncer/backend/memory.py @@ -19,6 +19,7 @@ class MemBackend(Backend): self.target_block = target_block self.db_session = None self.filter_names = [] + self.filter_states = {} def connect(self): diff --git a/chainsyncer/backend/sql.py b/chainsyncer/backend/sql.py index f723043..53c2f97 100644 --- a/chainsyncer/backend/sql.py +++ b/chainsyncer/backend/sql.py @@ -78,14 +78,6 @@ class SQLBackend(Backend): self.db_session.close() self.db_session = None - - def chain(self): - """Returns chain spec for syncer - - :returns: Chain spec - :rtype chain_spec: cic_registry.chain.ChainSpec - """ - return self.chain_spec def get(self): @@ -313,5 +305,3 @@ class SQLBackend(Backend): self.disconnect() - def __str__(self): - return "syncerbackend chain {} start {} target {}".format(self.chain(), self.start(), self.target()) diff --git a/chainsyncer/db/migrations/__init__.py b/chainsyncer/db/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/chainsyncer/db/migrations/default/README b/chainsyncer/db/migrations/default/README new file mode 100644 index 0000000..98e4f9c --- /dev/null +++ b/chainsyncer/db/migrations/default/README @@ -0,0 +1 @@ +Generic single-database configuration. \ No newline at end of file diff --git a/chainsyncer/db/migrations/default/alembic.ini b/chainsyncer/db/migrations/default/alembic.ini new file mode 100644 index 0000000..c88a93b --- /dev/null +++ b/chainsyncer/db/migrations/default/alembic.ini @@ -0,0 +1,85 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts +script_location = . + +# template used to generate migration files +# file_template = %%(rev)s_%%(slug)s + +# timezone to use when rendering the date +# within the migration file as well as the filename. +# string value is passed to dateutil.tz.gettz() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the +# "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; this defaults +# to ./versions. When using multiple version +# directories, initial revisions must be specified with --version-path +# version_locations = %(here)s/bar %(here)s/bat ./versions + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +sqlalchemy.url = driver://user:pass@localhost/dbname + + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks=black +# black.type=console_scripts +# black.entrypoint=black +# black.options=-l 79 + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +#level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/chainsyncer/db/migrations/default/env.py b/chainsyncer/db/migrations/default/env.py new file mode 100644 index 0000000..70518a2 --- /dev/null +++ b/chainsyncer/db/migrations/default/env.py @@ -0,0 +1,77 @@ +from logging.config import fileConfig + +from sqlalchemy import engine_from_config +from sqlalchemy import pool + +from alembic import context + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +fileConfig(config.config_file_name) + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +target_metadata = None + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def run_migrations_offline(): + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online(): + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. + + """ + connectable = engine_from_config( + config.get_section(config.config_ini_section), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + context.configure( + connection=connection, target_metadata=target_metadata + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/chainsyncer/db/migrations/default/export.py b/chainsyncer/db/migrations/default/export.py new file mode 100644 index 0000000..91e041e --- /dev/null +++ b/chainsyncer/db/migrations/default/export.py @@ -0,0 +1,37 @@ +from alembic import op +import sqlalchemy as sa + +from chainsyncer.db.migrations.default.versions.src.sync import ( + upgrade as upgrade_sync, + downgrade as downgrade_sync, +) + +from chainsyncer.db.migrations.default.versions.src.sync_tx import ( + upgrade as upgrade_sync_tx, + downgrade as downgrade_sync_tx, +) + +def chainsyncer_upgrade(major=0, minor=0, patch=3): + r0_0_1_u() + if patch >= 3: + r0_0_3_u() + +def chainsyncer_downgrade(major=0, minor=0, patch=3): + if patch >= 3: + r0_0_3_d() + r0_0_1_d() + +def r0_0_1_u(): + upgrade_sync() + +def r0_0_1_d(): + downgrade_sync() + + +# 0.0.3 + +def r0_0_3_u(): + upgrade_sync_tx() + +def r0_0_3_d(): + downgrade_sync_tx() diff --git a/chainsyncer/db/migrations/default/script.py.mako b/chainsyncer/db/migrations/default/script.py.mako new file mode 100644 index 0000000..2c01563 --- /dev/null +++ b/chainsyncer/db/migrations/default/script.py.mako @@ -0,0 +1,24 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} +branch_labels = ${repr(branch_labels)} +depends_on = ${repr(depends_on)} + + +def upgrade(): + ${upgrades if upgrades else "pass"} + + +def downgrade(): + ${downgrades if downgrades else "pass"} diff --git a/chainsyncer/db/migrations/default/versions/452ecfa81de3_base_setup.py b/chainsyncer/db/migrations/default/versions/452ecfa81de3_base_setup.py new file mode 100644 index 0000000..6266860 --- /dev/null +++ b/chainsyncer/db/migrations/default/versions/452ecfa81de3_base_setup.py @@ -0,0 +1,14 @@ +"""base setup + +Revision ID: 452ecfa81de3 +Revises: +Create Date: 2021-07-16 16:29:32.460027 + +""" +# revision identifiers, used by Alembic. +revision = '452ecfa81de3' +down_revision = None +branch_labels = None +depends_on = None + +from chainsyncer.db.migrations.default.versions.src.sync import upgrade, downgrade diff --git a/chainsyncer/db/migrations/default/versions/a2ce6826c5eb_sync_tx.py b/chainsyncer/db/migrations/default/versions/a2ce6826c5eb_sync_tx.py new file mode 100644 index 0000000..7482bf2 --- /dev/null +++ b/chainsyncer/db/migrations/default/versions/a2ce6826c5eb_sync_tx.py @@ -0,0 +1,14 @@ +"""sync-tx + +Revision ID: a2ce6826c5eb +Revises: 452ecfa81de3 +Create Date: 2021-07-16 18:17:53.439721 + +""" +# revision identifiers, used by Alembic. +revision = 'a2ce6826c5eb' +down_revision = '452ecfa81de3' +branch_labels = None +depends_on = None + +from chainsyncer.db.migrations.default.versions.src.sync_tx import upgrade, downgrade diff --git a/chainsyncer/db/migrations/default/versions/src/sync.py b/chainsyncer/db/migrations/default/versions/src/sync.py new file mode 100644 index 0000000..2557625 --- /dev/null +++ b/chainsyncer/db/migrations/default/versions/src/sync.py @@ -0,0 +1,32 @@ +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.create_table( + 'chain_sync', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('blockchain', sa.String, nullable=False), + sa.Column('block_start', sa.Integer, nullable=False, default=0), + sa.Column('tx_start', sa.Integer, nullable=False, default=0), + sa.Column('block_cursor', sa.Integer, nullable=False, default=0), + sa.Column('tx_cursor', sa.Integer, nullable=False, default=0), + sa.Column('block_target', sa.Integer, nullable=True), + sa.Column('date_created', sa.DateTime, nullable=False), + sa.Column('date_updated', sa.DateTime), + ) + + op.create_table( + 'chain_sync_filter', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('chain_sync_id', sa.Integer, sa.ForeignKey('chain_sync.id'), nullable=True), + sa.Column('flags', sa.LargeBinary, nullable=True), + sa.Column('flags_start', sa.LargeBinary, nullable=True), + sa.Column('count', sa.Integer, nullable=False, default=0), + sa.Column('digest', sa.String(64), nullable=False), + ) + + +def downgrade(): + op.drop_table('chain_sync_filter') + op.drop_table('chain_sync') diff --git a/chainsyncer/db/migrations/default/versions/src/sync_tx.py b/chainsyncer/db/migrations/default/versions/src/sync_tx.py new file mode 100644 index 0000000..6edfa48 --- /dev/null +++ b/chainsyncer/db/migrations/default/versions/src/sync_tx.py @@ -0,0 +1,17 @@ +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.create_table( + 'chain_sync_tx', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('blockchain', sa.String, nullable=False), + sa.Column('chain_sync_id', sa.Integer, sa.ForeignKey('chain_sync.id'), nullable=False), + sa.Column('flags', sa.LargeBinary, nullable=True), + sa.Column('block', sa.Integer, nullable=False), + sa.Column('tx', sa.Integer, nullable=False), + ) + +def downgrade(): + op.drop_table('chain_sync_tx') diff --git a/chainsyncer/db/migrations/sqlalchemy.py b/chainsyncer/db/migrations/sqlalchemy.py index bf7edce..a498beb 100644 --- a/chainsyncer/db/migrations/sqlalchemy.py +++ b/chainsyncer/db/migrations/sqlalchemy.py @@ -1,36 +1,37 @@ from alembic import op import sqlalchemy as sa -def chainsyncer_upgrade(major=0, minor=0, patch=1): - r0_0_1_u() +from chainsyncer.db.migrations.default.versions.tags.sync import + upgrade as upgrade_sync, + downgrade as downgrade_sync, +) -def chainsyncer_downgrade(major=0, minor=0, patch=1): +from chainsyncer.db.migrations.default.versions.tags.sync_tx import + upgrade as upgrade_sync_tx, + downgrade as downgrade_sync_tx, +) + +def chainsyncer_upgrade(major=0, minor=0, patch=3): + r0_0_1_u() + if patch >= 3: + r0_0_3_u() + +def chainsyncer_downgrade(major=0, minor=0, patch=3): + if patch >= 3: + r0_0_3_d() r0_0_1_d() def r0_0_1_u(): - op.create_table( - 'chain_sync', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('blockchain', sa.String, nullable=False), - sa.Column('block_start', sa.Integer, nullable=False, default=0), - sa.Column('tx_start', sa.Integer, nullable=False, default=0), - sa.Column('block_cursor', sa.Integer, nullable=False, default=0), - sa.Column('tx_cursor', sa.Integer, nullable=False, default=0), - sa.Column('block_target', sa.Integer, nullable=True), - sa.Column('date_created', sa.DateTime, nullable=False), - sa.Column('date_updated', sa.DateTime), - ) - - op.create_table( - 'chain_sync_filter', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('chain_sync_id', sa.Integer, sa.ForeignKey('chain_sync.id'), nullable=True), - sa.Column('flags', sa.LargeBinary, nullable=True), - sa.Column('flags_start', sa.LargeBinary, nullable=True), - sa.Column('count', sa.Integer, nullable=False, default=0), - sa.Column('digest', sa.String(64), nullable=False), - ) + upgrade_sync() def r0_0_1_d(): - op.drop_table('chain_sync_filter') - op.drop_table('chain_sync') + downgrade_sync() + + +# 0.0.3 + +def r0_0_3_u(): + upgrade_sync_tx() + +def r0_0_3_d(): + downgrade_sync_tx() diff --git a/chainsyncer/db/models/__init__.py b/chainsyncer/db/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/chainsyncer/db/models/base.py b/chainsyncer/db/models/base.py index db570df..26f466d 100644 --- a/chainsyncer/db/models/base.py +++ b/chainsyncer/db/models/base.py @@ -120,3 +120,4 @@ class SessionBase(Model): logg.debug('destroying session {}'.format(session_key)) session.commit() session.close() + del SessionBase.localsessions[session_key] diff --git a/chainsyncer/driver.py b/chainsyncer/driver.py deleted file mode 100644 index 655e467..0000000 --- a/chainsyncer/driver.py +++ /dev/null @@ -1,209 +0,0 @@ -# standard imports -import uuid -import logging -import time -import signal -import json - -# external imports -from chainlib.eth.block import ( - block_by_number, - Block, - ) -from chainlib.eth.tx import ( - receipt, - transaction, - Tx, - ) -from chainlib.error import JSONRPCException - -# local imports -from chainsyncer.filter import SyncFilter -from chainsyncer.error import ( - SyncDone, - NoBlockForYou, - ) - -logg = logging.getLogger().getChild(__name__) - - -def noop_callback(block, tx): - logg.debug('noop callback ({},{})'.format(block, tx)) - - -class Syncer: - - running_global = True - yield_delay=0.005 - signal_request = [signal.SIGINT, signal.SIGTERM] - signal_set = False - - def __init__(self, backend, pre_callback=None, block_callback=None, post_callback=None): - self.cursor = None - self.running = True - self.backend = backend - self.filter = SyncFilter(backend) - self.block_callback = block_callback - self.pre_callback = pre_callback - self.post_callback = post_callback - if not Syncer.signal_set: - for sig in Syncer.signal_request: - signal.signal(sig, Syncer.__sig_terminate) - Syncer.signal_set = True - - - @staticmethod - def __sig_terminate(sig, frame): - logg.warning('got signal {}'.format(sig)) - Syncer.terminate() - - - @staticmethod - def terminate(): - logg.info('termination requested!') - Syncer.running_global = False - - - def chain(self): - """Returns the string representation of the chain spec for the chain the syncer is running on. - - :returns: Chain spec string - :rtype: str - """ - return self.bc_cache.chain() - - - def add_filter(self, f): - self.filter.add(f) - self.backend.register_filter(str(f)) - - - def process_single(self, conn, block, tx): - self.backend.set(block.number, tx.index) - self.filter.apply(conn, block, tx) - - -class BlockPollSyncer(Syncer): - - def __init__(self, backend, pre_callback=None, block_callback=None, post_callback=None): - super(BlockPollSyncer, self).__init__(backend, pre_callback, block_callback, post_callback) - - - def loop(self, interval, conn): - (pair, fltr) = self.backend.get() - start_tx = pair[1] - - while self.running and Syncer.running_global: - if self.pre_callback != None: - self.pre_callback() - while True and Syncer.running_global: - if start_tx > 0: - start_tx -= 1 - continue - try: - block = self.get(conn) - except SyncDone as e: - logg.info('sync done: {}'.format(e)) - return self.backend.get() - except NoBlockForYou as e: - break -# TODO: To properly handle this, ensure that previous request is rolled back -# except sqlalchemy.exc.OperationalError as e: -# logg.error('database error: {}'.format(e)) -# break - - if self.block_callback != None: - self.block_callback(block, None) - - last_block = block - self.process(conn, block) - start_tx = 0 - time.sleep(self.yield_delay) - if self.post_callback != None: - self.post_callback() - time.sleep(interval) - - -class HeadSyncer(BlockPollSyncer): - - def process(self, conn, block): - (pair, fltr) = self.backend.get() - logg.debug('process block {} (backend {}:{})'.format(block, pair, fltr)) - i = pair[1] # set tx index from previous - tx = None - while True: - try: - tx = block.tx(i) - except AttributeError: - o = transaction(block.txs[i]) - r = conn.do(o) - tx = Tx(Tx.src_normalize(r), block=block) - except IndexError as e: - logg.debug('index error syncer rcpt get {}'.format(e)) - self.backend.set(block.number + 1, 0) - break - - # TODO: Move specifics to eth subpackage, receipts are not a global concept - rcpt = conn.do(receipt(tx.hash)) - if rcpt != None: - tx.apply_receipt(Tx.src_normalize(rcpt)) - - self.process_single(conn, block, tx) - self.backend.reset_filter() - - i += 1 - - - def get(self, conn): - (height, flags) = self.backend.get() - block_number = height[0] - block_hash = [] - o = block_by_number(block_number) - r = conn.do(o) - if r == None: - raise NoBlockForYou() - b = Block(r) - b.txs = b.txs[height[1]:] - - return b - - - def __str__(self): - return '[headsyncer] {}'.format(str(self.backend)) - - -class HistorySyncer(HeadSyncer): - - def __init__(self, backend, pre_callback=None, block_callback=None, post_callback=None): - super(HeadSyncer, self).__init__(backend, pre_callback, block_callback, post_callback) - self.block_target = None - (block_number, flags) = self.backend.target() - if block_number == None: - raise AttributeError('backend has no future target. Use HeadSyner instead') - self.block_target = block_number - logg.debug('block target {}'.format(self.block_target)) - - - def get(self, conn): - (height, flags) = self.backend.get() - if self.block_target < height[0]: - raise SyncDone(self.block_target) - block_number = height[0] - block_hash = [] - o = block_by_number(block_number) - try: - r = conn.do(o) - # TODO: Disambiguate whether error is temporary or permanent, if permanent, SyncDone should be raised, because a historical sync is attempted into the future - except JSONRPCException: - r = None - if r == None: - raise SyncDone() #NoBlockForYou() - b = Block(r) - - return b - - - def __str__(self): - return '[historysyncer] {}'.format(str(self.backend)) - - diff --git a/chainsyncer/driver/__init__.py b/chainsyncer/driver/__init__.py new file mode 100644 index 0000000..325f58d --- /dev/null +++ b/chainsyncer/driver/__init__.py @@ -0,0 +1 @@ +from .base import Syncer diff --git a/chainsyncer/driver/base.py b/chainsyncer/driver/base.py new file mode 100644 index 0000000..8e04c0a --- /dev/null +++ b/chainsyncer/driver/base.py @@ -0,0 +1,73 @@ +# standard imports +import uuid +import logging +import time +import signal +import json + +# external imports +from chainlib.error import JSONRPCException + +# local imports +from chainsyncer.filter import SyncFilter +from chainsyncer.error import ( + SyncDone, + NoBlockForYou, + ) + +logg = logging.getLogger(__name__) + + +def noop_callback(block, tx): + logg.debug('noop callback ({},{})'.format(block, tx)) + + +class Syncer: + + running_global = True + yield_delay=0.005 + signal_request = [signal.SIGINT, signal.SIGTERM] + signal_set = False + name = 'base' + + def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None): + self.chain_interface = chain_interface + self.cursor = None + self.running = True + self.backend = backend + self.filter = SyncFilter(backend) + self.block_callback = block_callback + self.pre_callback = pre_callback + self.post_callback = post_callback + if not Syncer.signal_set: + for sig in Syncer.signal_request: + signal.signal(sig, self.__sig_terminate) + Syncer.signal_set = True + + + def __sig_terminate(self, sig, frame): + logg.warning('got signal {}'.format(sig)) + self.terminate() + + + def terminate(self): + logg.info('termination requested!') + Syncer.running_global = False + Syncer.running = False + + + def add_filter(self, f): + self.filter.add(f) + self.backend.register_filter(str(f)) + + + def process_single(self, conn, block, tx): + self.backend.set(block.number, tx.index) + self.filter.apply(conn, block, tx) + + + def __str__(self): + return 'syncer "{}" {}'.format( + self.name, + self.backend, + ) diff --git a/chainsyncer/driver/head.py b/chainsyncer/driver/head.py new file mode 100644 index 0000000..b044a4e --- /dev/null +++ b/chainsyncer/driver/head.py @@ -0,0 +1,52 @@ +# standard imports +import logging + +# local imports +from chainsyncer.error import NoBlockForYou +from .poll import BlockPollSyncer + +logg = logging.getLogger(__name__) + +class HeadSyncer(BlockPollSyncer): + + name = 'head' + + def process(self, conn, block): + (pair, fltr) = self.backend.get() + logg.debug('process block {} (backend {}:{})'.format(block, pair, fltr)) + i = pair[1] # set tx index from previous + tx = None + while True: + try: + tx = block.tx(i) + except AttributeError: + o = tx(block.txs[i]) + r = conn.do(o) + tx = self.interface.tx_from_src(Tx.src_normalize(r), block=block) + #except IndexError as e: + # logg.debug('index error syncer tx get {}'.format(e)) + # break + + # TODO: Move specifics to eth subpackage, receipts are not a global concept + rcpt = conn.do(self.chain_interface.tx_receipt(tx.hash)) + if rcpt != None: + tx.apply_receipt(self.chain_interface.src_normalize(rcpt)) + + self.process_single(conn, block, tx) + self.backend.reset_filter() + + i += 1 + + + def get(self, conn): + (height, flags) = self.backend.get() + block_number = height[0] + block_hash = [] + o = self.chain_interface.block_by_number(block_number) + r = conn.do(o) + if r == None: + raise NoBlockForYou() + b = self.chain_interface.block_from_src(r) + b.txs = b.txs[height[1]:] + + return b diff --git a/chainsyncer/driver/history.py b/chainsyncer/driver/history.py new file mode 100644 index 0000000..1e03de9 --- /dev/null +++ b/chainsyncer/driver/history.py @@ -0,0 +1,45 @@ +# standard imports +import logging + +# external imports +from chainlib.error import RPCException + +# local imports +from .head import HeadSyncer +from chainsyncer.error import SyncDone +from chainlib.error import RPCException + +logg = logging.getLogger(__name__) + + +class HistorySyncer(HeadSyncer): + + name = 'history' + + def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None): + super(HeadSyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback) + self.block_target = None + (block_number, flags) = self.backend.target() + if block_number == None: + raise AttributeError('backend has no future target. Use HeadSyner instead') + self.block_target = block_number + logg.debug('block target {}'.format(self.block_target)) + + + def get(self, conn): + (height, flags) = self.backend.get() + if self.block_target < height[0]: + raise SyncDone(self.block_target) + block_number = height[0] + block_hash = [] + o = self.chain_interface.block_by_number(block_number) + try: + r = conn.do(o) + # TODO: Disambiguate whether error is temporary or permanent, if permanent, SyncDone should be raised, because a historical sync is attempted into the future + except RPCException: + r = None + if r == None: + raise SyncDone() #NoBlockForYou() + b = self.chain_interface.block_from_src(r) + + return b diff --git a/chainsyncer/driver/poll.py b/chainsyncer/driver/poll.py new file mode 100644 index 0000000..c8d72e0 --- /dev/null +++ b/chainsyncer/driver/poll.py @@ -0,0 +1,59 @@ +# standard imports +import logging +import time + +# local imports +from .base import Syncer +from chainsyncer.error import ( + SyncDone, + NoBlockForYou, + ) + +logg = logging.getLogger(__name__) + + + +class BlockPollSyncer(Syncer): + + name = 'blockpoll' + + def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None): + super(BlockPollSyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback) + + + def loop(self, interval, conn): + (pair, fltr) = self.backend.get() + start_tx = pair[1] + + while self.running and Syncer.running_global: + if self.pre_callback != None: + self.pre_callback() + while True and Syncer.running_global: + if start_tx > 0: + start_tx -= 1 + continue + try: + block = self.get(conn) + except SyncDone as e: + logg.info('all blocks sumitted for processing: {}'.format(e)) + return self.backend.get() + except NoBlockForYou as e: + break +# TODO: To properly handle this, ensure that previous request is rolled back +# except sqlalchemy.exc.OperationalError as e: +# logg.error('database error: {}'.format(e)) +# break + + if self.block_callback != None: + self.block_callback(block, None) + + last_block = block + try: + self.process(conn, block) + except IndexError: + self.backend.set(block.number + 1, 0) + start_tx = 0 + time.sleep(self.yield_delay) + if self.post_callback != None: + self.post_callback() + time.sleep(interval) diff --git a/chainsyncer/driver/thread.py b/chainsyncer/driver/thread.py new file mode 100644 index 0000000..ac8c250 --- /dev/null +++ b/chainsyncer/driver/thread.py @@ -0,0 +1,133 @@ +# standard imports +import logging +#import threading +import multiprocessing +import queue + +# external imports +from chainlib.error import RPCException + +# local imports +from .history import HistorySyncer +from chainsyncer.error import SyncDone + +logg = logging.getLogger(__name__) + + + +class ThreadedHistorySyncer(HistorySyncer): + + def __init__(self, conn_factory, thread_limit, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, conn_limit=0): + super(ThreadedHistorySyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback) + self.workers = [] + if conn_limit == 0: + conn_limit = thread_limit + #self.conn_pool = queue.Queue(conn_limit) + #self.queue = queue.Queue(thread_limit) + #self.quit_queue = queue.Queue(1) + self.conn_pool = multiprocessing.Queue(conn_limit) + self.queue = multiprocessing.Queue(thread_limit) + self.quit_queue = multiprocessing.Queue(1) + #self.lock = threading.Lock() + self.lock = multiprocessing.Lock() + for i in range(thread_limit): + #w = threading.Thread(target=self.worker) + w = multiprocessing.Process(target=self.worker) + self.workers.append(w) + + for i in range(conn_limit): + self.conn_pool.put(conn_factory()) + + + def terminate(self): + self.quit_queue.put(()) + super(ThreadedHistorySyncer, self).terminate() + + + def worker(self): + while True: + block_number = None + try: + block_number = self.queue.get(timeout=0.01) + except queue.Empty: + if self.quit_queue.qsize() > 0: + #logg.debug('{} received quit'.format(threading.current_thread().getName())) + logg.debug('{} received quit'.format(multiprocessing.current_process().name)) + return + continue + conn = self.conn_pool.get() + try: + logg.debug('processing parent {} {}'.format(conn, block_number)) + self.process_parent(conn, block_number) + except IndexError: + pass + except RPCException as e: + logg.error('RPC failure for block {}, resubmitting to queue: {}'.format(block, e)) + self.queue.put(block_number) + conn = self.conn_pool.put(conn) + + + def process_parent(self, conn, block_number): + logg.debug('getting block {}'.format(block_number)) + o = self.chain_interface.block_by_number(block_number) + r = conn.do(o) + block = self.chain_interface.block_from_src(r) + logg.debug('got block typ {}'.format(type(block))) + super(ThreadedHistorySyncer, self).process(conn, block) + + + def process_single(self, conn, block, tx): + self.filter.apply(conn, block, tx) + + + def process(self, conn, block): + pass + + + #def process(self, conn, block): + def get(self, conn): + if not self.running: + raise SyncDone() + + block_number = None + tx_index = None + flags = None + ((block_number, tx_index), flags) = self.backend.get() + try: + #logg.debug('putting {}'.format(block.number)) + #self.queue.put((conn, block_number,), timeout=0.1) + self.queue.put(block_number, timeout=0.1) + except queue.Full: + #logg.debug('queue full, try again') + return + + target, flags = self.backend.target() + next_block = block_number + 1 + if next_block > target: + self.quit_queue.put(()) + raise SyncDone() + self.backend.set(self.backend.block_height + 1, 0) + + +# def get(self, conn): +# try: +# r = super(ThreadedHistorySyncer, self).get(conn) +# return r +# except SyncDone as e: +# self.quit_queue.put(()) +# raise e + + + def loop(self, interval, conn): + for w in self.workers: + w.start() + r = super(ThreadedHistorySyncer, self).loop(interval, conn) + for w in self.workers: + w.join() + while True: + try: + self.quit_queue.get_nowait() + except queue.Empty: + break + + logg.info('workers done {}'.format(r)) diff --git a/chainsyncer/driver/threadpool.py b/chainsyncer/driver/threadpool.py new file mode 100644 index 0000000..bedcef4 --- /dev/null +++ b/chainsyncer/driver/threadpool.py @@ -0,0 +1,171 @@ +# standard imports +import logging +#import threading +import multiprocessing +import queue +import time + +# external imports +from chainlib.error import RPCException + +# local imports +from .history import HistorySyncer +from chainsyncer.error import SyncDone + +logg = logging.getLogger(__name__) + + +def foobarcb(v): + logg.debug('foooz {}'.format(v)) + + +class ThreadPoolTask: + + process_func = None + chain_interface = None + + def poolworker(self, block_number, conn): +# conn = args[1].get() + try: + logg.debug('processing parent {} {}'.format(conn, block_number)) + #self.process_parent(self.conn, block_number) + self.process_parent(conn, block_number) + except IndexError: + pass + except RPCException as e: + logg.error('RPC failure for block {}, resubmitting to queue: {}'.format(block, e)) + raise e + #self.queue.put(block_number) +# conn = self.conn_pool.put(conn) + + def process_parent(self, conn, block_number): + logg.debug('getting block {}'.format(block_number)) + o = self.chain_interface.block_by_number(block_number) + r = conn.do(o) + block = self.chain_interface.block_from_src(r) + logg.debug('got block typ {}'.format(type(block))) + #super(ThreadedHistorySyncer, self).process(conn, block) + self.process_func(conn, block) + + + +class ThreadPoolHistorySyncer(HistorySyncer): + + def __init__(self, conn_factory, thread_limit, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, conn_limit=0): + super(ThreadPoolHistorySyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback) + self.workers = [] + self.thread_limit = thread_limit + if conn_limit == 0: + self.conn_limit = self.thread_limit + #self.conn_pool = queue.Queue(conn_limit) + #self.queue = queue.Queue(thread_limit) + #self.quit_queue = queue.Queue(1) + #self.conn_pool = multiprocessing.Queue(conn_limit) + #self.queue = multiprocessing.Queue(thread_limit) + #self.quit_queue = multiprocessing.Queue(1) + #self.lock = threading.Lock() + #self.lock = multiprocessing.Lock() + ThreadPoolTask.process_func = super(ThreadPoolHistorySyncer, self).process + ThreadPoolTask.chain_interface = chain_interface + #for i in range(thread_limit): + #w = threading.Thread(target=self.worker) + # w = multiprocessing.Process(target=self.worker) + # self.workers.append(w) + + #for i in range(conn_limit): + # self.conn_pool.put(conn_factory()) + self.conn_factory = conn_factory + self.worker_pool = None + + + def terminate(self): + #self.quit_queue.put(()) + super(ThreadPoolHistorySyncer, self).terminate() + + +# def worker(self): +# while True: +# block_number = None +# try: +# block_number = self.queue.get(timeout=0.01) +# except queue.Empty: +# if self.quit_queue.qsize() > 0: +# #logg.debug('{} received quit'.format(threading.current_thread().getName())) +# logg.debug('{} received quit'.format(multiprocessing.current_process().name)) +# return +# continue +# conn = self.conn_pool.get() +# try: +# logg.debug('processing parent {} {}'.format(conn, block_number)) +# self.process_parent(conn, block_number) +# except IndexError: +# pass +# except RPCException as e: +# logg.error('RPC failure for block {}, resubmitting to queue: {}'.format(block, e)) +# self.queue.put(block_number) +# conn = self.conn_pool.put(conn) +# + + + def process_single(self, conn, block, tx): + self.filter.apply(conn, block, tx) + + + def process(self, conn, block): + pass + + + #def process(self, conn, block): + def get(self, conn): + if not self.running: + raise SyncDone() + + block_number = None + tx_index = None + flags = None + ((block_number, tx_index), flags) = self.backend.get() + #try: + #logg.debug('putting {}'.format(block.number)) + #self.queue.put((conn, block_number,), timeout=0.1) + #self.queue.put(block_number, timeout=0.1) + #except queue.Full: + #logg.debug('queue full, try again') + # return + task = ThreadPoolTask() + conn = self.conn_factory() + self.worker_pool.apply_async(task.poolworker, (block_number, conn,), {}, foobarcb) + + target, flags = self.backend.target() + next_block = block_number + 1 + if next_block > target: + #self.quit_queue.put(()) + self.worker_pool.close() + raise SyncDone() + self.backend.set(self.backend.block_height + 1, 0) + + +# def get(self, conn): +# try: +# r = super(ThreadedHistorySyncer, self).get(conn) +# return r +# except SyncDone as e: +# self.quit_queue.put(()) +# raise e + + + def loop(self, interval, conn): + self.worker_pool = multiprocessing.Pool(self.thread_limit) + #for w in self.workers: + # w.start() + r = super(ThreadPoolHistorySyncer, self).loop(interval, conn) + #for w in self.workers: + # w.join() + #while True: + # try: + # self.quit_queue.get_nowait() + # except queue.Empty: + # break + time.sleep(1) + self.worker_pool.join() + + logg.info('workers done {}'.format(r)) diff --git a/chainsyncer/error.py b/chainsyncer/error.py index 33b865a..0fd8458 100644 --- a/chainsyncer/error.py +++ b/chainsyncer/error.py @@ -13,3 +13,7 @@ class RequestError(Exception): class BackendError(Exception): pass + + +class AbortTx(Exception): + pass diff --git a/chainsyncer/filter.py b/chainsyncer/filter.py index 1b8b427..ea04618 100644 --- a/chainsyncer/filter.py +++ b/chainsyncer/filter.py @@ -20,7 +20,12 @@ class SyncFilter: logg.debug('added filter "{}"'.format(str(fltr))) self.filters.append(fltr) - + + + def apply_one(self, fltr, idx, conn, block, tx, session): + fltr.filter(conn, block, tx, session) + self.backend.complete_filter(idx) + def apply(self, conn, block, tx): session = None @@ -33,16 +38,15 @@ class SyncFilter: (pair, flags) = self.backend.get() for f in self.filters: if not self.backend.check_filter(i, flags): - #if flags & (1 << i) == 0: logg.debug('applying filter {} {}'.format(str(f), flags)) - f.filter(conn, block, tx, session) - self.backend.complete_filter(i) + self.apply_one(f, i, conn, block, tx, session) else: logg.debug('skipping previously applied filter {} {}'.format(str(f), flags)) i += 1 self.backend.disconnect() + class NoopFilter: def filter(self, conn, block, tx, db_session=None): diff --git a/chainsyncer/runnable/tracker.py b/chainsyncer/runnable/tracker.py deleted file mode 100644 index c8560b0..0000000 --- a/chainsyncer/runnable/tracker.py +++ /dev/null @@ -1,96 +0,0 @@ -# standard imports -import os -import sys -import logging -import time -import argparse -import sys -import re - -# external imports -import confini -from chainlib.eth.connection import HTTPConnection -from chainlib.eth.block import block_latest -from chainlib.chain import ChainSpec - -# local imports -from chainsyncer.driver import HeadSyncer -from chainsyncer.db import dsn_from_config -from chainsyncer.db.models.base import SessionBase -from chainsyncer.backend import SyncerBackend -from chainsyncer.error import LoopDone -from chainsyncer.filter import NoopFilter - -logging.basicConfig(level=logging.WARNING) -logg = logging.getLogger() - -config_dir = '/usr/local/etc/cic-syncer' - - -argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks') -argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address') -argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') -argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec') -argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') -argparser.add_argument('--offset', type=int, help='block number to start sync') -argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') -argparser.add_argument('-v', help='be verbose', action='store_true') -argparser.add_argument('-vv', help='be more verbose', action='store_true') -args = argparser.parse_args(sys.argv[1:]) - -if args.v == True: - logging.getLogger().setLevel(logging.INFO) -elif args.vv == True: - logging.getLogger().setLevel(logging.DEBUG) - -config_dir = os.path.join(args.c) -config = confini.Config(config_dir, args.env_prefix) -config.process() -# override args -args_override = { - 'SYNCER_CHAIN_SPEC': getattr(args, 'i'), - 'ETH_PROVIDER': getattr(args, 'p'), - } -config.dict_override(args_override, 'cli flag') -config.censor('PASSWORD', 'DATABASE') -config.censor('PASSWORD', 'SSL') -logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) - -#app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL')) - -queue = args.q - -dsn = dsn_from_config(config) -SessionBase.connect(dsn) - -conn = HTTPConnection(config.get('ETH_PROVIDER')) - -chain = ChainSpec.from_chain_str(config.get('SYNCER_CHAIN_SPEC')) - -block_offset = args.offset - - -def main(): - global block_offset - - if block_offset == None: - o = block_latest() - r = conn.do(o) - block_offset = r[1] - - syncer_backend = SyncerBackend.live(chain, 0) - syncer = HeadSyncer(syncer_backend) - fltr = NoopFilter() - syncer.add_filter(fltr) - - try: - logg.debug('block offset {}'.format(block_offset)) - syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), conn) - except LoopDone as e: - sys.stderr.write("sync '{}' done at block {}\n".format(args.mode, e)) - - sys.exit(0) - - -if __name__ == '__main__': - main() diff --git a/chainsyncer/unittest/base.py b/chainsyncer/unittest/base.py index 5fd4b57..2591d8a 100644 --- a/chainsyncer/unittest/base.py +++ b/chainsyncer/unittest/base.py @@ -6,7 +6,7 @@ import logging from hexathon import add_0x # local imports -from chainsyncer.driver import HistorySyncer +from chainsyncer.driver.history import HistorySyncer from chainsyncer.error import NoBlockForYou logg = logging.getLogger().getChild(__name__) @@ -44,9 +44,9 @@ class MockBlock: class TestSyncer(HistorySyncer): - def __init__(self, backend, tx_counts=[]): + def __init__(self, backend, chain_interface, tx_counts=[]): self.tx_counts = tx_counts - super(TestSyncer, self).__init__(backend) + super(TestSyncer, self).__init__(backend, chain_interface) def get(self, conn): diff --git a/chainsyncer/unittest/db.py b/chainsyncer/unittest/db.py new file mode 100644 index 0000000..95b712c --- /dev/null +++ b/chainsyncer/unittest/db.py @@ -0,0 +1,55 @@ +# standard imports +import logging +import os + +# external imports +import alembic +import alembic.config + +# local imports +from chainsyncer.db.models.base import SessionBase +from chainsyncer.db import dsn_from_config +from chainsyncer.db.models.base import SessionBase + +logg = logging.getLogger(__name__) + + +class ChainSyncerDb: + + base = SessionBase + + def __init__(self, debug=False): + config = { + 'DATABASE_ENGINE': 'sqlite', + 'DATABASE_DRIVER': 'pysqlite', + 'DATABASE_NAME': 'chainsyncer.sqlite', + } + logg.debug('config {}'.format(config)) + + self.dsn = dsn_from_config(config) + + self.base.poolable = False + self.base.transactional = False + self.base.procedural = False + self.base.connect(self.dsn, debug=debug) # TODO: evaluates to "true" even if string is 0 + + rootdir = os.path.join(os.path.dirname(os.path.dirname(__file__)), '..') + dbdir = os.path.join(rootdir, 'chainsyncer', 'db') + #migrationsdir = os.path.join(dbdir, 'migrations', config.get('DATABASE_ENGINE')) + migrationsdir = os.path.join(dbdir, 'migrations', 'default') + logg.info('using migrations directory {}'.format(migrationsdir)) + + ac = alembic.config.Config(os.path.join(migrationsdir, 'alembic.ini')) + ac.set_main_option('sqlalchemy.url', self.dsn) + ac.set_main_option('script_location', migrationsdir) + + alembic.command.downgrade(ac, 'base') + alembic.command.upgrade(ac, 'head') + + + def bind_session(self, session=None): + return self.base.bind_session(session) + + + def release_session(self, session=None): + return self.base.release_session(session) diff --git a/requirements.txt b/requirements.txt index 04f3cae..f3999a4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -confini~=0.3.6rc3 +confini>=0.3.6rc3,<0.5.0 semver==2.13.0 -hexathon~=0.0.1a7 -chainlib~=0.0.3rc2 +hexathon~=0.0.1a8 +chainlib>=0.0.9a2,<=0.1.0 diff --git a/run_tests.sh b/run_tests.sh new file mode 100644 index 0000000..f13bd41 --- /dev/null +++ b/run_tests.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +set -e +set -x +for f in `ls tests/*.py`; do + python $f + if [ $? -gt 0 ]; then + exit + fi +done +set +x +set +e diff --git a/setup.cfg b/setup.cfg index e2f033f..e362b50 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.0.2a5 +version = 0.0.6a1 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no @@ -26,10 +26,9 @@ python_requires = >= 3.6 packages = chainsyncer chainsyncer.db - chainsyncer.db.migrations chainsyncer.db.models - chainsyncer.runnable chainsyncer.backend + chainsyncer.driver chainsyncer.unittest [options.package_data] diff --git a/sql/postgresql/1.sql b/sql/postgresql/1.sql deleted file mode 100644 index 7487761..0000000 --- a/sql/postgresql/1.sql +++ /dev/null @@ -1,12 +0,0 @@ -DROP TABLE IF EXISTS chain_sync CASCADE; -CREATE TABLE IF NOT EXISTS chain_sync ( - id serial primary key not null, - blockchain varchar not null, - block_start int not null default 0, - tx_start int not null default 0, - block_cursor int not null default 0, - tx_cursor int not null default 0, - block_target int default null, - date_created timestamp not null, - date_updated timestamp default null -); diff --git a/sql/postgresql/2.sql b/sql/postgresql/2.sql deleted file mode 100644 index ff940d3..0000000 --- a/sql/postgresql/2.sql +++ /dev/null @@ -1,12 +0,0 @@ -DROP TABLE IF EXISTS chain_sync_filter; -CREATE TABLE IF NOT EXISTS chain_sync_filter ( - id serial primary key not null, - chain_sync_id integer not null, - flags bytea default null, - flags_start bytea default null, - count integer not null default 0, - digest char(64) not null, - CONSTRAINT fk_chain_sync - FOREIGN KEY(chain_sync_id) - REFERENCES chain_sync(id) -); diff --git a/sql/sqlite/1.sql b/sql/sqlite/1.sql deleted file mode 100644 index 0f115d4..0000000 --- a/sql/sqlite/1.sql +++ /dev/null @@ -1,11 +0,0 @@ -CREATE TABLE IF NOT EXISTS chain_sync ( - id integer primary key autoincrement, - blockchain varchar not null, - block_start integer not null default 0, - tx_start integer not null default 0, - block_cursor integer not null default 0, - tx_cursor integer not null default 0, - block_target integer default null, - date_created timestamp not null, - date_updated timestamp default null -); diff --git a/sql/sqlite/2.sql b/sql/sqlite/2.sql deleted file mode 100644 index 90be6b0..0000000 --- a/sql/sqlite/2.sql +++ /dev/null @@ -1,11 +0,0 @@ -CREATE TABLE IF NOT EXISTS chain_sync_filter ( - id integer primary key autoincrement not null, - chain_sync_id integer not null, - flags bytea default null, - flags_start bytea default null, - count integer not null default 0, - digest char(64) not null, - CONSTRAINT fk_chain_sync - FOREIGN KEY(chain_sync_id) - REFERENCES chain_sync(id) -); diff --git a/test_requirements.txt b/test_requirements.txt new file mode 100644 index 0000000..1baa8cd --- /dev/null +++ b/test_requirements.txt @@ -0,0 +1,4 @@ +chainlib-eth~=0.0.9a4 +psycopg2==2.8.6 +SQLAlchemy==1.3.20 +alembic==1.4.2 diff --git a/tests/base.py b/tests/base.py deleted file mode 100644 index 2d0a99c..0000000 --- a/tests/base.py +++ /dev/null @@ -1,54 +0,0 @@ -# standard imports -import logging -import unittest -import tempfile -import os -#import pysqlite - -# external imports -from chainlib.chain import ChainSpec - -# local imports -from chainsyncer.db import dsn_from_config -from chainsyncer.db.models.base import SessionBase - -script_dir = os.path.realpath(os.path.dirname(__file__)) - -logging.basicConfig(level=logging.DEBUG) - - -class TestBase(unittest.TestCase): - - def setUp(self): - db_dir = tempfile.mkdtemp() - self.db_path = os.path.join(db_dir, 'test.sqlite') - config = { - 'DATABASE_ENGINE': 'sqlite', - 'DATABASE_DRIVER': 'pysqlite', - 'DATABASE_NAME': self.db_path, - } - dsn = dsn_from_config(config) - SessionBase.poolable = False - SessionBase.transactional = False - SessionBase.procedural = False - SessionBase.connect(dsn, debug=False) - - f = open(os.path.join(script_dir, '..', 'sql', 'sqlite', '1.sql'), 'r') - sql = f.read() - f.close() - - conn = SessionBase.engine.connect() - conn.execute(sql) - - f = open(os.path.join(script_dir, '..', 'sql', 'sqlite', '2.sql'), 'r') - sql = f.read() - f.close() - - conn = SessionBase.engine.connect() - conn.execute(sql) - - self.chain_spec = ChainSpec('evm', 'foo', 42, 'bar') - - def tearDown(self): - SessionBase.disconnect() - os.unlink(self.db_path) diff --git a/tests/chainsyncer_base.py b/tests/chainsyncer_base.py new file mode 100644 index 0000000..34ceab7 --- /dev/null +++ b/tests/chainsyncer_base.py @@ -0,0 +1,57 @@ +# standard imports +import logging +import unittest +import tempfile +import os +#import pysqlite + +# external imports +from chainlib.chain import ChainSpec +from chainlib.interface import ChainInterface +from chainlib.eth.tx import receipt + +# local imports +from chainsyncer.db import dsn_from_config +from chainsyncer.db.models.base import SessionBase + +# test imports +from chainsyncer.unittest.db import ChainSyncerDb + +script_dir = os.path.realpath(os.path.dirname(__file__)) + +logging.basicConfig(level=logging.DEBUG) + + +class EthChainInterface(ChainInterface): + + def __init__(self): + self._tx_receipt = receipt + + +class TestBase(unittest.TestCase): + + interface = EthChainInterface() + + def setUp(self): + self.db = ChainSyncerDb() + + #f = open(os.path.join(script_dir, '..', 'sql', 'sqlite', '1.sql'), 'r') + #sql = f.read() + #f.close() + + #conn = SessionBase.engine.connect() + #conn.execute(sql) + + #f = open(os.path.join(script_dir, '..', 'sql', 'sqlite', '2.sql'), 'r') + #sql = f.read() + #f.close() + + #conn = SessionBase.engine.connect() + #conn.execute(sql) + self.session = self.db.bind_session() + self.chain_spec = ChainSpec('evm', 'foo', 42, 'bar') + + def tearDown(self): + self.session.commit() + self.db.release_session(self.session) + #os.unlink(self.db_path) diff --git a/tests/test_basic.py b/tests/test_basic.py index cc9e2b9..af8ab53 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -8,7 +8,7 @@ from chainlib.chain import ChainSpec from chainsyncer.backend.memory import MemBackend # testutil imports -from tests.base import TestBase +from tests.chainsyncer_base import TestBase class TestBasic(TestBase): diff --git a/tests/test_database.py b/tests/test_database.py index 63dfac5..298e639 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -11,7 +11,7 @@ from chainsyncer.db.models.filter import BlockchainSyncFilter from chainsyncer.backend.sql import SQLBackend # testutil imports -from tests.base import TestBase +from tests.chainsyncer_base import TestBase logg = logging.getLogger() diff --git a/tests/test_helo.py b/tests/test_helo.py new file mode 100644 index 0000000..bc760b3 --- /dev/null +++ b/tests/test_helo.py @@ -0,0 +1,15 @@ +# standard imports +import unittest + +# local imports +from tests.chainsyncer_base import TestBase + + +class TestHelo(TestBase): + + def test_helo(self): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_interrupt.py b/tests/test_interrupt.py index a63eb56..2df7d85 100644 --- a/tests/test_interrupt.py +++ b/tests/test_interrupt.py @@ -16,7 +16,7 @@ from chainsyncer.backend.file import ( ) # test imports -from tests.base import TestBase +from tests.chainsyncer_base import TestBase from chainsyncer.unittest.base import ( MockBlock, MockConn, @@ -77,7 +77,7 @@ class TestInterrupt(TestBase): ] - def assert_filter_interrupt(self, vector): + def assert_filter_interrupt(self, vector, chain_interface): logg.debug('running vector {} {}'.format(str(self.backend), vector)) @@ -85,7 +85,7 @@ class TestInterrupt(TestBase): for v in vector: z += v - syncer = TestSyncer(self.backend, vector) + syncer = TestSyncer(self.backend, chain_interface, vector) filters = [ CountFilter('foo'), @@ -114,7 +114,7 @@ class TestInterrupt(TestBase): def test_filter_interrupt_memory(self): for vector in self.vectors: self.backend = MemBackend(self.chain_spec, None, target_block=len(vector)) - self.assert_filter_interrupt(vector) + self.assert_filter_interrupt(vector, self.interface) def test_filter_interrupt_file(self): @@ -123,13 +123,13 @@ class TestInterrupt(TestBase): d = tempfile.mkdtemp() #os.makedirs(data_dir_for(self.chain_spec, 'foo', d)) self.backend = FileBackend.initial(self.chain_spec, len(vector), base_dir=d) #'foo', base_dir=d) - self.assert_filter_interrupt(vector) + self.assert_filter_interrupt(vector, self.interface) def test_filter_interrupt_sql(self): for vector in self.vectors: self.backend = SQLBackend.initial(self.chain_spec, len(vector)) - self.assert_filter_interrupt(vector) + self.assert_filter_interrupt(vector, self.interface)