Add filtering

This commit is contained in:
nolash 2021-02-17 12:44:35 +01:00
parent 2f22d6df1a
commit da54a077e5
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
19 changed files with 243 additions and 212 deletions

View File

@ -2,6 +2,9 @@
import logging import logging
import uuid import uuid
# third-party imports
from chainlib.chain import ChainSpec
# local imports # local imports
from chainsyncer.db.models.sync import BlockchainSync from chainsyncer.db.models.sync import BlockchainSync
from chainsyncer.db.models.base import SessionBase from chainsyncer.db.models.base import SessionBase
@ -176,7 +179,7 @@ class SyncerBackend:
@staticmethod @staticmethod
def live(chain, block_height): def live(chain_spec, block_height):
"""Creates a new open-ended syncer session starting at the given block height. """Creates a new open-ended syncer session starting at the given block height.
:param chain: Chain spec of chain that syncer is running for. :param chain: Chain spec of chain that syncer is running for.
@ -188,13 +191,13 @@ class SyncerBackend:
""" """
object_id = None object_id = None
session = SessionBase.create_session() session = SessionBase.create_session()
o = BlockchainSync(chain, block_height, 0, None) o = BlockchainSync(str(chain_spec), block_height, 0, None)
session.add(o) session.add(o)
session.commit() session.commit()
object_id = o.id object_id = o.id
session.close() session.close()
return SyncerBackend(chain, object_id) return SyncerBackend(chain_spec, object_id)
class MemBackend: class MemBackend:

View File

@ -1,16 +0,0 @@
class Block:
    """Abstract block interface for syncer clients.

    :param hsh: block hash
    :param obj: underlying block data object
    """

    def __init__(self, hsh, obj):
        self.hash = hsh
        self.obj = obj


    def tx(self, idx):
        """Return the transaction at index idx; must be implemented by subclasses."""
        # original code returned the exception class instead of raising it,
        # so callers silently received NotImplementedError as a value
        raise NotImplementedError()


    def number(self):
        """Return the block number; must be implemented by subclasses."""
        raise NotImplementedError()

View File

@ -1,52 +0,0 @@
import json
from chainsyncer.client import translate
from chainsyncer.client.block import Block
from chainsyncer.client.tx import Tx
# maps a response item name to the function used to translate the raw
# JSON-RPC 'result' payload for that item
translations = {
    'block_number': translate.hex_to_int,
    'get_block': json.dumps,
    'number': translate.hex_to_int,
}
class EVMResponse:
    """Wraps a JSON-RPC response dict and translates its result field.

    :param item: response item name, used to look up the translation function
    :param response_object: decoded JSON-RPC response dict
    """

    def __init__(self, item, response_object):
        self.item = item
        self.response_object = response_object
        self.fn = translations[self.item]


    def get_error(self):
        """Return the error member of the response, or None if absent."""
        return self.response_object.get('error')


    def get_result(self):
        """Return the translated result value, mapping a 'null' translation to None."""
        translated = self.fn(self.response_object.get('result'))
        return None if translated == 'null' else translated
class EVMTx(Tx):
    """EVM-specific transaction wrapper; currently identical to the base Tx."""

    def __init__(self, block, tx_number, obj):
        Tx.__init__(self, block, tx_number, obj)
class EVMBlock(Block):
    """EVM block wrapper exposing transaction access and block number translation."""

    def tx(self, idx):
        """Return the transaction at position idx within the block."""
        o = self.obj['transactions'][idx]
        # use the EVM-specific wrapper; the original built a plain Tx,
        # leaving the EVMTx class defined above unused
        return EVMTx(self, idx, o)


    def number(self):
        """Return the block number as an integer."""
        return translate.hex_to_int(self.obj['number'])


    def __str__(self):
        # format() already returns str; the original wrapped it in a redundant str()
        return 'block {} {}'.format(self.number(), self.hash)

View File

