From da54a077e5a6850e9620afb6a39a39e5a7c11241 Mon Sep 17 00:00:00 2001 From: nolash Date: Wed, 17 Feb 2021 12:44:35 +0100 Subject: [PATCH] Add filtering --- chainsyncer/backend.py | 9 ++- chainsyncer/client/__init__.py | 0 chainsyncer/client/block.py | 16 ----- chainsyncer/client/evm/response.py | 52 ----------------- chainsyncer/client/evm/websocket.py | 90 ----------------------------- chainsyncer/client/translate.py | 10 ---- chainsyncer/client/tx.py | 6 -- chainsyncer/db/models/filter.py | 40 +++++++++++++ chainsyncer/driver.py | 52 ++++++++++------- chainsyncer/filter.py | 34 +++++++++++ chainsyncer/runnable/tracker.py | 30 +++++++--- config/syncer.ini | 1 + requirements.txt | 8 +-- setup.cfg | 2 +- sql/postgresql/1.sql | 14 +++++ sql/sqlite/1.sql | 13 +++++ sql/sqlite/2.sql | 10 ++++ tests/base.py | 46 +++++++++++++++ tests/test_basic.py | 22 +++++++ 19 files changed, 243 insertions(+), 212 deletions(-) delete mode 100644 chainsyncer/client/__init__.py delete mode 100644 chainsyncer/client/block.py delete mode 100644 chainsyncer/client/evm/response.py delete mode 100644 chainsyncer/client/evm/websocket.py delete mode 100644 chainsyncer/client/translate.py delete mode 100644 chainsyncer/client/tx.py create mode 100644 chainsyncer/db/models/filter.py create mode 100644 chainsyncer/filter.py create mode 100644 sql/sqlite/1.sql create mode 100644 sql/sqlite/2.sql create mode 100644 tests/base.py create mode 100644 tests/test_basic.py diff --git a/chainsyncer/backend.py b/chainsyncer/backend.py index d265ab7..d7414fb 100644 --- a/chainsyncer/backend.py +++ b/chainsyncer/backend.py @@ -2,6 +2,9 @@ import logging import uuid +# third-party imports +from chainlib.chain import ChainSpec + # local imports from chainsyncer.db.models.sync import BlockchainSync from chainsyncer.db.models.base import SessionBase @@ -176,7 +179,7 @@ class SyncerBackend: @staticmethod - def live(chain, block_height): + def live(chain_spec, block_height): """Creates a new open-ended syncer session starting at the given block height. :param chain: Chain spec of chain that syncer is running for. @@ -188,13 +191,13 @@ class SyncerBackend: """ object_id = None session = SessionBase.create_session() - o = BlockchainSync(chain, block_height, 0, None) + o = BlockchainSync(str(chain_spec), block_height, 0, None) session.add(o) session.commit() object_id = o.id session.close() - return SyncerBackend(chain, object_id) + return SyncerBackend(chain_spec, object_id) class MemBackend: diff --git a/chainsyncer/client/__init__.py b/chainsyncer/client/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/chainsyncer/client/block.py b/chainsyncer/client/block.py deleted file mode 100644 index 06cbfb3..0000000 --- a/chainsyncer/client/block.py +++ /dev/null @@ -1,16 +0,0 @@ -class Block: - - def __init__(self, hsh, obj): - self.hash = hsh - self.obj = obj - - - def tx(self, idx): - return NotImplementedError - - - def number(self): - return NotImplementedError - - - diff --git a/chainsyncer/client/evm/response.py b/chainsyncer/client/evm/response.py deleted file mode 100644 index bac12f2..0000000 --- a/chainsyncer/client/evm/response.py +++ /dev/null @@ -1,52 +0,0 @@ -import json - -from chainsyncer.client import translate -from chainsyncer.client.block import Block -from chainsyncer.client.tx import Tx - - -translations = { - 'block_number': translate.hex_to_int, - 'get_block': json.dumps, - 'number': translate.hex_to_int, - } - - -class EVMResponse: - - def __init__(self, item, response_object): - self.response_object = response_object - self.item = item - self.fn = translations[self.item] - - - def get_error(self): - return self.response_object.get('error') - - - def get_result(self): - r = self.fn(self.response_object.get('result')) - if r == 'null': - return None - return r - - -class EVMTx(Tx): - - def __init__(self, block, tx_number, obj): - super(EVMTx, self).__init__(block, tx_number, obj) - - -class EVMBlock(Block): - - def tx(self, idx): - o = self.obj['transactions'][idx] - return Tx(self, idx, o) - - - def number(self): - return translate.hex_to_int(self.obj['number']) - - - def __str__(self): - return str('block {} {}'.format(self.number(), self.hash)) diff --git a/chainsyncer/client/evm/websocket.py b/chainsyncer/client/evm/websocket.py deleted file mode 100644 index f857800..0000000 --- a/chainsyncer/client/evm/websocket.py +++ /dev/null @@ -1,90 +0,0 @@ -# standard imports -import logging -import uuid -import json - -# third-party imports -import websocket -from hexathon import add_0x - -# local imports -from .response import EVMResponse -from chainsyncer.error import RequestError -from chainsyncer.client.evm.response import EVMBlock - -logg = logging.getLogger() - - -class EVMWebsocketClient: - - def __init__(self, url): - self.url = url - self.conn = websocket.create_connection(url) - - - def __del__(self): - self.conn.close() - - - def block_number(self): - req_id = str(uuid.uuid4()) - req = { - 'jsonrpc': '2.0', - 'method': 'eth_blockNumber', - 'id': str(req_id), - 'params': [], - } - self.conn.send(json.dumps(req)) - r = self.conn.recv() - res = EVMResponse('block_number', json.loads(r)) - err = res.get_error() - if err != None: - raise RequestError(err) - - return res.get_result() - - - def block_by_integer(self, n): - req_id = str(uuid.uuid4()) - nhx = '0x' + n.to_bytes(8, 'big').hex() - req = { - 'jsonrpc': '2.0', - 'method': 'eth_getBlockByNumber', - 'id': str(req_id), - 'params': [nhx, False], - } - self.conn.send(json.dumps(req)) - r = self.conn.recv() - res = EVMResponse('get_block', json.loads(r)) - err = res.get_error() - if err != None: - raise RequestError(err) - - j = res.get_result() - if j == None: - return None - o = json.loads(j) - return EVMBlock(o['hash'], o) - - - def block_by_hash(self, hx_in): - req_id = str(uuid.uuid4()) - hx = add_0x(hx_in) - req ={ - 'jsonrpc': '2.0', - 'method': 'eth_getBlockByHash', - 'id': str(req_id), - 'params': [hx, False], - } - self.conn.send(json.dumps(req)) - r = self.conn.recv() - res = EVMResponse('get_block', json.loads(r)) - err = res.get_error() - if err != None: - raise RequestError(err) - - j = res.get_result() - if j == None: - return None - o = json.loads(j) - return EVMBlock(o['hash'], o) diff --git a/chainsyncer/client/translate.py b/chainsyncer/client/translate.py deleted file mode 100644 index 0b29e47..0000000 --- a/chainsyncer/client/translate.py +++ /dev/null @@ -1,10 +0,0 @@ -# third-party imports -from hexathon import strip_0x - - -def hex_to_int(hx, endianness='big'): - hx = strip_0x(hx) - if len(hx) % 2 == 1: - hx = '0' + hx - b = bytes.fromhex(hx) - return int.from_bytes(b, endianness) diff --git a/chainsyncer/client/tx.py b/chainsyncer/client/tx.py deleted file mode 100644 index 594491a..0000000 --- a/chainsyncer/client/tx.py +++ /dev/null @@ -1,6 +0,0 @@ -class Tx: - - def __init__(self, block, tx_number, obj): - self.block = block - self.tx_number = tx_number - self.obj = obj diff --git a/chainsyncer/db/models/filter.py b/chainsyncer/db/models/filter.py new file mode 100644 index 0000000..89bd564 --- /dev/null +++ b/chainsyncer/db/models/filter.py @@ -0,0 +1,40 @@ +# standard imports +import hashlib + +# third-party imports +from sqlalchemy import Column, String, Integer, BLOB +from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method + +# local imports +from .base import SessionBase + + +zero_digest = '{:<064s'.format('0') + + +class BlockchainSyncFilter(SessionBase): + + __tablename__ = 'chain_sync_filter' + + chain_sync_id = Column(Integer, ForeignKey='chain_sync.id') + flags = Column(BLOB) + digest = Column(String) + count = Column(Integer) + + @staticmethod + def set(self, names): + + + def __init__(self, names, chain_sync, digest=None): + if len(names) == 0: + digest = zero_digest + elif digest == None: + h = hashlib.new('sha256') + for n in names: + h.update(n.encode('utf-8') + b'\x00') + z = h.digest() + digest = z.hex() + self.digest = digest + self.count = len(names) + self.flags = bytearray((len(names) -1 ) / 8 + 1) + self.chain_sync_id = chain_sync.id diff --git a/chainsyncer/driver.py b/chainsyncer/driver.py index 897c3b6..8ffb0d8 100644 --- a/chainsyncer/driver.py +++ b/chainsyncer/driver.py @@ -3,6 +3,15 @@ import uuid import logging import time +# external imports +from chainlib.eth.block import ( + block_by_number, + Block, + ) + +# local imports +from chainsyncer.filter import SyncFilter + logg = logging.getLogger() @@ -19,7 +28,7 @@ class Syncer: self.cursor = None self.running = True self.backend = backend - self.filter = [] + self.filter = SyncFilter() self.progress_callback = progress_callback @@ -33,64 +42,63 @@ class Syncer: def add_filter(self, f): - self.filter.append(f) + self.filter.add(f) -class MinedSyncer(Syncer): +class BlockSyncer(Syncer): - def __init__(self, backend, progress_callback): - super(MinedSyncer, self).__init__(backend, progress_callback) + def __init__(self, backend, progress_callback=noop_progress): + super(BlockSyncer, self).__init__(backend, progress_callback) - def loop(self, interval, getter): + def loop(self, interval, conn): g = self.backend.get() last_tx = g[1] last_block = g[0] self.progress_callback('loop started', last_block, last_tx) while self.running and Syncer.running_global: while True: - block = self.get(getter) - if block == None: + try: + block = self.get(conn) + except Exception: break last_block = block.number - self.process(getter, block) + self.process(conn, block) start_tx = 0 self.progress_callback('processed block {}'.format(self.backend.get()), last_block, last_tx) time.sleep(self.yield_delay) - #self.progress_callback('loop ended', last_block + 1, last_tx) + self.progress_callback('loop ended', last_block + 1, last_tx) time.sleep(interval) -class HeadSyncer(MinedSyncer): +class HeadSyncer(BlockSyncer): - def __init__(self, backend, progress_callback): + def __init__(self, backend, progress_callback=noop_progress): super(HeadSyncer, self).__init__(backend, progress_callback) - def process(self, getter, block): + def process(self, conn, block): logg.debug('process block {}'.format(block)) i = 0 tx = None while True: try: - #self.filter[0].handle(getter, block, None) tx = block.tx(i) self.progress_callback('processing {}'.format(repr(tx)), block.number, i) self.backend.set(block.number, i) - for f in self.filter: - f.handle(getter, block, tx) - self.progress_callback('applied filter {} on {}'.format(f.name(), repr(tx)), block.number, i) + self.filter.apply(conn, block, tx) except IndexError as e: self.backend.set(block.number + 1, 0) break i += 1 - def get(self, getter): + def get(self, conn): (block_number, tx_number) = self.backend.get() block_hash = [] - uu = uuid.uuid4() - res = getter.get(block_number) - logg.debug('get {}'.format(res)) + o = block_by_number(block_number) + r = conn.do(o) + b = Block(r) + logg.debug('get {}'.format(b)) - return res + return b diff --git a/chainsyncer/filter.py b/chainsyncer/filter.py new file mode 100644 index 0000000..883bfe8 --- /dev/null +++ b/chainsyncer/filter.py @@ -0,0 +1,34 @@ +# standard imports +import logging + +logg = logging.getLogger(__name__) + +class SyncFilter: + + def __init__(self, safe=True): + self.safe = safe + self.filters = [] + + + def add(self, fltr): + if getattr(fltr, 'filter') == None: + raise ValueError('filter object must implement have method filter') + logg.debug('added filter {}'.format(str(fltr))) + + self.filters.append(fltr) + + + def apply(self, conn, block, tx): + for f in self.filters: + logg.debug('applying filter {}'.format(str(f))) + f.filter(conn, block, tx) + + +class NoopFilter(SyncFilter): + + def filter(self, conn, block, tx): + logg.debug('noop filter :received\n{} {}'.format(block, tx)) + + + def __str__(self): + return 'noopfilter' diff --git a/chainsyncer/runnable/tracker.py b/chainsyncer/runnable/tracker.py index 9a46d90..5ba5292 100644 --- a/chainsyncer/runnable/tracker.py +++ b/chainsyncer/runnable/tracker.py @@ -7,14 +7,19 @@ import argparse import sys import re -# third-party imports +# external imports import confini from chainlib.eth.connection import HTTPConnection +from chainlib.eth.block import block_latest +from chainlib.chain import ChainSpec + +# local imports from chainsyncer.driver import HeadSyncer from chainsyncer.db import dsn_from_config from chainsyncer.db.models.base import SessionBase from chainsyncer.backend import SyncerBackend from chainsyncer.error import LoopDone +from chainsyncer.filter import NoopFilter logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() @@ -39,6 +44,7 @@ argparser.add_argument('-c', type=str, default=config_dir, help='config root to argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec') argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') +argparser.add_argument('--offset', type=int, help='block number to start sync') argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') argparser.add_argument('-v', help='be verbose', action='store_true') argparser.add_argument('-vv', help='be more verbose', action='store_true') @@ -55,7 +61,7 @@ config = confini.Config(config_dir, args.env_prefix) config.process() # override args args_override = { - 'CHAIN_SPEC': getattr(args, 'i'), + 'SYNCER_CHAIN_SPEC': getattr(args, 'i'), 'ETH_PROVIDER': getattr(args, 'p'), } config.dict_override(args_override, 'cli flag') @@ -70,19 +76,29 @@ queue = args.q dsn = dsn_from_config(config) SessionBase.connect(dsn) -c = HTTPConnection(config.get('ETH_PROVIDER')) -chain = config.get('CHAIN_SPEC') +conn = HTTPConnection(config.get('ETH_PROVIDER')) + +chain = ChainSpec.from_chain_str(config.get('SYNCER_CHAIN_SPEC')) + +block_offset = args.offset def main(): - block_offset = c.block_number() + global block_offset + + if block_offset == None: + o = block_latest() + r = conn.do(o) + block_offset = r[1] syncer_backend = SyncerBackend.live(chain, 0) syncer = HeadSyncer(syncer_backend) + fltr = NoopFilter() + syncer.add_filter(fltr) try: - logg.debug('block offset {} {}'.format(block_offset, c)) - syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), c) + logg.debug('block offset {}'.format(block_offset)) + syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), conn) except LoopDone as e: sys.stderr.write("sync '{}' done at block {}\n".format(args.mode, e)) diff --git a/config/syncer.ini b/config/syncer.ini index 452fb81..f289ebd 100644 --- a/config/syncer.ini +++ b/config/syncer.ini @@ -1,2 +1,3 @@ [syncer] loop_interval = 1 +chain_spec = diff --git a/requirements.txt b/requirements.txt index 581efa8..02759e5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,6 @@ psycopg2==2.8.6 SQLAlchemy==1.3.20 -py-evm==0.3.0a20 -eth-tester==0.5.0b3 -confini==0.3.6b2 +confini~=0.3.6b2 semver==2.13.0 -hexathon==0.0.1a2 -chainlib~=0.0.1a7 +hexathon~=0.0.1a3 +chainlib~=0.0.1a13 diff --git a/setup.cfg b/setup.cfg index a50be2f..605d560 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.0.1a5 +version = 0.0.1a6 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no diff --git a/sql/postgresql/1.sql b/sql/postgresql/1.sql index 521e3ea..17a9794 100644 --- a/sql/postgresql/1.sql +++ b/sql/postgresql/1.sql @@ -6,7 +6,21 @@ CREATE TABLE IF NOT EXISTS chain_sync ( tx_start int not null default 0, block_cursor int not null default 0, tx_cursor int not null default 0, + flags bytea not null, + num_flags int not null, block_target int default null, date_created timestamp not null, date_updated timestamp default null ); + +DROP TABLE chain_sync_filter; +CREATE TABLE IF NOT EXISTS chain_sync_filter ( + id serial primary key not null, + chain_sync_id int not null, + flags bytea default null, + count int not null default 0, + digest char(64) not null default '0000000000000000000000000000000000000000000000000000000000000000', + CONSTRAINT fk_chain_sync + FOREIGN KEY(chain_sync_id) + REFERENCES chain_sync(id) +); diff --git a/sql/sqlite/1.sql b/sql/sqlite/1.sql new file mode 100644 index 0000000..fa9f6fa --- /dev/null +++ b/sql/sqlite/1.sql @@ -0,0 +1,13 @@ +CREATE TABLE IF NOT EXISTS chain_sync ( + id serial primary key not null, + blockchain varchar not null, + block_start int not null default 0, + tx_start int not null default 0, + block_cursor int not null default 0, + tx_cursor int not null default 0, + flags bytea not null, + num_flags int not null, + block_target int default null, + date_created timestamp not null, + date_updated timestamp default null +); diff --git a/sql/sqlite/2.sql b/sql/sqlite/2.sql new file mode 100644 index 0000000..c43624e --- /dev/null +++ b/sql/sqlite/2.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS chain_sync_filter ( + id serial primary key not null, + chain_sync_id int not null, + flags bytea default null, + count int not null default 0, + digest char(64) not null default '0000000000000000000000000000000000000000000000000000000000000000', + CONSTRAINT fk_chain_sync + FOREIGN KEY(chain_sync_id) + REFERENCES chain_sync(id) +); diff --git a/tests/base.py b/tests/base.py new file mode 100644 index 0000000..45ae722 --- /dev/null +++ b/tests/base.py @@ -0,0 +1,46 @@ +import unittest +import tempfile +import os +#import pysqlite + +from chainsyncer.db import dsn_from_config +from chainsyncer.db.models.base import SessionBase + +script_dir = os.path.realpath(os.path.dirname(__file__)) + + +class TestBase(unittest.TestCase): + + def setUp(self): + db_dir = tempfile.mkdtemp() + self.db_path = os.path.join(db_dir, 'test.sqlite') + config = { + 'DATABASE_ENGINE': 'sqlite', + 'DATABASE_DRIVER': 'pysqlite', + 'DATABASE_NAME': self.db_path, + } + dsn = dsn_from_config(config) + SessionBase.poolable = False + SessionBase.transactional = False + SessionBase.procedural = False + SessionBase.connect(dsn, debug=True) + + f = open(os.path.join(script_dir, '..', 'sql', 'sqlite', '1.sql'), 'r') + sql = f.read() + f.close() + + conn = SessionBase.engine.connect() + conn.execute(sql) + + f = open(os.path.join(script_dir, '..', 'sql', 'sqlite', '2.sql'), 'r') + sql = f.read() + f.close() + + conn = SessionBase.engine.connect() + conn.execute(sql) + + def tearDown(self): + SessionBase.disconnect() + os.unlink(self.db_path) + + diff --git a/tests/test_basic.py b/tests/test_basic.py new file mode 100644 index 0000000..6138cbe --- /dev/null +++ b/tests/test_basic.py @@ -0,0 +1,22 @@ +# standard imports +import unittest + +# external imports +from chainlib.chain import ChainSpec + +# local imports +from chainsyncer.backend import SyncerBackend + +# testutil imports +from tests.base import TestBase + + +class TestBasic(TestBase): + + def test_hello(self): + chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo') + backend = SyncerBackend(chain_spec, 'foo') + + +if __name__ == '__main__': + unittest.main()