Merge branch 'lash/chainlib-basedir' into '0.0.5-dev'
Implement chainlib basedir override See merge request chaintool/chainsyncer!6
This commit is contained in:
commit
fa95d5c192
1
.gitignore
vendored
1
.gitignore
vendored
@ -5,3 +5,4 @@ __pycache__
|
||||
gmon.out
|
||||
build/
|
||||
dist/
|
||||
*.sqlite
|
||||
|
@ -1 +1 @@
|
||||
include *requirements.txt LICENSE.txt sql/**/*
|
||||
include *requirements.txt LICENSE.txt chainsyncer/db/migrations/default/* chainsyncer/db/migrations/default/versions/* chainsyncer/db/migrations/default/versions/src/*
|
||||
|
@ -9,7 +9,17 @@ class Backend:
|
||||
def __init__(self, flags_reversed=False):
|
||||
self.filter_count = 0
|
||||
self.flags_reversed = flags_reversed
|
||||
|
||||
|
||||
self.block_height_offset = 0
|
||||
self.tx_index_offset = 0
|
||||
|
||||
self.block_height_cursor = 0
|
||||
self.tx_index_cursor = 0
|
||||
|
||||
self.block_height_target = 0
|
||||
self.tx_index_target = 0
|
||||
|
||||
|
||||
|
||||
def check_filter(self, n, flags):
|
||||
if self.flags_reversed:
|
||||
@ -20,3 +30,16 @@ class Backend:
|
||||
pass
|
||||
return False
|
||||
return flags & (1 << n) > 0
|
||||
|
||||
|
||||
|
||||
def chain(self):
|
||||
"""Returns chain spec for syncer
|
||||
|
||||
:returns: Chain spec
|
||||
:rtype chain_spec: cic_registry.chain.ChainSpec
|
||||
"""
|
||||
return self.chain_spec
|
||||
|
||||
def __str__(self):
|
||||
return "syncerbackend chain {} start {} target {}".format(self.chain(), self.start(), self.target())
|
||||
|
@ -28,15 +28,6 @@ class FileBackend(Backend):
|
||||
super(FileBackend, self).__init__(flags_reversed=True)
|
||||
self.object_data_dir = data_dir_for(chain_spec, object_id, base_dir=base_dir)
|
||||
|
||||
self.block_height_offset = 0
|
||||
self.tx_index_offset = 0
|
||||
|
||||
self.block_height_cursor = 0
|
||||
self.tx_index_cursor = 0
|
||||
|
||||
self.block_height_target = 0
|
||||
self.tx_index_target = 0
|
||||
|
||||
self.object_id = object_id
|
||||
self.db_object = None
|
||||
self.db_object_filter = None
|
||||
@ -206,8 +197,18 @@ class FileBackend(Backend):
|
||||
o = FileBackend(chain_spec, uu, base_dir=base_dir)
|
||||
o.__set(target_block_height, 0, 'target')
|
||||
o.__set(start_block_height, 0, 'offset')
|
||||
o.__set(start_block_height, 0, 'cursor')
|
||||
|
||||
return o
|
||||
|
||||
|
||||
@staticmethod
|
||||
def live(chain_spec, block_height, base_dir=base_dir):
|
||||
uu = FileBackend.create_object(chain_spec, base_dir=base_dir)
|
||||
o = FileBackend(chain_spec, uu, base_dir=base_dir)
|
||||
o.__set(block_height, 0, 'offset')
|
||||
o.__set(block_height, 0, 'cursor')
|
||||
|
||||
#return uu
|
||||
return o
|
||||
|
||||
|
||||
@ -245,15 +246,20 @@ class FileBackend(Backend):
|
||||
|
||||
|
||||
@staticmethod
|
||||
def resume(chain_spec, base_dir=base_dir):
|
||||
return FileBackend.__sorted_entries(chain_spec, base_dir=base_dir)
|
||||
def resume(chain_spec, block_height, base_dir=base_dir):
|
||||
try:
|
||||
return FileBackend.__sorted_entries(chain_spec, base_dir=base_dir)
|
||||
except FileNotFoundError:
|
||||
return []
|
||||
|
||||
|
||||
@staticmethod
|
||||
def first(chain_spec, base_dir=base_dir):
|
||||
|
||||
entries = FileBackend.__sorted_entries(chain_spec, base_dir=base_dir)
|
||||
|
||||
entries = []
|
||||
try:
|
||||
entries = FileBackend.__sorted_entries(chain_spec, base_dir=base_dir)
|
||||
except FileNotFoundError:
|
||||
return entries
|
||||
return entries[len(entries)-1]
|
||||
|
||||
|
||||
|
@ -19,6 +19,7 @@ class MemBackend(Backend):
|
||||
self.target_block = target_block
|
||||
self.db_session = None
|
||||
self.filter_names = []
|
||||
self.filter_states = {}
|
||||
|
||||
|
||||
def connect(self):
|
||||
|
@ -78,14 +78,6 @@ class SQLBackend(Backend):
|
||||
self.db_session.close()
|
||||
self.db_session = None
|
||||
|
||||
|
||||
def chain(self):
|
||||
"""Returns chain spec for syncer
|
||||
|
||||
:returns: Chain spec
|
||||
:rtype chain_spec: cic_registry.chain.ChainSpec
|
||||
"""
|
||||
return self.chain_spec
|
||||
|
||||
|
||||
def get(self):
|
||||
@ -313,5 +305,3 @@ class SQLBackend(Backend):
|
||||
self.disconnect()
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return "syncerbackend chain {} start {} target {}".format(self.chain(), self.start(), self.target())
|
||||
|
0
chainsyncer/db/migrations/__init__.py
Normal file
0
chainsyncer/db/migrations/__init__.py
Normal file
1
chainsyncer/db/migrations/default/README
Normal file
1
chainsyncer/db/migrations/default/README
Normal file
@ -0,0 +1 @@
|
||||
Generic single-database configuration.
|
85
chainsyncer/db/migrations/default/alembic.ini
Normal file
85
chainsyncer/db/migrations/default/alembic.ini
Normal file
@ -0,0 +1,85 @@
|
||||
# A generic, single database configuration.
|
||||
|
||||
[alembic]
|
||||
# path to migration scripts
|
||||
script_location = .
|
||||
|
||||
# template used to generate migration files
|
||||
# file_template = %%(rev)s_%%(slug)s
|
||||
|
||||
# timezone to use when rendering the date
|
||||
# within the migration file as well as the filename.
|
||||
# string value is passed to dateutil.tz.gettz()
|
||||
# leave blank for localtime
|
||||
# timezone =
|
||||
|
||||
# max length of characters to apply to the
|
||||
# "slug" field
|
||||
# truncate_slug_length = 40
|
||||
|
||||
# set to 'true' to run the environment during
|
||||
# the 'revision' command, regardless of autogenerate
|
||||
# revision_environment = false
|
||||
|
||||
# set to 'true' to allow .pyc and .pyo files without
|
||||
# a source .py file to be detected as revisions in the
|
||||
# versions/ directory
|
||||
# sourceless = false
|
||||
|
||||
# version location specification; this defaults
|
||||
# to ./versions. When using multiple version
|
||||
# directories, initial revisions must be specified with --version-path
|
||||
# version_locations = %(here)s/bar %(here)s/bat ./versions
|
||||
|
||||
# the output encoding used when revision files
|
||||
# are written from script.py.mako
|
||||
# output_encoding = utf-8
|
||||
|
||||
sqlalchemy.url = driver://user:pass@localhost/dbname
|
||||
|
||||
|
||||
[post_write_hooks]
|
||||
# post_write_hooks defines scripts or Python functions that are run
|
||||
# on newly generated revision scripts. See the documentation for further
|
||||
# detail and examples
|
||||
|
||||
# format using "black" - use the console_scripts runner, against the "black" entrypoint
|
||||
# hooks=black
|
||||
# black.type=console_scripts
|
||||
# black.entrypoint=black
|
||||
# black.options=-l 79
|
||||
|
||||
# Logging configuration
|
||||
[loggers]
|
||||
keys = root,sqlalchemy,alembic
|
||||
|
||||
[handlers]
|
||||
keys = console
|
||||
|
||||
[formatters]
|
||||
keys = generic
|
||||
|
||||
[logger_root]
|
||||
#level = WARN
|
||||
handlers = console
|
||||
qualname =
|
||||
|
||||
[logger_sqlalchemy]
|
||||
level = WARN
|
||||
handlers =
|
||||
qualname = sqlalchemy.engine
|
||||
|
||||
[logger_alembic]
|
||||
level = INFO
|
||||
handlers =
|
||||
qualname = alembic
|
||||
|
||||
[handler_console]
|
||||
class = StreamHandler
|
||||
args = (sys.stderr,)
|
||||
level = NOTSET
|
||||
formatter = generic
|
||||
|
||||
[formatter_generic]
|
||||
format = %(levelname)-5.5s [%(name)s] %(message)s
|
||||
datefmt = %H:%M:%S
|
77
chainsyncer/db/migrations/default/env.py
Normal file
77
chainsyncer/db/migrations/default/env.py
Normal file
@ -0,0 +1,77 @@
|
||||
from logging.config import fileConfig
|
||||
|
||||
from sqlalchemy import engine_from_config
|
||||
from sqlalchemy import pool
|
||||
|
||||
from alembic import context
|
||||
|
||||
# this is the Alembic Config object, which provides
|
||||
# access to the values within the .ini file in use.
|
||||
config = context.config
|
||||
|
||||
# Interpret the config file for Python logging.
|
||||
# This line sets up loggers basically.
|
||||
fileConfig(config.config_file_name)
|
||||
|
||||
# add your model's MetaData object here
|
||||
# for 'autogenerate' support
|
||||
# from myapp import mymodel
|
||||
# target_metadata = mymodel.Base.metadata
|
||||
target_metadata = None
|
||||
|
||||
# other values from the config, defined by the needs of env.py,
|
||||
# can be acquired:
|
||||
# my_important_option = config.get_main_option("my_important_option")
|
||||
# ... etc.
|
||||
|
||||
|
||||
def run_migrations_offline():
|
||||
"""Run migrations in 'offline' mode.
|
||||
|
||||
This configures the context with just a URL
|
||||
and not an Engine, though an Engine is acceptable
|
||||
here as well. By skipping the Engine creation
|
||||
we don't even need a DBAPI to be available.
|
||||
|
||||
Calls to context.execute() here emit the given string to the
|
||||
script output.
|
||||
|
||||
"""
|
||||
url = config.get_main_option("sqlalchemy.url")
|
||||
context.configure(
|
||||
url=url,
|
||||
target_metadata=target_metadata,
|
||||
literal_binds=True,
|
||||
dialect_opts={"paramstyle": "named"},
|
||||
)
|
||||
|
||||
with context.begin_transaction():
|
||||
context.run_migrations()
|
||||
|
||||
|
||||
def run_migrations_online():
|
||||
"""Run migrations in 'online' mode.
|
||||
|
||||
In this scenario we need to create an Engine
|
||||
and associate a connection with the context.
|
||||
|
||||
"""
|
||||
connectable = engine_from_config(
|
||||
config.get_section(config.config_ini_section),
|
||||
prefix="sqlalchemy.",
|
||||
poolclass=pool.NullPool,
|
||||
)
|
||||
|
||||
with connectable.connect() as connection:
|
||||
context.configure(
|
||||
connection=connection, target_metadata=target_metadata
|
||||
)
|
||||
|
||||
with context.begin_transaction():
|
||||
context.run_migrations()
|
||||
|
||||
|
||||
if context.is_offline_mode():
|
||||
run_migrations_offline()
|
||||
else:
|
||||
run_migrations_online()
|
37
chainsyncer/db/migrations/default/export.py
Normal file
37
chainsyncer/db/migrations/default/export.py
Normal file
@ -0,0 +1,37 @@
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
from chainsyncer.db.migrations.default.versions.src.sync import (
|
||||
upgrade as upgrade_sync,
|
||||
downgrade as downgrade_sync,
|
||||
)
|
||||
|
||||
from chainsyncer.db.migrations.default.versions.src.sync_tx import (
|
||||
upgrade as upgrade_sync_tx,
|
||||
downgrade as downgrade_sync_tx,
|
||||
)
|
||||
|
||||
def chainsyncer_upgrade(major=0, minor=0, patch=3):
|
||||
r0_0_1_u()
|
||||
if patch >= 3:
|
||||
r0_0_3_u()
|
||||
|
||||
def chainsyncer_downgrade(major=0, minor=0, patch=3):
|
||||
if patch >= 3:
|
||||
r0_0_3_d()
|
||||
r0_0_1_d()
|
||||
|
||||
def r0_0_1_u():
|
||||
upgrade_sync()
|
||||
|
||||
def r0_0_1_d():
|
||||
downgrade_sync()
|
||||
|
||||
|
||||
# 0.0.3
|
||||
|
||||
def r0_0_3_u():
|
||||
upgrade_sync_tx()
|
||||
|
||||
def r0_0_3_d():
|
||||
downgrade_sync_tx()
|
24
chainsyncer/db/migrations/default/script.py.mako
Normal file
24
chainsyncer/db/migrations/default/script.py.mako
Normal file
@ -0,0 +1,24 @@
|
||||
"""${message}
|
||||
|
||||
Revision ID: ${up_revision}
|
||||
Revises: ${down_revision | comma,n}
|
||||
Create Date: ${create_date}
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
${imports if imports else ""}
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = ${repr(up_revision)}
|
||||
down_revision = ${repr(down_revision)}
|
||||
branch_labels = ${repr(branch_labels)}
|
||||
depends_on = ${repr(depends_on)}
|
||||
|
||||
|
||||
def upgrade():
|
||||
${upgrades if upgrades else "pass"}
|
||||
|
||||
|
||||
def downgrade():
|
||||
${downgrades if downgrades else "pass"}
|
@ -0,0 +1,14 @@
|
||||
"""base setup
|
||||
|
||||
Revision ID: 452ecfa81de3
|
||||
Revises:
|
||||
Create Date: 2021-07-16 16:29:32.460027
|
||||
|
||||
"""
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '452ecfa81de3'
|
||||
down_revision = None
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
from chainsyncer.db.migrations.default.versions.src.sync import upgrade, downgrade
|
@ -0,0 +1,14 @@
|
||||
"""sync-tx
|
||||
|
||||
Revision ID: a2ce6826c5eb
|
||||
Revises: 452ecfa81de3
|
||||
Create Date: 2021-07-16 18:17:53.439721
|
||||
|
||||
"""
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'a2ce6826c5eb'
|
||||
down_revision = '452ecfa81de3'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
from chainsyncer.db.migrations.default.versions.src.sync_tx import upgrade, downgrade
|
32
chainsyncer/db/migrations/default/versions/src/sync.py
Normal file
32
chainsyncer/db/migrations/default/versions/src/sync.py
Normal file
@ -0,0 +1,32 @@
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table(
|
||||
'chain_sync',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('blockchain', sa.String, nullable=False),
|
||||
sa.Column('block_start', sa.Integer, nullable=False, default=0),
|
||||
sa.Column('tx_start', sa.Integer, nullable=False, default=0),
|
||||
sa.Column('block_cursor', sa.Integer, nullable=False, default=0),
|
||||
sa.Column('tx_cursor', sa.Integer, nullable=False, default=0),
|
||||
sa.Column('block_target', sa.Integer, nullable=True),
|
||||
sa.Column('date_created', sa.DateTime, nullable=False),
|
||||
sa.Column('date_updated', sa.DateTime),
|
||||
)
|
||||
|
||||
op.create_table(
|
||||
'chain_sync_filter',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('chain_sync_id', sa.Integer, sa.ForeignKey('chain_sync.id'), nullable=True),
|
||||
sa.Column('flags', sa.LargeBinary, nullable=True),
|
||||
sa.Column('flags_start', sa.LargeBinary, nullable=True),
|
||||
sa.Column('count', sa.Integer, nullable=False, default=0),
|
||||
sa.Column('digest', sa.String(64), nullable=False),
|
||||
)
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_table('chain_sync_filter')
|
||||
op.drop_table('chain_sync')
|
17
chainsyncer/db/migrations/default/versions/src/sync_tx.py
Normal file
17
chainsyncer/db/migrations/default/versions/src/sync_tx.py
Normal file
@ -0,0 +1,17 @@
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table(
|
||||
'chain_sync_tx',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('blockchain', sa.String, nullable=False),
|
||||
sa.Column('chain_sync_id', sa.Integer, sa.ForeignKey('chain_sync.id'), nullable=False),
|
||||
sa.Column('flags', sa.LargeBinary, nullable=True),
|
||||
sa.Column('block', sa.Integer, nullable=False),
|
||||
sa.Column('tx', sa.Integer, nullable=False),
|
||||
)
|
||||
|
||||
def downgrade():
|
||||
op.drop_table('chain_sync_tx')
|
@ -1,36 +1,37 @@
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
def chainsyncer_upgrade(major=0, minor=0, patch=1):
|
||||
r0_0_1_u()
|
||||
from chainsyncer.db.migrations.default.versions.tags.sync import
|
||||
upgrade as upgrade_sync,
|
||||
downgrade as downgrade_sync,
|
||||
)
|
||||
|
||||
def chainsyncer_downgrade(major=0, minor=0, patch=1):
|
||||
from chainsyncer.db.migrations.default.versions.tags.sync_tx import
|
||||
upgrade as upgrade_sync_tx,
|
||||
downgrade as downgrade_sync_tx,
|
||||
)
|
||||
|
||||
def chainsyncer_upgrade(major=0, minor=0, patch=3):
|
||||
r0_0_1_u()
|
||||
if patch >= 3:
|
||||
r0_0_3_u()
|
||||
|
||||
def chainsyncer_downgrade(major=0, minor=0, patch=3):
|
||||
if patch >= 3:
|
||||
r0_0_3_d()
|
||||
r0_0_1_d()
|
||||
|
||||
def r0_0_1_u():
|
||||
op.create_table(
|
||||
'chain_sync',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('blockchain', sa.String, nullable=False),
|
||||
sa.Column('block_start', sa.Integer, nullable=False, default=0),
|
||||
sa.Column('tx_start', sa.Integer, nullable=False, default=0),
|
||||
sa.Column('block_cursor', sa.Integer, nullable=False, default=0),
|
||||
sa.Column('tx_cursor', sa.Integer, nullable=False, default=0),
|
||||
sa.Column('block_target', sa.Integer, nullable=True),
|
||||
sa.Column('date_created', sa.DateTime, nullable=False),
|
||||
sa.Column('date_updated', sa.DateTime),
|
||||
)
|
||||
|
||||
op.create_table(
|
||||
'chain_sync_filter',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('chain_sync_id', sa.Integer, sa.ForeignKey('chain_sync.id'), nullable=True),
|
||||
sa.Column('flags', sa.LargeBinary, nullable=True),
|
||||
sa.Column('flags_start', sa.LargeBinary, nullable=True),
|
||||
sa.Column('count', sa.Integer, nullable=False, default=0),
|
||||
sa.Column('digest', sa.String(64), nullable=False),
|
||||
)
|
||||
upgrade_sync()
|
||||
|
||||
def r0_0_1_d():
|
||||
op.drop_table('chain_sync_filter')
|
||||
op.drop_table('chain_sync')
|
||||
downgrade_sync()
|
||||
|
||||
|
||||
# 0.0.3
|
||||
|
||||
def r0_0_3_u():
|
||||
upgrade_sync_tx()
|
||||
|
||||
def r0_0_3_d():
|
||||
downgrade_sync_tx()
|
||||
|
0
chainsyncer/db/models/__init__.py
Normal file
0
chainsyncer/db/models/__init__.py
Normal file
@ -120,3 +120,4 @@ class SessionBase(Model):
|
||||
logg.debug('destroying session {}'.format(session_key))
|
||||
session.commit()
|
||||
session.close()
|
||||
del SessionBase.localsessions[session_key]
|
||||
|
@ -1,209 +0,0 @@
|
||||
# standard imports
|
||||
import uuid
|
||||
import logging
|
||||
import time
|
||||
import signal
|
||||
import json
|
||||
|
||||
# external imports
|
||||
from chainlib.eth.block import (
|
||||
block_by_number,
|
||||
Block,
|
||||
)
|
||||
from chainlib.eth.tx import (
|
||||
receipt,
|
||||
transaction,
|
||||
Tx,
|
||||
)
|
||||
from chainlib.error import JSONRPCException
|
||||
|
||||
# local imports
|
||||
from chainsyncer.filter import SyncFilter
|
||||
from chainsyncer.error import (
|
||||
SyncDone,
|
||||
NoBlockForYou,
|
||||
)
|
||||
|
||||
logg = logging.getLogger().getChild(__name__)
|
||||
|
||||
|
||||
def noop_callback(block, tx):
|
||||
logg.debug('noop callback ({},{})'.format(block, tx))
|
||||
|
||||
|
||||
class Syncer:
|
||||
|
||||
running_global = True
|
||||
yield_delay=0.005
|
||||
signal_request = [signal.SIGINT, signal.SIGTERM]
|
||||
signal_set = False
|
||||
|
||||
def __init__(self, backend, pre_callback=None, block_callback=None, post_callback=None):
|
||||
self.cursor = None
|
||||
self.running = True
|
||||
self.backend = backend
|
||||
self.filter = SyncFilter(backend)
|
||||
self.block_callback = block_callback
|
||||
self.pre_callback = pre_callback
|
||||
self.post_callback = post_callback
|
||||
if not Syncer.signal_set:
|
||||
for sig in Syncer.signal_request:
|
||||
signal.signal(sig, Syncer.__sig_terminate)
|
||||
Syncer.signal_set = True
|
||||
|
||||
|
||||
@staticmethod
|
||||
def __sig_terminate(sig, frame):
|
||||
logg.warning('got signal {}'.format(sig))
|
||||
Syncer.terminate()
|
||||
|
||||
|
||||
@staticmethod
|
||||
def terminate():
|
||||
logg.info('termination requested!')
|
||||
Syncer.running_global = False
|
||||
|
||||
|
||||
def chain(self):
|
||||
"""Returns the string representation of the chain spec for the chain the syncer is running on.
|
||||
|
||||
:returns: Chain spec string
|
||||
:rtype: str
|
||||
"""
|
||||
return self.bc_cache.chain()
|
||||
|
||||
|
||||
def add_filter(self, f):
|
||||
self.filter.add(f)
|
||||
self.backend.register_filter(str(f))
|
||||
|
||||
|
||||
def process_single(self, conn, block, tx):
|
||||
self.backend.set(block.number, tx.index)
|
||||
self.filter.apply(conn, block, tx)
|
||||
|
||||
|
||||
class BlockPollSyncer(Syncer):
|
||||
|
||||
def __init__(self, backend, pre_callback=None, block_callback=None, post_callback=None):
|
||||
super(BlockPollSyncer, self).__init__(backend, pre_callback, block_callback, post_callback)
|
||||
|
||||
|
||||
def loop(self, interval, conn):
|
||||
(pair, fltr) = self.backend.get()
|
||||
start_tx = pair[1]
|
||||
|
||||
while self.running and Syncer.running_global:
|
||||
if self.pre_callback != None:
|
||||
self.pre_callback()
|
||||
while True and Syncer.running_global:
|
||||
if start_tx > 0:
|
||||
start_tx -= 1
|
||||
continue
|
||||
try:
|
||||
block = self.get(conn)
|
||||
except SyncDone as e:
|
||||
logg.info('sync done: {}'.format(e))
|
||||
return self.backend.get()
|
||||
except NoBlockForYou as e:
|
||||
break
|
||||
# TODO: To properly handle this, ensure that previous request is rolled back
|
||||
# except sqlalchemy.exc.OperationalError as e:
|
||||
# logg.error('database error: {}'.format(e))
|
||||
# break
|
||||
|
||||
if self.block_callback != None:
|
||||
self.block_callback(block, None)
|
||||
|
||||
last_block = block
|
||||
self.process(conn, block)
|
||||
start_tx = 0
|
||||
time.sleep(self.yield_delay)
|
||||
if self.post_callback != None:
|
||||
self.post_callback()
|
||||
time.sleep(interval)
|
||||
|
||||
|
||||
class HeadSyncer(BlockPollSyncer):
|
||||
|
||||
def process(self, conn, block):
|
||||
(pair, fltr) = self.backend.get()
|
||||
logg.debug('process block {} (backend {}:{})'.format(block, pair, fltr))
|
||||
i = pair[1] # set tx index from previous
|
||||
tx = None
|
||||
while True:
|
||||
try:
|
||||
tx = block.tx(i)
|
||||
except AttributeError:
|
||||
o = transaction(block.txs[i])
|
||||
r = conn.do(o)
|
||||
tx = Tx(Tx.src_normalize(r), block=block)
|
||||
except IndexError as e:
|
||||
logg.debug('index error syncer rcpt get {}'.format(e))
|
||||
self.backend.set(block.number + 1, 0)
|
||||
break
|
||||
|
||||
# TODO: Move specifics to eth subpackage, receipts are not a global concept
|
||||
rcpt = conn.do(receipt(tx.hash))
|
||||
if rcpt != None:
|
||||
tx.apply_receipt(Tx.src_normalize(rcpt))
|
||||
|
||||
self.process_single(conn, block, tx)
|
||||
self.backend.reset_filter()
|
||||
|
||||
i += 1
|
||||
|
||||
|
||||
def get(self, conn):
|
||||
(height, flags) = self.backend.get()
|
||||
block_number = height[0]
|
||||
block_hash = []
|
||||
o = block_by_number(block_number)
|
||||
r = conn.do(o)
|
||||
if r == None:
|
||||
raise NoBlockForYou()
|
||||
b = Block(r)
|
||||
b.txs = b.txs[height[1]:]
|
||||
|
||||
return b
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return '[headsyncer] {}'.format(str(self.backend))
|
||||
|
||||
|
||||
class HistorySyncer(HeadSyncer):
|
||||
|
||||
def __init__(self, backend, pre_callback=None, block_callback=None, post_callback=None):
|
||||
super(HeadSyncer, self).__init__(backend, pre_callback, block_callback, post_callback)
|
||||
self.block_target = None
|
||||
(block_number, flags) = self.backend.target()
|
||||
if block_number == None:
|
||||
raise AttributeError('backend has no future target. Use HeadSyner instead')
|
||||
self.block_target = block_number
|
||||
logg.debug('block target {}'.format(self.block_target))
|
||||
|
||||
|
||||
def get(self, conn):
|
||||
(height, flags) = self.backend.get()
|
||||
if self.block_target < height[0]:
|
||||
raise SyncDone(self.block_target)
|
||||
block_number = height[0]
|
||||
block_hash = []
|
||||
o = block_by_number(block_number)
|
||||
try:
|
||||
r = conn.do(o)
|
||||
# TODO: Disambiguate whether error is temporary or permanent, if permanent, SyncDone should be raised, because a historical sync is attempted into the future
|
||||
except JSONRPCException:
|
||||
r = None
|
||||
if r == None:
|
||||
raise SyncDone() #NoBlockForYou()
|
||||
b = Block(r)
|
||||
|
||||
return b
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return '[historysyncer] {}'.format(str(self.backend))
|
||||
|
||||
|
1
chainsyncer/driver/__init__.py
Normal file
1
chainsyncer/driver/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .base import Syncer
|
73
chainsyncer/driver/base.py
Normal file
73
chainsyncer/driver/base.py
Normal file
@ -0,0 +1,73 @@
|
||||
# standard imports
|
||||
import uuid
|
||||
import logging
|
||||
import time
|
||||
import signal
|
||||
import json
|
||||
|
||||
# external imports
|
||||
from chainlib.error import JSONRPCException
|
||||
|
||||
# local imports
|
||||
from chainsyncer.filter import SyncFilter
|
||||
from chainsyncer.error import (
|
||||
SyncDone,
|
||||
NoBlockForYou,
|
||||
)
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def noop_callback(block, tx):
|
||||
logg.debug('noop callback ({},{})'.format(block, tx))
|
||||
|
||||
|
||||
class Syncer:
|
||||
|
||||
running_global = True
|
||||
yield_delay=0.005
|
||||
signal_request = [signal.SIGINT, signal.SIGTERM]
|
||||
signal_set = False
|
||||
name = 'base'
|
||||
|
||||
def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None):
|
||||
self.chain_interface = chain_interface
|
||||
self.cursor = None
|
||||
self.running = True
|
||||
self.backend = backend
|
||||
self.filter = SyncFilter(backend)
|
||||
self.block_callback = block_callback
|
||||
self.pre_callback = pre_callback
|
||||
self.post_callback = post_callback
|
||||
if not Syncer.signal_set:
|
||||
for sig in Syncer.signal_request:
|
||||
signal.signal(sig, self.__sig_terminate)
|
||||
Syncer.signal_set = True
|
||||
|
||||
|
||||
def __sig_terminate(self, sig, frame):
|
||||
logg.warning('got signal {}'.format(sig))
|
||||
self.terminate()
|
||||
|
||||
|
||||
def terminate(self):
|
||||
logg.info('termination requested!')
|
||||
Syncer.running_global = False
|
||||
Syncer.running = False
|
||||
|
||||
|
||||
def add_filter(self, f):
|
||||
self.filter.add(f)
|
||||
self.backend.register_filter(str(f))
|
||||
|
||||
|
||||
def process_single(self, conn, block, tx):
|
||||
self.backend.set(block.number, tx.index)
|
||||
self.filter.apply(conn, block, tx)
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return 'syncer "{}" {}'.format(
|
||||
self.name,
|
||||
self.backend,
|
||||
)
|
52
chainsyncer/driver/head.py
Normal file
52
chainsyncer/driver/head.py
Normal file
@ -0,0 +1,52 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# local imports
|
||||
from chainsyncer.error import NoBlockForYou
|
||||
from .poll import BlockPollSyncer
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
class HeadSyncer(BlockPollSyncer):
|
||||
|
||||
name = 'head'
|
||||
|
||||
def process(self, conn, block):
|
||||
(pair, fltr) = self.backend.get()
|
||||
logg.debug('process block {} (backend {}:{})'.format(block, pair, fltr))
|
||||
i = pair[1] # set tx index from previous
|
||||
tx = None
|
||||
while True:
|
||||
try:
|
||||
tx = block.tx(i)
|
||||
except AttributeError:
|
||||
o = tx(block.txs[i])
|
||||
r = conn.do(o)
|
||||
tx = self.interface.tx_from_src(Tx.src_normalize(r), block=block)
|
||||
#except IndexError as e:
|
||||
# logg.debug('index error syncer tx get {}'.format(e))
|
||||
# break
|
||||
|
||||
# TODO: Move specifics to eth subpackage, receipts are not a global concept
|
||||
rcpt = conn.do(self.chain_interface.tx_receipt(tx.hash))
|
||||
if rcpt != None:
|
||||
tx.apply_receipt(self.chain_interface.src_normalize(rcpt))
|
||||
|
||||
self.process_single(conn, block, tx)
|
||||
self.backend.reset_filter()
|
||||
|
||||
i += 1
|
||||
|
||||
|
||||
def get(self, conn):
|
||||
(height, flags) = self.backend.get()
|
||||
block_number = height[0]
|
||||
block_hash = []
|
||||
o = self.chain_interface.block_by_number(block_number)
|
||||
r = conn.do(o)
|
||||
if r == None:
|
||||
raise NoBlockForYou()
|
||||
b = self.chain_interface.block_from_src(r)
|
||||
b.txs = b.txs[height[1]:]
|
||||
|
||||
return b
|
45
chainsyncer/driver/history.py
Normal file
45
chainsyncer/driver/history.py
Normal file
@ -0,0 +1,45 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
from chainlib.error import RPCException
|
||||
|
||||
# local imports
|
||||
from .head import HeadSyncer
|
||||
from chainsyncer.error import SyncDone
|
||||
from chainlib.error import RPCException
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HistorySyncer(HeadSyncer):
|
||||
|
||||
name = 'history'
|
||||
|
||||
def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None):
|
||||
super(HeadSyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback)
|
||||
self.block_target = None
|
||||
(block_number, flags) = self.backend.target()
|
||||
if block_number == None:
|
||||
raise AttributeError('backend has no future target. Use HeadSyner instead')
|
||||
self.block_target = block_number
|
||||
logg.debug('block target {}'.format(self.block_target))
|
||||
|
||||
|
||||
def get(self, conn):
|
||||
(height, flags) = self.backend.get()
|
||||
if self.block_target < height[0]:
|
||||
raise SyncDone(self.block_target)
|
||||
block_number = height[0]
|
||||
block_hash = []
|
||||
o = self.chain_interface.block_by_number(block_number)
|
||||
try:
|
||||
r = conn.do(o)
|
||||
# TODO: Disambiguate whether error is temporary or permanent, if permanent, SyncDone should be raised, because a historical sync is attempted into the future
|
||||
except RPCException:
|
||||
r = None
|
||||
if r == None:
|
||||
raise SyncDone() #NoBlockForYou()
|
||||
b = self.chain_interface.block_from_src(r)
|
||||
|
||||
return b
|
59
chainsyncer/driver/poll.py
Normal file
59
chainsyncer/driver/poll.py
Normal file
@ -0,0 +1,59 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import time
|
||||
|
||||
# local imports
|
||||
from .base import Syncer
|
||||
from chainsyncer.error import (
|
||||
SyncDone,
|
||||
NoBlockForYou,
|
||||
)
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
||||
class BlockPollSyncer(Syncer):
|
||||
|
||||
name = 'blockpoll'
|
||||
|
||||
def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None):
|
||||
super(BlockPollSyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback)
|
||||
|
||||
|
||||
def loop(self, interval, conn):
|
||||
(pair, fltr) = self.backend.get()
|
||||
start_tx = pair[1]
|
||||
|
||||
while self.running and Syncer.running_global:
|
||||
if self.pre_callback != None:
|
||||
self.pre_callback()
|
||||
while True and Syncer.running_global:
|
||||
if start_tx > 0:
|
||||
start_tx -= 1
|
||||
continue
|
||||
try:
|
||||
block = self.get(conn)
|
||||
except SyncDone as e:
|
||||
logg.info('all blocks sumitted for processing: {}'.format(e))
|
||||
return self.backend.get()
|
||||
except NoBlockForYou as e:
|
||||
break
|
||||
# TODO: To properly handle this, ensure that previous request is rolled back
|
||||
# except sqlalchemy.exc.OperationalError as e:
|
||||
# logg.error('database error: {}'.format(e))
|
||||
# break
|
||||
|
||||
if self.block_callback != None:
|
||||
self.block_callback(block, None)
|
||||
|
||||
last_block = block
|
||||
try:
|
||||
self.process(conn, block)
|
||||
except IndexError:
|
||||
self.backend.set(block.number + 1, 0)
|
||||
start_tx = 0
|
||||
time.sleep(self.yield_delay)
|
||||
if self.post_callback != None:
|
||||
self.post_callback()
|
||||
time.sleep(interval)
|
133
chainsyncer/driver/thread.py
Normal file
133
chainsyncer/driver/thread.py
Normal file
@ -0,0 +1,133 @@
|
||||
# standard imports
|
||||
import logging
|
||||
#import threading
|
||||
import multiprocessing
|
||||
import queue
|
||||
|
||||
# external imports
|
||||
from chainlib.error import RPCException
|
||||
|
||||
# local imports
|
||||
from .history import HistorySyncer
|
||||
from chainsyncer.error import SyncDone
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
||||
class ThreadedHistorySyncer(HistorySyncer):
|
||||
|
||||
def __init__(self, conn_factory, thread_limit, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, conn_limit=0):
|
||||
super(ThreadedHistorySyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback)
|
||||
self.workers = []
|
||||
if conn_limit == 0:
|
||||
conn_limit = thread_limit
|
||||
#self.conn_pool = queue.Queue(conn_limit)
|
||||
#self.queue = queue.Queue(thread_limit)
|
||||
#self.quit_queue = queue.Queue(1)
|
||||
self.conn_pool = multiprocessing.Queue(conn_limit)
|
||||
self.queue = multiprocessing.Queue(thread_limit)
|
||||
self.quit_queue = multiprocessing.Queue(1)
|
||||
#self.lock = threading.Lock()
|
||||
self.lock = multiprocessing.Lock()
|
||||
for i in range(thread_limit):
|
||||
#w = threading.Thread(target=self.worker)
|
||||
w = multiprocessing.Process(target=self.worker)
|
||||
self.workers.append(w)
|
||||
|
||||
for i in range(conn_limit):
|
||||
self.conn_pool.put(conn_factory())
|
||||
|
||||
|
||||
def terminate(self):
|
||||
self.quit_queue.put(())
|
||||
super(ThreadedHistorySyncer, self).terminate()
|
||||
|
||||
|
||||
def worker(self):
|
||||
while True:
|
||||
block_number = None
|
||||
try:
|
||||
block_number = self.queue.get(timeout=0.01)
|
||||
except queue.Empty:
|
||||
if self.quit_queue.qsize() > 0:
|
||||
#logg.debug('{} received quit'.format(threading.current_thread().getName()))
|
||||
logg.debug('{} received quit'.format(multiprocessing.current_process().name))
|
||||
return
|
||||
continue
|
||||
conn = self.conn_pool.get()
|
||||
try:
|
||||
logg.debug('processing parent {} {}'.format(conn, block_number))
|
||||
self.process_parent(conn, block_number)
|
||||
except IndexError:
|
||||
pass
|
||||
except RPCException as e:
|
||||
logg.error('RPC failure for block {}, resubmitting to queue: {}'.format(block, e))
|
||||
self.queue.put(block_number)
|
||||
conn = self.conn_pool.put(conn)
|
||||
|
||||
|
||||
def process_parent(self, conn, block_number):
|
||||
logg.debug('getting block {}'.format(block_number))
|
||||
o = self.chain_interface.block_by_number(block_number)
|
||||
r = conn.do(o)
|
||||
block = self.chain_interface.block_from_src(r)
|
||||
logg.debug('got block typ {}'.format(type(block)))
|
||||
super(ThreadedHistorySyncer, self).process(conn, block)
|
||||
|
||||
|
||||
def process_single(self, conn, block, tx):
|
||||
self.filter.apply(conn, block, tx)
|
||||
|
||||
|
||||
def process(self, conn, block):
|
||||
pass
|
||||
|
||||
|
||||
#def process(self, conn, block):
|
||||
def get(self, conn):
|
||||
if not self.running:
|
||||
raise SyncDone()
|
||||
|
||||
block_number = None
|
||||
tx_index = None
|
||||
flags = None
|
||||
((block_number, tx_index), flags) = self.backend.get()
|
||||
try:
|
||||
#logg.debug('putting {}'.format(block.number))
|
||||
#self.queue.put((conn, block_number,), timeout=0.1)
|
||||
self.queue.put(block_number, timeout=0.1)
|
||||
except queue.Full:
|
||||
#logg.debug('queue full, try again')
|
||||
return
|
||||
|
||||
target, flags = self.backend.target()
|
||||
next_block = block_number + 1
|
||||
if next_block > target:
|
||||
self.quit_queue.put(())
|
||||
raise SyncDone()
|
||||
self.backend.set(self.backend.block_height + 1, 0)
|
||||
|
||||
|
||||
# def get(self, conn):
|
||||
# try:
|
||||
# r = super(ThreadedHistorySyncer, self).get(conn)
|
||||
# return r
|
||||
# except SyncDone as e:
|
||||
# self.quit_queue.put(())
|
||||
# raise e
|
||||
|
||||
|
||||
def loop(self, interval, conn):
|
||||
for w in self.workers:
|
||||
w.start()
|
||||
r = super(ThreadedHistorySyncer, self).loop(interval, conn)
|
||||
for w in self.workers:
|
||||
w.join()
|
||||
while True:
|
||||
try:
|
||||
self.quit_queue.get_nowait()
|
||||
except queue.Empty:
|
||||
break
|
||||
|
||||
logg.info('workers done {}'.format(r))
|
171
chainsyncer/driver/threadpool.py
Normal file
171
chainsyncer/driver/threadpool.py
Normal file
@ -0,0 +1,171 @@
|
||||
# standard imports
|
||||
import logging
|
||||
#import threading
|
||||
import multiprocessing
|
||||
import queue
|
||||
import time
|
||||
|
||||
# external imports
|
||||
from chainlib.error import RPCException
|
||||
|
||||
# local imports
|
||||
from .history import HistorySyncer
|
||||
from chainsyncer.error import SyncDone
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def foobarcb(v):
|
||||
logg.debug('foooz {}'.format(v))
|
||||
|
||||
|
||||
class ThreadPoolTask:
|
||||
|
||||
process_func = None
|
||||
chain_interface = None
|
||||
|
||||
def poolworker(self, block_number, conn):
|
||||
# conn = args[1].get()
|
||||
try:
|
||||
logg.debug('processing parent {} {}'.format(conn, block_number))
|
||||
#self.process_parent(self.conn, block_number)
|
||||
self.process_parent(conn, block_number)
|
||||
except IndexError:
|
||||
pass
|
||||
except RPCException as e:
|
||||
logg.error('RPC failure for block {}, resubmitting to queue: {}'.format(block, e))
|
||||
raise e
|
||||
#self.queue.put(block_number)
|
||||
# conn = self.conn_pool.put(conn)
|
||||
|
||||
def process_parent(self, conn, block_number):
|
||||
logg.debug('getting block {}'.format(block_number))
|
||||
o = self.chain_interface.block_by_number(block_number)
|
||||
r = conn.do(o)
|
||||
block = self.chain_interface.block_from_src(r)
|
||||
logg.debug('got block typ {}'.format(type(block)))
|
||||
#super(ThreadedHistorySyncer, self).process(conn, block)
|
||||
self.process_func(conn, block)
|
||||
|
||||
|
||||
|
||||
class ThreadPoolHistorySyncer(HistorySyncer):
|
||||
|
||||
def __init__(self, conn_factory, thread_limit, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, conn_limit=0):
|
||||
super(ThreadPoolHistorySyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback)
|
||||
self.workers = []
|
||||
self.thread_limit = thread_limit
|
||||
if conn_limit == 0:
|
||||
self.conn_limit = self.thread_limit
|
||||
#self.conn_pool = queue.Queue(conn_limit)
|
||||
#self.queue = queue.Queue(thread_limit)
|
||||
#self.quit_queue = queue.Queue(1)
|
||||
#self.conn_pool = multiprocessing.Queue(conn_limit)
|
||||
#self.queue = multiprocessing.Queue(thread_limit)
|
||||
#self.quit_queue = multiprocessing.Queue(1)
|
||||
#self.lock = threading.Lock()
|
||||
#self.lock = multiprocessing.Lock()
|
||||
ThreadPoolTask.process_func = super(ThreadPoolHistorySyncer, self).process
|
||||
ThreadPoolTask.chain_interface = chain_interface
|
||||
#for i in range(thread_limit):
|
||||
#w = threading.Thread(target=self.worker)
|
||||
# w = multiprocessing.Process(target=self.worker)
|
||||
# self.workers.append(w)
|
||||
|
||||
#for i in range(conn_limit):
|
||||
# self.conn_pool.put(conn_factory())
|
||||
self.conn_factory = conn_factory
|
||||
self.worker_pool = None
|
||||
|
||||
|
||||
def terminate(self):
|
||||
#self.quit_queue.put(())
|
||||
super(ThreadPoolHistorySyncer, self).terminate()
|
||||
|
||||
|
||||
# def worker(self):
|
||||
# while True:
|
||||
# block_number = None
|
||||
# try:
|
||||
# block_number = self.queue.get(timeout=0.01)
|
||||
# except queue.Empty:
|
||||
# if self.quit_queue.qsize() > 0:
|
||||
# #logg.debug('{} received quit'.format(threading.current_thread().getName()))
|
||||
# logg.debug('{} received quit'.format(multiprocessing.current_process().name))
|
||||
# return
|
||||
# continue
|
||||
# conn = self.conn_pool.get()
|
||||
# try:
|
||||
# logg.debug('processing parent {} {}'.format(conn, block_number))
|
||||
# self.process_parent(conn, block_number)
|
||||
# except IndexError:
|
||||
# pass
|
||||
# except RPCException as e:
|
||||
# logg.error('RPC failure for block {}, resubmitting to queue: {}'.format(block, e))
|
||||
# self.queue.put(block_number)
|
||||
# conn = self.conn_pool.put(conn)
|
||||
#
|
||||
|
||||
|
||||
def process_single(self, conn, block, tx):
|
||||
self.filter.apply(conn, block, tx)
|
||||
|
||||
|
||||
def process(self, conn, block):
|
||||
pass
|
||||
|
||||
|
||||
#def process(self, conn, block):
|
||||
def get(self, conn):
|
||||
if not self.running:
|
||||
raise SyncDone()
|
||||
|
||||
block_number = None
|
||||
tx_index = None
|
||||
flags = None
|
||||
((block_number, tx_index), flags) = self.backend.get()
|
||||
#try:
|
||||
#logg.debug('putting {}'.format(block.number))
|
||||
#self.queue.put((conn, block_number,), timeout=0.1)
|
||||
#self.queue.put(block_number, timeout=0.1)
|
||||
#except queue.Full:
|
||||
#logg.debug('queue full, try again')
|
||||
# return
|
||||
task = ThreadPoolTask()
|
||||
conn = self.conn_factory()
|
||||
self.worker_pool.apply_async(task.poolworker, (block_number, conn,), {}, foobarcb)
|
||||
|
||||
target, flags = self.backend.target()
|
||||
next_block = block_number + 1
|
||||
if next_block > target:
|
||||
#self.quit_queue.put(())
|
||||
self.worker_pool.close()
|
||||
raise SyncDone()
|
||||
self.backend.set(self.backend.block_height + 1, 0)
|
||||
|
||||
|
||||
# def get(self, conn):
|
||||
# try:
|
||||
# r = super(ThreadedHistorySyncer, self).get(conn)
|
||||
# return r
|
||||
# except SyncDone as e:
|
||||
# self.quit_queue.put(())
|
||||
# raise e
|
||||
|
||||
|
||||
def loop(self, interval, conn):
|
||||
self.worker_pool = multiprocessing.Pool(self.thread_limit)
|
||||
#for w in self.workers:
|
||||
# w.start()
|
||||
r = super(ThreadPoolHistorySyncer, self).loop(interval, conn)
|
||||
#for w in self.workers:
|
||||
# w.join()
|
||||
#while True:
|
||||
# try:
|
||||
# self.quit_queue.get_nowait()
|
||||
# except queue.Empty:
|
||||
# break
|
||||
time.sleep(1)
|
||||
self.worker_pool.join()
|
||||
|
||||
logg.info('workers done {}'.format(r))
|
@ -13,3 +13,7 @@ class RequestError(Exception):
|
||||
|
||||
class BackendError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AbortTx(Exception):
|
||||
pass
|
||||
|
@ -20,7 +20,12 @@ class SyncFilter:
|
||||
logg.debug('added filter "{}"'.format(str(fltr)))
|
||||
|
||||
self.filters.append(fltr)
|
||||
|
||||
|
||||
|
||||
def apply_one(self, fltr, idx, conn, block, tx, session):
|
||||
fltr.filter(conn, block, tx, session)
|
||||
self.backend.complete_filter(idx)
|
||||
|
||||
|
||||
def apply(self, conn, block, tx):
|
||||
session = None
|
||||
@ -33,16 +38,15 @@ class SyncFilter:
|
||||
(pair, flags) = self.backend.get()
|
||||
for f in self.filters:
|
||||
if not self.backend.check_filter(i, flags):
|
||||
#if flags & (1 << i) == 0:
|
||||
logg.debug('applying filter {} {}'.format(str(f), flags))
|
||||
f.filter(conn, block, tx, session)
|
||||
self.backend.complete_filter(i)
|
||||
self.apply_one(f, i, conn, block, tx, session)
|
||||
else:
|
||||
logg.debug('skipping previously applied filter {} {}'.format(str(f), flags))
|
||||
i += 1
|
||||
|
||||
self.backend.disconnect()
|
||||
|
||||
|
||||
class NoopFilter:
|
||||
|
||||
def filter(self, conn, block, tx, db_session=None):
|
||||
|
@ -1,96 +0,0 @@
|
||||
# standard imports
|
||||
import os
|
||||
import sys
|
||||
import logging
|
||||
import time
|
||||
import argparse
|
||||
import sys
|
||||
import re
|
||||
|
||||
# external imports
|
||||
import confini
|
||||
from chainlib.eth.connection import HTTPConnection
|
||||
from chainlib.eth.block import block_latest
|
||||
from chainlib.chain import ChainSpec
|
||||
|
||||
# local imports
|
||||
from chainsyncer.driver import HeadSyncer
|
||||
from chainsyncer.db import dsn_from_config
|
||||
from chainsyncer.db.models.base import SessionBase
|
||||
from chainsyncer.backend import SyncerBackend
|
||||
from chainsyncer.error import LoopDone
|
||||
from chainsyncer.filter import NoopFilter
|
||||
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
logg = logging.getLogger()
|
||||
|
||||
config_dir = '/usr/local/etc/cic-syncer'
|
||||
|
||||
|
||||
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
|
||||
argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
|
||||
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
|
||||
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
|
||||
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
|
||||
argparser.add_argument('--offset', type=int, help='block number to start sync')
|
||||
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
|
||||
argparser.add_argument('-v', help='be verbose', action='store_true')
|
||||
argparser.add_argument('-vv', help='be more verbose', action='store_true')
|
||||
args = argparser.parse_args(sys.argv[1:])
|
||||
|
||||
if args.v == True:
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
elif args.vv == True:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
config_dir = os.path.join(args.c)
|
||||
config = confini.Config(config_dir, args.env_prefix)
|
||||
config.process()
|
||||
# override args
|
||||
args_override = {
|
||||
'SYNCER_CHAIN_SPEC': getattr(args, 'i'),
|
||||
'ETH_PROVIDER': getattr(args, 'p'),
|
||||
}
|
||||
config.dict_override(args_override, 'cli flag')
|
||||
config.censor('PASSWORD', 'DATABASE')
|
||||
config.censor('PASSWORD', 'SSL')
|
||||
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
|
||||
|
||||
#app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
|
||||
|
||||
queue = args.q
|
||||
|
||||
dsn = dsn_from_config(config)
|
||||
SessionBase.connect(dsn)
|
||||
|
||||
conn = HTTPConnection(config.get('ETH_PROVIDER'))
|
||||
|
||||
chain = ChainSpec.from_chain_str(config.get('SYNCER_CHAIN_SPEC'))
|
||||
|
||||
block_offset = args.offset
|
||||
|
||||
|
||||
def main():
|
||||
global block_offset
|
||||
|
||||
if block_offset == None:
|
||||
o = block_latest()
|
||||
r = conn.do(o)
|
||||
block_offset = r[1]
|
||||
|
||||
syncer_backend = SyncerBackend.live(chain, 0)
|
||||
syncer = HeadSyncer(syncer_backend)
|
||||
fltr = NoopFilter()
|
||||
syncer.add_filter(fltr)
|
||||
|
||||
try:
|
||||
logg.debug('block offset {}'.format(block_offset))
|
||||
syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), conn)
|
||||
except LoopDone as e:
|
||||
sys.stderr.write("sync '{}' done at block {}\n".format(args.mode, e))
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -6,7 +6,7 @@ import logging
|
||||
from hexathon import add_0x
|
||||
|
||||
# local imports
|
||||
from chainsyncer.driver import HistorySyncer
|
||||
from chainsyncer.driver.history import HistorySyncer
|
||||
from chainsyncer.error import NoBlockForYou
|
||||
|
||||
logg = logging.getLogger().getChild(__name__)
|
||||
@ -44,9 +44,9 @@ class MockBlock:
|
||||
class TestSyncer(HistorySyncer):
|
||||
|
||||
|
||||
def __init__(self, backend, tx_counts=[]):
|
||||
def __init__(self, backend, chain_interface, tx_counts=[]):
|
||||
self.tx_counts = tx_counts
|
||||
super(TestSyncer, self).__init__(backend)
|
||||
super(TestSyncer, self).__init__(backend, chain_interface)
|
||||
|
||||
|
||||
def get(self, conn):
|
||||
|
55
chainsyncer/unittest/db.py
Normal file
55
chainsyncer/unittest/db.py
Normal file
@ -0,0 +1,55 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import os
|
||||
|
||||
# external imports
|
||||
import alembic
|
||||
import alembic.config
|
||||
|
||||
# local imports
|
||||
from chainsyncer.db.models.base import SessionBase
|
||||
from chainsyncer.db import dsn_from_config
|
||||
from chainsyncer.db.models.base import SessionBase
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ChainSyncerDb:
|
||||
|
||||
base = SessionBase
|
||||
|
||||
def __init__(self, debug=False):
|
||||
config = {
|
||||
'DATABASE_ENGINE': 'sqlite',
|
||||
'DATABASE_DRIVER': 'pysqlite',
|
||||
'DATABASE_NAME': 'chainsyncer.sqlite',
|
||||
}
|
||||
logg.debug('config {}'.format(config))
|
||||
|
||||
self.dsn = dsn_from_config(config)
|
||||
|
||||
self.base.poolable = False
|
||||
self.base.transactional = False
|
||||
self.base.procedural = False
|
||||
self.base.connect(self.dsn, debug=debug) # TODO: evaluates to "true" even if string is 0
|
||||
|
||||
rootdir = os.path.join(os.path.dirname(os.path.dirname(__file__)), '..')
|
||||
dbdir = os.path.join(rootdir, 'chainsyncer', 'db')
|
||||
#migrationsdir = os.path.join(dbdir, 'migrations', config.get('DATABASE_ENGINE'))
|
||||
migrationsdir = os.path.join(dbdir, 'migrations', 'default')
|
||||
logg.info('using migrations directory {}'.format(migrationsdir))
|
||||
|
||||
ac = alembic.config.Config(os.path.join(migrationsdir, 'alembic.ini'))
|
||||
ac.set_main_option('sqlalchemy.url', self.dsn)
|
||||
ac.set_main_option('script_location', migrationsdir)
|
||||
|
||||
alembic.command.downgrade(ac, 'base')
|
||||
alembic.command.upgrade(ac, 'head')
|
||||
|
||||
|
||||
def bind_session(self, session=None):
|
||||
return self.base.bind_session(session)
|
||||
|
||||
|
||||
def release_session(self, session=None):
|
||||
return self.base.release_session(session)
|
@ -1,4 +1,4 @@
|
||||
confini~=0.3.6rc3
|
||||
confini>=0.3.6rc3,<0.5.0
|
||||
semver==2.13.0
|
||||
hexathon~=0.0.1a7
|
||||
chainlib~=0.0.3rc2
|
||||
hexathon~=0.0.1a8
|
||||
chainlib>=0.0.9a2,<=0.1.0
|
||||
|
12
run_tests.sh
Normal file
12
run_tests.sh
Normal file
@ -0,0 +1,12 @@
|
||||
#!/bin/bash
|
||||
|
||||
set -e
|
||||
set -x
|
||||
for f in `ls tests/*.py`; do
|
||||
python $f
|
||||
if [ $? -gt 0 ]; then
|
||||
exit
|
||||
fi
|
||||
done
|
||||
set +x
|
||||
set +e
|
@ -1,6 +1,6 @@
|
||||
[metadata]
|
||||
name = chainsyncer
|
||||
version = 0.0.2a5
|
||||
version = 0.0.6a1
|
||||
description = Generic blockchain syncer driver
|
||||
author = Louis Holbrook
|
||||
author_email = dev@holbrook.no
|
||||
@ -26,10 +26,9 @@ python_requires = >= 3.6
|
||||
packages =
|
||||
chainsyncer
|
||||
chainsyncer.db
|
||||
chainsyncer.db.migrations
|
||||
chainsyncer.db.models
|
||||
chainsyncer.runnable
|
||||
chainsyncer.backend
|
||||
chainsyncer.driver
|
||||
chainsyncer.unittest
|
||||
|
||||
[options.package_data]
|
||||
|
@ -1,12 +0,0 @@
|
||||
DROP TABLE IF EXISTS chain_sync CASCADE;
|
||||
CREATE TABLE IF NOT EXISTS chain_sync (
|
||||
id serial primary key not null,
|
||||
blockchain varchar not null,
|
||||
block_start int not null default 0,
|
||||
tx_start int not null default 0,
|
||||
block_cursor int not null default 0,
|
||||
tx_cursor int not null default 0,
|
||||
block_target int default null,
|
||||
date_created timestamp not null,
|
||||
date_updated timestamp default null
|
||||
);
|
@ -1,12 +0,0 @@
|
||||
DROP TABLE IF EXISTS chain_sync_filter;
|
||||
CREATE TABLE IF NOT EXISTS chain_sync_filter (
|
||||
id serial primary key not null,
|
||||
chain_sync_id integer not null,
|
||||
flags bytea default null,
|
||||
flags_start bytea default null,
|
||||
count integer not null default 0,
|
||||
digest char(64) not null,
|
||||
CONSTRAINT fk_chain_sync
|
||||
FOREIGN KEY(chain_sync_id)
|
||||
REFERENCES chain_sync(id)
|
||||
);
|
@ -1,11 +0,0 @@
|
||||
CREATE TABLE IF NOT EXISTS chain_sync (
|
||||
id integer primary key autoincrement,
|
||||
blockchain varchar not null,
|
||||
block_start integer not null default 0,
|
||||
tx_start integer not null default 0,
|
||||
block_cursor integer not null default 0,
|
||||
tx_cursor integer not null default 0,
|
||||
block_target integer default null,
|
||||
date_created timestamp not null,
|
||||
date_updated timestamp default null
|
||||
);
|
@ -1,11 +0,0 @@
|
||||
CREATE TABLE IF NOT EXISTS chain_sync_filter (
|
||||
id integer primary key autoincrement not null,
|
||||
chain_sync_id integer not null,
|
||||
flags bytea default null,
|
||||
flags_start bytea default null,
|
||||
count integer not null default 0,
|
||||
digest char(64) not null,
|
||||
CONSTRAINT fk_chain_sync
|
||||
FOREIGN KEY(chain_sync_id)
|
||||
REFERENCES chain_sync(id)
|
||||
);
|
4
test_requirements.txt
Normal file
4
test_requirements.txt
Normal file
@ -0,0 +1,4 @@
|
||||
chainlib-eth~=0.0.9a4
|
||||
psycopg2==2.8.6
|
||||
SQLAlchemy==1.3.20
|
||||
alembic==1.4.2
|
@ -1,54 +0,0 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import unittest
|
||||
import tempfile
|
||||
import os
|
||||
#import pysqlite
|
||||
|
||||
# external imports
|
||||
from chainlib.chain import ChainSpec
|
||||
|
||||
# local imports
|
||||
from chainsyncer.db import dsn_from_config
|
||||
from chainsyncer.db.models.base import SessionBase
|
||||
|
||||
script_dir = os.path.realpath(os.path.dirname(__file__))
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
|
||||
class TestBase(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
db_dir = tempfile.mkdtemp()
|
||||
self.db_path = os.path.join(db_dir, 'test.sqlite')
|
||||
config = {
|
||||
'DATABASE_ENGINE': 'sqlite',
|
||||
'DATABASE_DRIVER': 'pysqlite',
|
||||
'DATABASE_NAME': self.db_path,
|
||||
}
|
||||
dsn = dsn_from_config(config)
|
||||
SessionBase.poolable = False
|
||||
SessionBase.transactional = False
|
||||
SessionBase.procedural = False
|
||||
SessionBase.connect(dsn, debug=False)
|
||||
|
||||
f = open(os.path.join(script_dir, '..', 'sql', 'sqlite', '1.sql'), 'r')
|
||||
sql = f.read()
|
||||
f.close()
|
||||
|
||||
conn = SessionBase.engine.connect()
|
||||
conn.execute(sql)
|
||||
|
||||
f = open(os.path.join(script_dir, '..', 'sql', 'sqlite', '2.sql'), 'r')
|
||||
sql = f.read()
|
||||
f.close()
|
||||
|
||||
conn = SessionBase.engine.connect()
|
||||
conn.execute(sql)
|
||||
|
||||
self.chain_spec = ChainSpec('evm', 'foo', 42, 'bar')
|
||||
|
||||
def tearDown(self):
|
||||
SessionBase.disconnect()
|
||||
os.unlink(self.db_path)
|
57
tests/chainsyncer_base.py
Normal file
57
tests/chainsyncer_base.py
Normal file
@ -0,0 +1,57 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import unittest
|
||||
import tempfile
|
||||
import os
|
||||
#import pysqlite
|
||||
|
||||
# external imports
|
||||
from chainlib.chain import ChainSpec
|
||||
from chainlib.interface import ChainInterface
|
||||
from chainlib.eth.tx import receipt
|
||||
|
||||
# local imports
|
||||
from chainsyncer.db import dsn_from_config
|
||||
from chainsyncer.db.models.base import SessionBase
|
||||
|
||||
# test imports
|
||||
from chainsyncer.unittest.db import ChainSyncerDb
|
||||
|
||||
script_dir = os.path.realpath(os.path.dirname(__file__))
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
|
||||
class EthChainInterface(ChainInterface):
|
||||
|
||||
def __init__(self):
|
||||
self._tx_receipt = receipt
|
||||
|
||||
|
||||
class TestBase(unittest.TestCase):
|
||||
|
||||
interface = EthChainInterface()
|
||||
|
||||
def setUp(self):
|
||||
self.db = ChainSyncerDb()
|
||||
|
||||
#f = open(os.path.join(script_dir, '..', 'sql', 'sqlite', '1.sql'), 'r')
|
||||
#sql = f.read()
|
||||
#f.close()
|
||||
|
||||
#conn = SessionBase.engine.connect()
|
||||
#conn.execute(sql)
|
||||
|
||||
#f = open(os.path.join(script_dir, '..', 'sql', 'sqlite', '2.sql'), 'r')
|
||||
#sql = f.read()
|
||||
#f.close()
|
||||
|
||||
#conn = SessionBase.engine.connect()
|
||||
#conn.execute(sql)
|
||||
self.session = self.db.bind_session()
|
||||
self.chain_spec = ChainSpec('evm', 'foo', 42, 'bar')
|
||||
|
||||
def tearDown(self):
|
||||
self.session.commit()
|
||||
self.db.release_session(self.session)
|
||||
#os.unlink(self.db_path)
|
@ -8,7 +8,7 @@ from chainlib.chain import ChainSpec
|
||||
from chainsyncer.backend.memory import MemBackend
|
||||
|
||||
# testutil imports
|
||||
from tests.base import TestBase
|
||||
from tests.chainsyncer_base import TestBase
|
||||
|
||||
|
||||
class TestBasic(TestBase):
|
||||
|
@ -11,7 +11,7 @@ from chainsyncer.db.models.filter import BlockchainSyncFilter
|
||||
from chainsyncer.backend.sql import SQLBackend
|
||||
|
||||
# testutil imports
|
||||
from tests.base import TestBase
|
||||
from tests.chainsyncer_base import TestBase
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
15
tests/test_helo.py
Normal file
15
tests/test_helo.py
Normal file
@ -0,0 +1,15 @@
|
||||
# standard imports
|
||||
import unittest
|
||||
|
||||
# local imports
|
||||
from tests.chainsyncer_base import TestBase
|
||||
|
||||
|
||||
class TestHelo(TestBase):
|
||||
|
||||
def test_helo(self):
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
@ -16,7 +16,7 @@ from chainsyncer.backend.file import (
|
||||
)
|
||||
|
||||
# test imports
|
||||
from tests.base import TestBase
|
||||
from tests.chainsyncer_base import TestBase
|
||||
from chainsyncer.unittest.base import (
|
||||
MockBlock,
|
||||
MockConn,
|
||||
@ -77,7 +77,7 @@ class TestInterrupt(TestBase):
|
||||
]
|
||||
|
||||
|
||||
def assert_filter_interrupt(self, vector):
|
||||
def assert_filter_interrupt(self, vector, chain_interface):
|
||||
|
||||
logg.debug('running vector {} {}'.format(str(self.backend), vector))
|
||||
|
||||
@ -85,7 +85,7 @@ class TestInterrupt(TestBase):
|
||||
for v in vector:
|
||||
z += v
|
||||
|
||||
syncer = TestSyncer(self.backend, vector)
|
||||
syncer = TestSyncer(self.backend, chain_interface, vector)
|
||||
|
||||
filters = [
|
||||
CountFilter('foo'),
|
||||
@ -114,7 +114,7 @@ class TestInterrupt(TestBase):
|
||||
def test_filter_interrupt_memory(self):
|
||||
for vector in self.vectors:
|
||||
self.backend = MemBackend(self.chain_spec, None, target_block=len(vector))
|
||||
self.assert_filter_interrupt(vector)
|
||||
self.assert_filter_interrupt(vector, self.interface)
|
||||
|
||||
|
||||
def test_filter_interrupt_file(self):
|
||||
@ -123,13 +123,13 @@ class TestInterrupt(TestBase):
|
||||
d = tempfile.mkdtemp()
|
||||
#os.makedirs(data_dir_for(self.chain_spec, 'foo', d))
|
||||
self.backend = FileBackend.initial(self.chain_spec, len(vector), base_dir=d) #'foo', base_dir=d)
|
||||
self.assert_filter_interrupt(vector)
|
||||
self.assert_filter_interrupt(vector, self.interface)
|
||||
|
||||
|
||||
def test_filter_interrupt_sql(self):
|
||||
for vector in self.vectors:
|
||||
self.backend = SQLBackend.initial(self.chain_spec, len(vector))
|
||||
self.assert_filter_interrupt(vector)
|
||||
self.assert_filter_interrupt(vector, self.interface)
|
||||
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user