@ -1,90 +0,0 @@
# standard imports
import logging
import uuid
import json
# third-party imports
import websocket
from hexathon import add_0x
# local imports
from .response import EVMResponse
from chainsyncer.error import RequestError
from chainsyncer.client.evm.response import EVMBlock
logg = logging.getLogger()
class EVMWebsocketClient:
    """JSON-RPC-over-websocket client for EVM nodes.

    :param url: websocket endpoint url
    """

    def __init__(self, url):
        self.url = url
        # set a placeholder first so __del__ is safe even if the
        # connection attempt raises before assignment completes
        self.conn = None
        self.conn = websocket.create_connection(url)


    def __del__(self):
        # original unconditionally closed, crashing when __init__ failed
        # before self.conn existed
        if getattr(self, 'conn', None) is not None:
            self.conn.close()


    def _request(self, item, method, params):
        """Send one JSON-RPC request and return the translated result.

        :param item: EVMResponse translation item name
        :param method: JSON-RPC method name
        :param params: JSON-RPC params list
        :raises RequestError: if the response carries an error member
        """
        req = {
            'jsonrpc': '2.0',
            'method': method,
            'id': str(uuid.uuid4()),
            'params': params,
        }
        self.conn.send(json.dumps(req))
        r = self.conn.recv()
        res = EVMResponse(item, json.loads(r))
        err = res.get_error()
        if err is not None:
            raise RequestError(err)
        return res.get_result()


    def _block(self, method, param):
        """Fetch a block via the given method; returns an EVMBlock or None."""
        j = self._request('get_block', method, [param, False])
        if j is None:
            return None
        o = json.loads(j)
        return EVMBlock(o['hash'], o)


    def block_number(self):
        """Return the node's current block number as an integer."""
        return self._request('block_number', 'eth_blockNumber', [])


    def block_by_integer(self, n):
        """Return the block at height n, or None if not found."""
        # 8-byte big-endian hex representation, as the original encoded it
        nhx = '0x' + n.to_bytes(8, 'big').hex()
        return self._block('eth_getBlockByNumber', nhx)


    def block_by_hash(self, hx_in):
        """Return the block with the given hash, or None if not found."""
        return self._block('eth_getBlockByHash', add_0x(hx_in))

View File

@ -1,10 +0,0 @@
# third-party imports
from hexathon import strip_0x
def hex_to_int(hx, endianness='big'):
    """Convert a hex string (with or without 0x prefix) to an integer.

    :param hx: hex string input
    :param endianness: byte order of the hex representation ('big' or 'little')
    """
    digits = strip_0x(hx)
    # odd-length hex strings need a leading zero before bytes conversion
    if len(digits) % 2:
        digits = '0' + digits
    return int.from_bytes(bytes.fromhex(digits), endianness)

View File

@ -1,6 +0,0 @@
class Tx:
    """Generic transaction container.

    :param block: block object the transaction belongs to
    :param tx_number: position of the transaction within the block
    :param obj: underlying transaction data object
    """

    def __init__(self, block, tx_number, obj):
        self.tx_number = tx_number
        self.block = block
        self.obj = obj

View File

@ -0,0 +1,40 @@
# standard imports
import hashlib
# third-party imports
from sqlalchemy import Column, String, Integer, BLOB, ForeignKey
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
# local imports
from .base import SessionBase
# 64-character all-zero digest used when no filters are defined;
# original '{:<064s'.format(...) was missing the closing brace and
# raised ValueError at import time
zero_digest = '0' * 64
class BlockchainSyncFilter(SessionBase):
    """Database model recording the filters registered for a chain sync session.

    :param names: list of filter names registered for the session
    :param chain_sync: BlockchainSync session record this filter row belongs to
    :param digest: pre-computed digest over the filter names; derived if None
    """

    __tablename__ = 'chain_sync_filter'

    # original used the invalid keyword form ForeignKey='chain_sync.id'
    chain_sync_id = Column(Integer, ForeignKey('chain_sync.id'))
    flags = Column(BLOB)
    digest = Column(String)
    count = Column(Integer)


    def __init__(self, names, chain_sync, digest=None):
        if len(names) == 0:
            digest = zero_digest
        elif digest is None:
            # deterministic digest over the NUL-separated filter names
            h = hashlib.new('sha256')
            for n in names:
                h.update(n.encode('utf-8') + b'\x00')
            digest = h.digest().hex()
        self.digest = digest
        self.count = len(names)
        # one flag bit per filter, rounded up to whole bytes; original used
        # true division, handing bytearray a float and raising TypeError
        self.flags = bytearray(((len(names) - 1) // 8) + 1)
        self.chain_sync_id = chain_sync.id

View File

@ -3,6 +3,15 @@ import uuid
import logging import logging
import time import time
# external imports
from chainlib.eth.block import (
block_by_number,
Block,
)
# local imports
from chainsyncer.filter import SyncFilter
logg = logging.getLogger() logg = logging.getLogger()
@ -19,7 +28,7 @@ class Syncer:
self.cursor = None self.cursor = None
self.running = True self.running = True
self.backend = backend self.backend = backend
self.filter = [] self.filter = SyncFilter()
self.progress_callback = progress_callback self.progress_callback = progress_callback
@ -33,64 +42,63 @@ class Syncer:
def add_filter(self, f): def add_filter(self, f):
self.filter.append(f) self.filter.add(f)
class MinedSyncer(Syncer): class BlockSyncer(Syncer):
def __init__(self, backend, progress_callback): def __init__(self, backend, progress_callback=noop_progress):
super(MinedSyncer, self).__init__(backend, progress_callback) super(BlockSyncer, self).__init__(backend, progress_callback)
def loop(self, interval, getter): def loop(self, interval, conn):
g = self.backend.get() g = self.backend.get()
last_tx = g[1] last_tx = g[1]
last_block = g[0] last_block = g[0]
self.progress_callback('loop started', last_block, last_tx) self.progress_callback('loop started', last_block, last_tx)
while self.running and Syncer.running_global: while self.running and Syncer.running_global:
while True: while True:
block = self.get(getter) try:
if block == None: block = self.get(conn)
except Exception:
break break
last_block = block.number last_block = block.number
self.process(getter, block) self.process(conn, block)
start_tx = 0 start_tx = 0
self.progress_callback('processed block {}'.format(self.backend.get()), last_block, last_tx) self.progress_callback('processed block {}'.format(self.backend.get()), last_block, last_tx)
time.sleep(self.yield_delay) time.sleep(self.yield_delay)
#self.progress_callback('loop ended', last_block + 1, last_tx) self.progress_callback('loop ended', last_block + 1, last_tx)
time.sleep(interval) time.sleep(interval)
class HeadSyncer(MinedSyncer): class HeadSyncer(BlockSyncer):
def __init__(self, backend, progress_callback): def __init__(self, backend, progress_callback=noop_progress):
super(HeadSyncer, self).__init__(backend, progress_callback) super(HeadSyncer, self).__init__(backend, progress_callback)
def process(self, getter, block): def process(self, conn, block):
logg.debug('process block {}'.format(block)) logg.debug('process block {}'.format(block))
i = 0 i = 0
tx = None tx = None
while True: while True:
try: try:
#self.filter[0].handle(getter, block, None)
tx = block.tx(i) tx = block.tx(i)
self.progress_callback('processing {}'.format(repr(tx)), block.number, i) self.progress_callback('processing {}'.format(repr(tx)), block.number, i)
self.backend.set(block.number, i) self.backend.set(block.number, i)
for f in self.filter: self.filter.apply(conn, block, tx)
f.handle(getter, block, tx)
self.progress_callback('applied filter {} on {}'.format(f.name(), repr(tx)), block.number, i)
except IndexError as e: except IndexError as e:
self.backend.set(block.number + 1, 0) self.backend.set(block.number + 1, 0)
break break
i += 1 i += 1
def get(self, getter): def get(self, conn):
(block_number, tx_number) = self.backend.get() (block_number, tx_number) = self.backend.get()
block_hash = [] block_hash = []
uu = uuid.uuid4() o = block_by_number(block_number)
res = getter.get(block_number) r = conn.do(o)
logg.debug('get {}'.format(res)) b = Block(r)
logg.debug('get {}'.format(b))
return res return b

34
chainsyncer/filter.py Normal file
View File

@ -0,0 +1,34 @@
# standard imports
import logging
logg = logging.getLogger(__name__)


class SyncFilter:
    """Ordered collection of filter objects applied to each synced transaction.

    :param safe: reserved flag, currently unused
    """

    def __init__(self, safe=True):
        self.safe = safe
        self.filters = []


    def add(self, fltr):
        """Register a filter object.

        :param fltr: object implementing filter(conn, block, tx)
        :raises ValueError: if the object has no filter attribute
        """
        # getattr with a default: the original two-arg getattr raised
        # AttributeError before the intended ValueError could trigger
        if getattr(fltr, 'filter', None) is None:
            raise ValueError('filter object must implement method filter')
        logg.debug('added filter {}'.format(str(fltr)))
        self.filters.append(fltr)


    def apply(self, conn, block, tx):
        """Apply every registered filter to the transaction, in registration order."""
        for f in self.filters:
            logg.debug('applying filter {}'.format(str(f)))
            f.filter(conn, block, tx)


class NoopFilter(SyncFilter):
    """Filter that only logs what it receives; useful for testing."""

    def filter(self, conn, block, tx):
        logg.debug('noop filter :received\n{} {}'.format(block, tx))


    def __str__(self):
        return 'noopfilter'

View File

@ -7,14 +7,19 @@ import argparse
import sys import sys
import re import re
# third-party imports # external imports
import confini import confini
from chainlib.eth.connection import HTTPConnection from chainlib.eth.connection import HTTPConnection
from chainlib.eth.block import block_latest
from chainlib.chain import ChainSpec
# local imports
from chainsyncer.driver import HeadSyncer from chainsyncer.driver import HeadSyncer
from chainsyncer.db import dsn_from_config from chainsyncer.db import dsn_from_config
from chainsyncer.db.models.base import SessionBase from chainsyncer.db.models.base import SessionBase
from chainsyncer.backend import SyncerBackend from chainsyncer.backend import SyncerBackend
from chainsyncer.error import LoopDone from chainsyncer.error import LoopDone
from chainsyncer.filter import NoopFilter
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
@ -39,6 +44,7 @@ argparser.add_argument('-c', type=str, default=config_dir, help='config root to
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec') argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi') argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--offset', type=int, help='block number to start sync')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('-v', help='be verbose', action='store_true') argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true') argparser.add_argument('-vv', help='be more verbose', action='store_true')
@ -55,7 +61,7 @@ config = confini.Config(config_dir, args.env_prefix)
config.process() config.process()
# override args # override args
args_override = { args_override = {
'CHAIN_SPEC': getattr(args, 'i'), 'SYNCER_CHAIN_SPEC': getattr(args, 'i'),
'ETH_PROVIDER': getattr(args, 'p'), 'ETH_PROVIDER': getattr(args, 'p'),
} }
config.dict_override(args_override, 'cli flag') config.dict_override(args_override, 'cli flag')
@ -70,19 +76,29 @@ queue = args.q
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
SessionBase.connect(dsn) SessionBase.connect(dsn)
c = HTTPConnection(config.get('ETH_PROVIDER')) conn = HTTPConnection(config.get('ETH_PROVIDER'))
chain = config.get('CHAIN_SPEC')
chain = ChainSpec.from_chain_str(config.get('SYNCER_CHAIN_SPEC'))
block_offset = args.offset
def main(): def main():
block_offset = c.block_number() global block_offset
if block_offset == None:
o = block_latest()
r = conn.do(o)
block_offset = r[1]
syncer_backend = SyncerBackend.live(chain, 0) syncer_backend = SyncerBackend.live(chain, 0)
syncer = HeadSyncer(syncer_backend) syncer = HeadSyncer(syncer_backend)
fltr = NoopFilter()
syncer.add_filter(fltr)
try: try:
logg.debug('block offset {} {}'.format(block_offset, c)) logg.debug('block offset {}'.format(block_offset))
syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), c) syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), conn)
except LoopDone as e: except LoopDone as e:
sys.stderr.write("sync '{}' done at block {}\n".format(args.mode, e)) sys.stderr.write("sync '{}' done at block {}\n".format(args.mode, e))

View File

@ -1,2 +1,3 @@
[syncer] [syncer]
loop_interval = 1 loop_interval = 1
chain_spec =

View File

@ -1,8 +1,6 @@
psycopg2==2.8.6 psycopg2==2.8.6
SQLAlchemy==1.3.20 SQLAlchemy==1.3.20
py-evm==0.3.0a20 confini~=0.3.6b2
eth-tester==0.5.0b3
confini==0.3.6b2
semver==2.13.0 semver==2.13.0
hexathon==0.0.1a2 hexathon~=0.0.1a3
chainlib~=0.0.1a7 chainlib~=0.0.1a13

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = chainsyncer name = chainsyncer
version = 0.0.1a5 version = 0.0.1a6
description = Generic blockchain syncer driver description = Generic blockchain syncer driver
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no

View File

@ -6,7 +6,21 @@ CREATE TABLE IF NOT EXISTS chain_sync (
tx_start int not null default 0, tx_start int not null default 0,
block_cursor int not null default 0, block_cursor int not null default 0,
tx_cursor int not null default 0, tx_cursor int not null default 0,
flags bytea not null,
num_flags int not null,
block_target int default null, block_target int default null,
date_created timestamp not null, date_created timestamp not null,
date_updated timestamp default null date_updated timestamp default null
); );
-- Recreate the filter table; IF EXISTS so running the migration against a
-- fresh database (where the table was never created) does not error out.
DROP TABLE IF EXISTS chain_sync_filter;
CREATE TABLE IF NOT EXISTS chain_sync_filter (
	id serial primary key not null,
	chain_sync_id int not null,
	-- one flag bit per filter; null until initialized
	flags bytea default null,
	-- number of filters represented in flags
	count int not null default 0,
	-- sha256 over the filter names; all zeros when no filters are defined
	digest char(64) not null default '0000000000000000000000000000000000000000000000000000000000000000',
	CONSTRAINT fk_chain_sync
		FOREIGN KEY(chain_sync_id)
			REFERENCES chain_sync(id)
);

13
sql/sqlite/1.sql Normal file
View File

@ -0,0 +1,13 @@
-- Syncer session state: one row per syncer run.
-- NOTE(review): 'serial' and 'bytea' are PostgreSQL types; sqlite only
-- accepts them via type-affinity fallback -- confirm id auto-increment
-- behaves as expected under sqlite.
CREATE TABLE IF NOT EXISTS chain_sync (
	id serial primary key not null,
	-- chain spec string of the chain being synced
	blockchain varchar not null,
	-- block/tx position the sync started from
	block_start int not null default 0,
	tx_start int not null default 0,
	-- current block/tx position of the sync
	block_cursor int not null default 0,
	tx_cursor int not null default 0,
	-- filter flag bits for the session
	flags bytea not null,
	num_flags int not null,
	-- presumably the block to stop at; null for open-ended (head) sync -- confirm
	block_target int default null,
	date_created timestamp not null,
	date_updated timestamp default null
);

10
sql/sqlite/2.sql Normal file
View File

@ -0,0 +1,10 @@
-- Filters registered for a chain_sync session, with their state flags.
CREATE TABLE IF NOT EXISTS chain_sync_filter (
	id serial primary key not null,
	chain_sync_id int not null,
	-- one flag bit per filter; null until initialized
	flags bytea default null,
	-- number of filters represented in flags
	count int not null default 0,
	-- sha256 over the filter names; all zeros when no filters are defined
	digest char(64) not null default '0000000000000000000000000000000000000000000000000000000000000000',
	CONSTRAINT fk_chain_sync
		FOREIGN KEY(chain_sync_id)
			REFERENCES chain_sync(id)
);

46
tests/base.py Normal file
View File

@ -0,0 +1,46 @@
import unittest
import tempfile
import os
#import pysqlite
from chainsyncer.db import dsn_from_config
from chainsyncer.db.models.base import SessionBase
script_dir = os.path.realpath(os.path.dirname(__file__))
class TestBase(unittest.TestCase):
    """Test harness creating a temporary sqlite database with the chainsyncer schema."""

    def setUp(self):
        db_dir = tempfile.mkdtemp()
        self.db_path = os.path.join(db_dir, 'test.sqlite')
        config = {
            'DATABASE_ENGINE': 'sqlite',
            'DATABASE_DRIVER': 'pysqlite',
            'DATABASE_NAME': self.db_path,
        }
        dsn = dsn_from_config(config)
        SessionBase.poolable = False
        SessionBase.transactional = False
        SessionBase.procedural = False
        SessionBase.connect(dsn, debug=True)

        # a single connection for all migrations; the original opened a
        # second connection for 2.sql and leaked the first
        conn = SessionBase.engine.connect()
        for migration in ('1.sql', '2.sql'):
            conn.execute(self._load_sql(migration))


    def _load_sql(self, name):
        """Read a migration file from the sqlite migration directory."""
        path = os.path.join(script_dir, '..', 'sql', 'sqlite', name)
        with open(path, 'r') as f:
            return f.read()


    def tearDown(self):
        SessionBase.disconnect()
        os.unlink(self.db_path)

22
tests/test_basic.py Normal file
View File

@ -0,0 +1,22 @@
# standard imports
import unittest
# external imports
from chainlib.chain import ChainSpec
# local imports
from chainsyncer.backend import SyncerBackend
# testutil imports
from tests.base import TestBase
class TestBasic(TestBase):
    """Smoke test: a syncer backend can be constructed against the test database."""

    def test_hello(self):
        spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
        backend = SyncerBackend(spec, 'foo')


if __name__ == '__main__':
    unittest.main()