From 81dd5753e84398fb1680ea1d95fdfd92a1cd420d Mon Sep 17 00:00:00 2001 From: nolash Date: Mon, 22 Feb 2021 10:57:07 +0100 Subject: [PATCH 01/19] Add filter to live instantiator, add filter start state --- chainsyncer/backend.py | 51 ++++++++++++++++++--- chainsyncer/db/models/base.py | 69 +++++++++++++++++++++++----- chainsyncer/db/models/filter.py | 79 ++++++++++++++++++++++++--------- chainsyncer/db/models/sync.py | 24 +++++----- chainsyncer/filter.py | 5 +++ sql/sqlite/1.sql | 14 +++--- sql/sqlite/2.sql | 7 +-- tests/base.py | 12 ++++- tests/test_database.py | 55 +++++++++++++++++++++++ 9 files changed, 257 insertions(+), 59 deletions(-) create mode 100644 tests/test_database.py diff --git a/chainsyncer/backend.py b/chainsyncer/backend.py index 3380bfc..5953ff2 100644 --- a/chainsyncer/backend.py +++ b/chainsyncer/backend.py @@ -7,6 +7,7 @@ from chainlib.chain import ChainSpec # local imports from chainsyncer.db.models.sync import BlockchainSync +from chainsyncer.db.models.filter import BlockchainSyncFilter from chainsyncer.db.models.base import SessionBase logg = logging.getLogger() @@ -23,6 +24,7 @@ class SyncerBackend: def __init__(self, chain_spec, object_id): self.db_session = None self.db_object = None + self.db_object_filter = None self.chain_spec = chain_spec self.object_id = object_id self.connect() @@ -34,9 +36,17 @@ class SyncerBackend: """ if self.db_session == None: self.db_session = SessionBase.create_session() + q = self.db_session.query(BlockchainSync) q = q.filter(BlockchainSync.id==self.object_id) self.db_object = q.first() + + if self.db_object != None: + qtwo = self.db_session.query(BlockchainSyncFilter) + qtwo = qtwo.join(BlockchainSync) + qtwo = qtwo.filter(BlockchainSync.id==self.db_object.id) + self.db_object_filter = qtwo.first() + if self.db_object == None: raise ValueError('sync entry with id {} not found'.format(self.object_id)) @@ -44,6 +54,8 @@ class SyncerBackend: def disconnect(self): """Commits state of sync to backend. """ + if self.db_object_filter != None: + self.db_session.add(self.db_object_filter) self.db_session.add(self.db_object) self.db_session.commit() self.db_session.close() @@ -67,8 +79,9 @@ class SyncerBackend: """ self.connect() pair = self.db_object.cursor() + filter_state = self.db_object_filter.filter() self.disconnect() - return pair + return (pair, filter_state,) def set(self, block_height, tx_height): @@ -82,8 +95,9 @@ class SyncerBackend: """ self.connect() pair = self.db_object.set(block_height, tx_height) + filter_state = self.db_object_filter.filter() self.disconnect() - return pair + return (pair, filter_state,) def start(self): @@ -94,8 +108,9 @@ class SyncerBackend: """ self.connect() pair = self.db_object.start() + filter_state = self.db_object_filter.start() self.disconnect() - return pair + return (pair, filter_state,) def target(self): @@ -106,12 +121,13 @@ class SyncerBackend: """ self.connect() target = self.db_object.target() + filter_state = self.db_object_filter.target() self.disconnect() - return target + return (target, filter_target,) @staticmethod - def first(chain): + def first(chain_spec): """Returns the model object of the most recent syncer in backend. :param chain: Chain spec of chain that syncer is running for. @@ -119,7 +135,12 @@ class SyncerBackend: :returns: Last syncer object :rtype: cic_eth.db.models.BlockchainSync """ - return BlockchainSync.first(chain) + #return BlockchainSync.first(str(chain_spec)) + object_id = BlockchainSync.first(str(chain_spec)) + if object_id == None: + return None + return SyncerBackend(chain_spec, object_id) + @staticmethod @@ -193,15 +214,30 @@ class SyncerBackend: """ object_id = None session = SessionBase.create_session() + o = BlockchainSync(str(chain_spec), block_height, 0, None) session.add(o) - session.commit() + session.flush() object_id = o.id + + of = BlockchainSyncFilter(o) + session.add(of) + session.commit() + session.close() return SyncerBackend(chain_spec, object_id) + def register_filter(self, name): + self.connect() + if self.db_object_filter == None: + self.db_object_filter = BlockchainSyncFilter(self.db_object) + self.db_object_filter.add(name) + self.db_session.add(self.db_object_filter) + self.disconnect() + + class MemBackend: def __init__(self, chain_spec, object_id): @@ -209,6 +245,7 @@ class MemBackend: self.chain_spec = chain_spec self.block_height = 0 self.tx_height = 0 + self.flags = 0 self.db_session = None diff --git a/chainsyncer/db/models/base.py b/chainsyncer/db/models/base.py index 153906a..db570df 100644 --- a/chainsyncer/db/models/base.py +++ b/chainsyncer/db/models/base.py @@ -1,8 +1,18 @@ +# stanard imports +import logging + # third-party imports from sqlalchemy import Column, Integer from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import ( + StaticPool, + QueuePool, + AssertionPool, + ) + +logg = logging.getLogger() Model = declarative_base(name='Model') @@ -21,7 +31,11 @@ class SessionBase(Model): transactional = True """Whether the database backend supports query transactions. Should be explicitly set by initialization code""" poolable = True - """Whether the database backend supports query transactions. Should be explicitly set by initialization code""" + """Whether the database backend supports connection pools. Should be explicitly set by initialization code""" + procedural = True + """Whether the database backend supports stored procedures""" + localsessions = {} + """Contains dictionary of sessions initiated by db model components""" @staticmethod @@ -40,7 +54,7 @@ class SessionBase(Model): @staticmethod - def connect(dsn, debug=False): + def connect(dsn, pool_size=8, debug=False): """Create new database connection engine and connect to database backend. :param dsn: DSN string defining connection. @@ -48,14 +62,28 @@ class SessionBase(Model): """ e = None if SessionBase.poolable: - e = create_engine( - dsn, - max_overflow=50, - pool_pre_ping=True, - pool_size=20, - pool_recycle=10, - echo=debug, - ) + poolclass = QueuePool + if pool_size > 1: + e = create_engine( + dsn, + max_overflow=pool_size*3, + pool_pre_ping=True, + pool_size=pool_size, + pool_recycle=60, + poolclass=poolclass, + echo=debug, + ) + else: + if debug: + poolclass = AssertionPool + else: + poolclass = StaticPool + + e = create_engine( + dsn, + poolclass=poolclass, + echo=debug, + ) else: e = create_engine( dsn, @@ -71,3 +99,24 @@ class SessionBase(Model): """ SessionBase.engine.dispose() SessionBase.engine = None + + + @staticmethod + def bind_session(session=None): + localsession = session + if localsession == None: + localsession = SessionBase.create_session() + localsession_key = str(id(localsession)) + logg.debug('creating new session {}'.format(localsession_key)) + SessionBase.localsessions[localsession_key] = localsession + return localsession + + + @staticmethod + def release_session(session=None): + session.flush() + session_key = str(id(session)) + if SessionBase.localsessions.get(session_key) != None: + logg.debug('destroying session {}'.format(session_key)) + session.commit() + session.close() diff --git a/chainsyncer/db/models/filter.py b/chainsyncer/db/models/filter.py index 89bd564..656ed62 100644 --- a/chainsyncer/db/models/filter.py +++ b/chainsyncer/db/models/filter.py @@ -1,40 +1,79 @@ # standard imports +import logging import hashlib -# third-party imports -from sqlalchemy import Column, String, Integer, BLOB +# external imports +from sqlalchemy import Column, String, Integer, BLOB, ForeignKey from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method # local imports from .base import SessionBase +from .sync import BlockchainSync - -zero_digest = '{:<064s'.format('0') +zero_digest = bytearray(32) +logg = logging.getLogger(__name__) class BlockchainSyncFilter(SessionBase): __tablename__ = 'chain_sync_filter' - chain_sync_id = Column(Integer, ForeignKey='chain_sync.id') + chain_sync_id = Column(Integer, ForeignKey('chain_sync.id')) + flags_start = Column(BLOB) flags = Column(BLOB) - digest = Column(String) + digest = Column(BLOB) count = Column(Integer) - @staticmethod - def set(self, names): - - def __init__(self, names, chain_sync, digest=None): - if len(names) == 0: - digest = zero_digest - elif digest == None: - h = hashlib.new('sha256') - for n in names: - h.update(n.encode('utf-8') + b'\x00') - z = h.digest() - digest = z.hex() + def __init__(self, chain_sync, count=0, flags=None, digest=zero_digest): self.digest = digest - self.count = len(names) - self.flags = bytearray((len(names) -1 ) / 8 + 1) + self.count = count + + if flags == None: + flags = bytearray(0) + self.flags_start = flags + self.flags = flags + self.chain_sync_id = chain_sync.id + + + def add(self, name): + h = hashlib.new('sha256') + h.update(self.digest) + h.update(name.encode('utf-8')) + z = h.digest() + + old_byte_count = int((self.count - 1) / 8 + 1) + new_byte_count = int((self.count) / 8 + 1) + + logg.debug('old new {} {}'.format(old_byte_count, new_byte_count)) + if old_byte_count != new_byte_count: + self.flags = bytearray(1) + self.flags + self.count += 1 + self.digest = z + + + def start(self): + return self.flags_start + + + def cursor(self): + return self.flags_current + + + def clear(self): + self.flags = 0 + + + def target(self): + n = 0 + for i in range(self.count): + n |= 2 << i + return n + + + def set(self, n): + if self.flags & n > 0: + SessionBase.release_session(session) + raise AttributeError('Filter bit already set') + r.flags |= n diff --git a/chainsyncer/db/models/sync.py b/chainsyncer/db/models/sync.py index 6dab033..4f2f156 100644 --- a/chainsyncer/db/models/sync.py +++ b/chainsyncer/db/models/sync.py @@ -41,19 +41,23 @@ class BlockchainSync(SessionBase): :type chain: str :param session: Session to use. If not specified, a separate session will be created for this method only. :type session: SqlAlchemy Session - :returns: True if sync record found - :rtype: bool + :returns: Database primary key id of sync record + :rtype: number|None """ - local_session = False - if session == None: - session = SessionBase.create_session() - local_session = True + session = SessionBase.bind_session(session) + q = session.query(BlockchainSync.id) q = q.filter(BlockchainSync.blockchain==chain) o = q.first() - if local_session: - session.close() - return o == None + + if o == None: + return None + + sync_id = o.id + + SessionBase.release_session(session) + + return sync_id @staticmethod @@ -165,4 +169,4 @@ class BlockchainSync(SessionBase): self.tx_cursor = tx_start self.block_target = block_target self.date_created = datetime.datetime.utcnow() - self.date_modified = datetime.datetime.utcnow() + self.date_updated = datetime.datetime.utcnow() diff --git a/chainsyncer/filter.py b/chainsyncer/filter.py index e488b8c..263bbe5 100644 --- a/chainsyncer/filter.py +++ b/chainsyncer/filter.py @@ -9,6 +9,7 @@ from .error import BackendError logg = logging.getLogger(__name__) + class SyncFilter: def __init__(self, backend, safe=True): @@ -32,11 +33,15 @@ class SyncFilter: except sqlalchemy.exc.TimeoutError as e: self.backend.disconnect() raise BackendError('database connection fail: {}'.format(e)) + i = 0 for f in self.filters: + i += 1 logg.debug('applying filter {}'.format(str(f))) f.filter(conn, block, tx, self.backend.db_session) + self.backend.set_filter() self.backend.disconnect() + class NoopFilter: def filter(self, conn, block, tx, db_session=None): diff --git a/sql/sqlite/1.sql b/sql/sqlite/1.sql index fa9f6fa..0f115d4 100644 --- a/sql/sqlite/1.sql +++ b/sql/sqlite/1.sql @@ -1,13 +1,11 @@ CREATE TABLE IF NOT EXISTS chain_sync ( - id serial primary key not null, + id integer primary key autoincrement, blockchain varchar not null, - block_start int not null default 0, - tx_start int not null default 0, - block_cursor int not null default 0, - tx_cursor int not null default 0, - flags bytea not null, - num_flags int not null, - block_target int default null, + block_start integer not null default 0, + tx_start integer not null default 0, + block_cursor integer not null default 0, + tx_cursor integer not null default 0, + block_target integer default null, date_created timestamp not null, date_updated timestamp default null ); diff --git a/sql/sqlite/2.sql b/sql/sqlite/2.sql index c43624e..9e42802 100644 --- a/sql/sqlite/2.sql +++ b/sql/sqlite/2.sql @@ -1,8 +1,9 @@ CREATE TABLE IF NOT EXISTS chain_sync_filter ( - id serial primary key not null, - chain_sync_id int not null, + id integer primary key autoincrement not null, + chain_sync_id integer not null, flags bytea default null, - count int not null default 0, + flags_start bytea default null, + count integer not null default 0, digest char(64) not null default '0000000000000000000000000000000000000000000000000000000000000000', CONSTRAINT fk_chain_sync FOREIGN KEY(chain_sync_id) diff --git a/tests/base.py b/tests/base.py index 6729eb9..2d0a99c 100644 --- a/tests/base.py +++ b/tests/base.py @@ -1,13 +1,21 @@ +# standard imports +import logging import unittest import tempfile import os #import pysqlite +# external imports +from chainlib.chain import ChainSpec + +# local imports from chainsyncer.db import dsn_from_config from chainsyncer.db.models.base import SessionBase script_dir = os.path.realpath(os.path.dirname(__file__)) +logging.basicConfig(level=logging.DEBUG) + class TestBase(unittest.TestCase): @@ -23,7 +31,7 @@ class TestBase(unittest.TestCase): SessionBase.poolable = False SessionBase.transactional = False SessionBase.procedural = False - SessionBase.connect(dsn, debug=True) + SessionBase.connect(dsn, debug=False) f = open(os.path.join(script_dir, '..', 'sql', 'sqlite', '1.sql'), 'r') sql = f.read() @@ -39,6 +47,8 @@ class TestBase(unittest.TestCase): conn = SessionBase.engine.connect() conn.execute(sql) + self.chain_spec = ChainSpec('evm', 'foo', 42, 'bar') + def tearDown(self): SessionBase.disconnect() os.unlink(self.db_path) diff --git a/tests/test_database.py b/tests/test_database.py new file mode 100644 index 0000000..22f919b --- /dev/null +++ b/tests/test_database.py @@ -0,0 +1,55 @@ +# standard imports +import unittest + +# external imports +from chainlib.chain import ChainSpec + +# local imports +from chainsyncer.db.models.base import SessionBase +from chainsyncer.db.models.filter import BlockchainSyncFilter +from chainsyncer.backend import SyncerBackend + +# testutil imports +from tests.base import TestBase + +class TestDatabase(TestBase): + + + def test_backend_live(self): + s = SyncerBackend.live(self.chain_spec, 42) + self.assertEqual(s.object_id, 1) + backend = SyncerBackend.first(self.chain_spec) + #SyncerBackend(self.chain_spec, sync_id) + self.assertEqual(backend.object_id, 1) + + bogus_chain_spec = ChainSpec('bogus', 'foo', 13, 'baz') + sync_id = SyncerBackend.first(bogus_chain_spec) + self.assertIsNone(sync_id) + + + def test_backend_filter(self): + s = SyncerBackend.live(self.chain_spec, 42) + + s.connect() + filter_id = s.db_object_filter.id + s.disconnect() + + session = SessionBase.create_session() + o = session.query(BlockchainSyncFilter).get(filter_id) + self.assertEqual(len(o.flags), 0) + session.close() + + for i in range(9): + s.register_filter(str(i)) + + s.connect() + filter_id = s.db_object_filter.id + s.disconnect() + + session = SessionBase.create_session() + o = session.query(BlockchainSyncFilter).get(filter_id) + self.assertEqual(len(o.flags), 2) + session.close() + +if __name__ == '__main__': + unittest.main() From 5cf8b4f29672576d77544f95575dd75583aad068 Mon Sep 17 00:00:00 2001 From: nolash Date: Mon, 22 Feb 2021 13:10:38 +0100 Subject: [PATCH 02/19] Rehabilitate start, resume --- chainsyncer/backend.py | 4 ++-- chainsyncer/db/models/filter.py | 17 ++++++++++++----- chainsyncer/db/models/sync.py | 1 + tests/test_database.py | 28 ++++++++++++++++++++++++++++ 4 files changed, 43 insertions(+), 7 deletions(-) diff --git a/chainsyncer/backend.py b/chainsyncer/backend.py index 5953ff2..a5da1f4 100644 --- a/chainsyncer/backend.py +++ b/chainsyncer/backend.py @@ -79,7 +79,7 @@ class SyncerBackend: """ self.connect() pair = self.db_object.cursor() - filter_state = self.db_object_filter.filter() + filter_state = self.db_object_filter.cursor() self.disconnect() return (pair, filter_state,) @@ -95,7 +95,7 @@ class SyncerBackend: """ self.connect() pair = self.db_object.set(block_height, tx_height) - filter_state = self.db_object_filter.filter() + filter_state = self.db_object_filter.cursor() self.disconnect() return (pair, filter_state,) diff --git a/chainsyncer/db/models/filter.py b/chainsyncer/db/models/filter.py index 656ed62..33964b7 100644 --- a/chainsyncer/db/models/filter.py +++ b/chainsyncer/db/models/filter.py @@ -54,11 +54,11 @@ class BlockchainSyncFilter(SessionBase): def start(self): - return self.flags_start + return int.from_bytes(self.flags_start, 'big') def cursor(self): - return self.flags_current + return int.from_bytes(self.flags, 'big') def clear(self): @@ -68,12 +68,19 @@ class BlockchainSyncFilter(SessionBase): def target(self): n = 0 for i in range(self.count): - n |= 2 << i + n |= (1 << self.count) - 1 return n def set(self, n): - if self.flags & n > 0: + if n > self.count: + raise IndexError('bit flag out of range') + + b = 1 << (n % 8) + i = int((n - 1) / 8 + 1) + if self.flags[i] & b > 0: SessionBase.release_session(session) raise AttributeError('Filter bit already set') - r.flags |= n + flags = bytearray(self.flags) + flags[i] |= b + self.flags = flags diff --git a/chainsyncer/db/models/sync.py b/chainsyncer/db/models/sync.py index 4f2f156..aafdd74 100644 --- a/chainsyncer/db/models/sync.py +++ b/chainsyncer/db/models/sync.py @@ -126,6 +126,7 @@ class BlockchainSync(SessionBase): """ self.block_cursor = block_height self.tx_cursor = tx_height + return (self.block_cursor, self.tx_cursor,) def cursor(self): diff --git a/tests/test_database.py b/tests/test_database.py index 22f919b..ac3baf2 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -1,5 +1,6 @@ # standard imports import unittest +import logging # external imports from chainlib.chain import ChainSpec @@ -12,6 +13,9 @@ from chainsyncer.backend import SyncerBackend # testutil imports from tests.base import TestBase +logg = logging.getLogger() + + class TestDatabase(TestBase): @@ -49,7 +53,31 @@ class TestDatabase(TestBase): session = SessionBase.create_session() o = session.query(BlockchainSyncFilter).get(filter_id) self.assertEqual(len(o.flags), 2) + + t = o.target() + self.assertEqual(t, (1 << 9) - 1) + + for i in range(9): + o.set(i) + + c = o.cursor() + self.assertEqual(c, t) + session.close() + + def test_backend_resume(self): + s = SyncerBackend.live(self.chain_spec, 42) + s.register_filter('foo') + s.register_filter('bar') + s.register_filter('baz') + + s.set(42, 13) + + s = SyncerBackend.first(self.chain_spec) + logg.debug('start {}'.format(s)) + self.assertEqual(s.get(), ((42,13), 0)) + + if __name__ == '__main__': unittest.main() From 37d0a363034f35be020f6495cd6d8e34e1b00b7d Mon Sep 17 00:00:00 2001 From: nolash Date: Mon, 22 Feb 2021 18:40:41 +0100 Subject: [PATCH 03/19] Add test for resume tx, add flag state to resume --- chainsyncer/backend.py | 68 ++++++++++++++++++++++++--------- chainsyncer/db/models/filter.py | 17 +++++---- chainsyncer/db/models/sync.py | 22 +++++------ tests/test_database.py | 46 ++++++++++++++++++++-- 4 files changed, 111 insertions(+), 42 deletions(-) diff --git a/chainsyncer/backend.py b/chainsyncer/backend.py index a5da1f4..fdafda9 100644 --- a/chainsyncer/backend.py +++ b/chainsyncer/backend.py @@ -79,7 +79,7 @@ class SyncerBackend: """ self.connect() pair = self.db_object.cursor() - filter_state = self.db_object_filter.cursor() + (filter_state, count, digest) = self.db_object_filter.cursor() self.disconnect() return (pair, filter_state,) @@ -95,7 +95,7 @@ class SyncerBackend: """ self.connect() pair = self.db_object.set(block_height, tx_height) - filter_state = self.db_object_filter.cursor() + (filter_state, count, digest)= self.db_object_filter.cursor() self.disconnect() return (pair, filter_state,) @@ -108,7 +108,7 @@ class SyncerBackend: """ self.connect() pair = self.db_object.start() - filter_state = self.db_object_filter.start() + (filter_state, count, digest) = self.db_object_filter.start() self.disconnect() return (pair, filter_state,) @@ -121,7 +121,7 @@ class SyncerBackend: """ self.connect() target = self.db_object.target() - filter_state = self.db_object_filter.target() + (filter_state, count, digest) = self.db_object_filter.target() self.disconnect() return (target, filter_target,) @@ -144,7 +144,7 @@ class SyncerBackend: @staticmethod - def initial(chain, block_height): + def initial(chain_spec, target_block_height, start_block_height=0): """Creates a new syncer session and commit its initial state to backend. :param chain: Chain spec of chain that syncer is running for. @@ -154,24 +154,31 @@ class SyncerBackend: :returns: New syncer object :rtype: cic_eth.db.models.BlockchainSync """ + if start_block_height >= target_block_height: + raise ValueError('start block height must be lower than target block height') object_id = None session = SessionBase.create_session() - o = BlockchainSync(chain, 0, 0, block_height) + o = BlockchainSync(str(chain_spec), start_block_height, 0, target_block_height) session.add(o) session.commit() object_id = o.id + + of = BlockchainSyncFilter(o) + session.add(of) + session.commit() + session.close() - return SyncerBackend(chain, object_id) + return SyncerBackend(chain_spec, object_id) @staticmethod - def resume(chain, block_height): + def resume(chain_spec, block_height): """Retrieves and returns all previously unfinished syncer sessions. - :param chain: Chain spec of chain that syncer is running for. - :type chain: cic_registry.chain.ChainSpec + :param chain_spec: Chain spec of chain that syncer is running for. + :type chain_spec: cic_registry.chain.ChainSpec :param block_height: Target block height :type block_height: number :returns: Syncer objects of unfinished syncs @@ -185,16 +192,39 @@ class SyncerBackend: for object_id in BlockchainSync.get_unsynced(session=session): logg.debug('block syncer resume added previously unsynced sync entry id {}'.format(object_id)) - syncers.append(SyncerBackend(chain, object_id)) + syncers.append(SyncerBackend(chain_spec, object_id)) - (block_resume, tx_resume) = BlockchainSync.get_last_live_height(block_height, session=session) - if block_height != block_resume: - o = BlockchainSync(chain, block_resume, tx_resume, block_height) - session.add(o) - session.commit() - object_id = o.id - syncers.append(SyncerBackend(chain, object_id)) - logg.debug('block syncer resume added new sync entry from previous run id {}, start{}:{} target {}'.format(object_id, block_resume, tx_resume, block_height)) + last_live_id = BlockchainSync.get_last_live(block_height, session=session) + logg.debug('last_live_id {}'.format(last_live_id)) + if last_live_id != None: + + q = session.query(BlockchainSync) + o = q.get(last_live_id) + + (block_resume, tx_resume) = o.cursor() + session.flush() + + if block_height != block_resume: + + q = session.query(BlockchainSyncFilter) + q = q.filter(BlockchainSyncFilter.chain_sync_id==last_live_id) + of = q.first() + (flags, count, digest) = of.cursor() + + session.flush() + + o = BlockchainSync(str(chain_spec), block_resume, tx_resume, block_height) + session.add(o) + session.flush() + object_id = o.id + + of = BlockchainSyncFilter(o, count, flags, digest) + session.add(of) + session.commit() + + syncers.append(SyncerBackend(chain_spec, object_id)) + + logg.debug('block syncer resume added new sync entry from previous run id {}, start{}:{} target {}'.format(object_id, block_resume, tx_resume, block_height)) session.close() diff --git a/chainsyncer/db/models/filter.py b/chainsyncer/db/models/filter.py index 33964b7..93cf4c2 100644 --- a/chainsyncer/db/models/filter.py +++ b/chainsyncer/db/models/filter.py @@ -31,6 +31,9 @@ class BlockchainSyncFilter(SessionBase): if flags == None: flags = bytearray(0) + else: # TODO: handle bytes too + bytecount = int((count - 1) / 8 + 1) + 1 + flags = flags.to_bytes(bytecount, 'big') self.flags_start = flags self.flags = flags @@ -54,22 +57,22 @@ class BlockchainSyncFilter(SessionBase): def start(self): - return int.from_bytes(self.flags_start, 'big') + return (int.from_bytes(self.flags_start, 'big'), self.count, self.digest) def cursor(self): - return int.from_bytes(self.flags, 'big') - - - def clear(self): - self.flags = 0 + return (int.from_bytes(self.flags, 'big'), self.count, self.digest) def target(self): n = 0 for i in range(self.count): n |= (1 << self.count) - 1 - return n + return (n, self.count, self.digest) + + + def clear(self): + self.flags = 0 def set(self, n): diff --git a/chainsyncer/db/models/sync.py b/chainsyncer/db/models/sync.py index aafdd74..01c28c1 100644 --- a/chainsyncer/db/models/sync.py +++ b/chainsyncer/db/models/sync.py @@ -61,7 +61,7 @@ class BlockchainSync(SessionBase): @staticmethod - def get_last_live_height(current, session=None): + def get_last_live(current, session=None): """Get the most recent open-ended ("live") syncer record. :param current: Current block number @@ -71,21 +71,19 @@ class BlockchainSync(SessionBase): :returns: Block and transaction number, respectively :rtype: tuple """ - local_session = False - if session == None: - session = SessionBase.create_session() - local_session = True - q = session.query(BlockchainSync) + session = SessionBase.bind_session(session) + + q = session.query(BlockchainSync.id) q = q.filter(BlockchainSync.block_target==None) q = q.order_by(BlockchainSync.date_created.desc()) - o = q.first() - if local_session: - session.close() + object_id = q.first() - if o == None: - return (0, 0) + SessionBase.release_session(session) - return (o.block_cursor, o.tx_cursor) + if object_id == None: + return None + + return object_id[0] @staticmethod diff --git a/tests/test_database.py b/tests/test_database.py index ac3baf2..5066f69 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -54,19 +54,21 @@ class TestDatabase(TestBase): o = session.query(BlockchainSyncFilter).get(filter_id) self.assertEqual(len(o.flags), 2) - t = o.target() + (t, c, d) = o.target() self.assertEqual(t, (1 << 9) - 1) for i in range(9): o.set(i) - c = o.cursor() - self.assertEqual(c, t) + (f, c, d) = o.cursor() + self.assertEqual(f, t) + self.assertEqual(c, 9) + self.assertEqual(d, o.digest) session.close() - def test_backend_resume(self): + def test_backend_retrieve(self): s = SyncerBackend.live(self.chain_spec, 42) s.register_filter('foo') s.register_filter('bar') @@ -77,7 +79,43 @@ class TestDatabase(TestBase): s = SyncerBackend.first(self.chain_spec) logg.debug('start {}'.format(s)) self.assertEqual(s.get(), ((42,13), 0)) + + + def test_backend_initial(self): + with self.assertRaises(ValueError): + s = SyncerBackend.initial(self.chain_spec, 42, 42) + with self.assertRaises(ValueError): + s = SyncerBackend.initial(self.chain_spec, 42, 43) + + s = SyncerBackend.initial(self.chain_spec, 42, 13) + + s.set(43, 13) + + s = SyncerBackend.first(self.chain_spec) + self.assertEqual(s.get(), ((43,13), 0)) + self.assertEqual(s.start(), ((13,0), 0)) + + + def test_backend_resume(self): + s = SyncerBackend.resume(self.chain_spec, 666) + self.assertEqual(len(s), 0) + + s = SyncerBackend.live(self.chain_spec, 42) + original_id = s.object_id + s = SyncerBackend.resume(self.chain_spec, 666) + self.assertEqual(len(s), 1) + resumed_id = s[0].object_id + self.assertEqual(resumed_id, original_id + 1) + + + def test_backend_resume_several(self): + s = SyncerBackend.live(self.chain_spec, 42) + s.set(43, 13) + s = SyncerBackend.resume(self.chain_spec, 666) + s[0].set(123, 2) + s = SyncerBackend.resume(self.chain_spec, 1024) + self.assertEqual(len(s), 2) if __name__ == '__main__': unittest.main() From be405b8376a248051c4e11050912470dcc35b5d1 Mon Sep 17 00:00:00 2001 From: nolash Date: Mon, 22 Feb 2021 20:06:34 +0100 Subject: [PATCH 04/19] Correct multiple resume --- chainsyncer/backend.py | 19 +++++++++++++++---- chainsyncer/db/models/sync.py | 26 ++++++++++++++++++++++---- tests/test_database.py | 32 +++++++++++++++++++++++++++++++- 3 files changed, 68 insertions(+), 9 deletions(-) diff --git a/chainsyncer/backend.py b/chainsyncer/backend.py index fdafda9..4cfde9d 100644 --- a/chainsyncer/backend.py +++ b/chainsyncer/backend.py @@ -121,7 +121,7 @@ class SyncerBackend: """ self.connect() target = self.db_object.target() - (filter_state, count, digest) = self.db_object_filter.target() + (filter_target, count, digest) = self.db_object_filter.target() self.disconnect() return (target, filter_target,) @@ -190,12 +190,22 @@ class SyncerBackend: object_id = None + highest_unsynced_block = 0 + highest_unsynced_tx = 0 + object_id = BlockchainSync.get_last(session=session, live=False) + if object_id != None: + q = session.query(BlockchainSync) + o = q.get(object_id) + (highest_unsynced_block, highest_unsynced_index) = o.cursor() + for object_id in BlockchainSync.get_unsynced(session=session): logg.debug('block syncer resume added previously unsynced sync entry id {}'.format(object_id)) - syncers.append(SyncerBackend(chain_spec, object_id)) + s = SyncerBackend(chain_spec, object_id) + syncers.append(s) - last_live_id = BlockchainSync.get_last_live(block_height, session=session) + last_live_id = BlockchainSync.get_last(session=session) logg.debug('last_live_id {}'.format(last_live_id)) + logg.debug('higesst {} {}'.format(highest_unsynced_block, highest_unsynced_tx)) if last_live_id != None: q = session.query(BlockchainSync) @@ -204,7 +214,8 @@ class SyncerBackend: (block_resume, tx_resume) = o.cursor() session.flush() - if block_height != block_resume: + #if block_height != block_resume: + if highest_unsynced_block < block_resume: q = session.query(BlockchainSyncFilter) q = q.filter(BlockchainSyncFilter.chain_sync_id==last_live_id) diff --git a/chainsyncer/db/models/sync.py b/chainsyncer/db/models/sync.py index 01c28c1..ab6ab88 100644 --- a/chainsyncer/db/models/sync.py +++ b/chainsyncer/db/models/sync.py @@ -61,11 +61,9 @@ class BlockchainSync(SessionBase): @staticmethod - def get_last_live(current, session=None): + def get_last(session=None, live=True): """Get the most recent open-ended ("live") syncer record. - :param current: Current block number - :type current: number :param session: Session to use. If not specified, a separate session will be created for this method only. :type session: SqlAlchemy Session :returns: Block and transaction number, respectively @@ -74,7 +72,10 @@ class BlockchainSync(SessionBase): session = SessionBase.bind_session(session) q = session.query(BlockchainSync.id) - q = q.filter(BlockchainSync.block_target==None) + if live: + q = q.filter(BlockchainSync.block_target==None) + else: + q = q.filter(BlockchainSync.block_target!=None) q = q.order_by(BlockchainSync.date_created.desc()) object_id = q.first() @@ -169,3 +170,20 @@ class BlockchainSync(SessionBase): self.block_target = block_target self.date_created = datetime.datetime.utcnow() self.date_updated = datetime.datetime.utcnow() + + + def __str__(self): + return """object_id: {} +start: {}:{} +cursor: {}:{} +target: {} +""".format( + self.id, + self.block_start, + self.tx_start, + self.block_cursor, + self.tx_cursor, + self.block_target, + ) + + diff --git a/tests/test_database.py b/tests/test_database.py index 5066f69..9797b18 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -67,7 +67,6 @@ class TestDatabase(TestBase): session.close() - def test_backend_retrieve(self): s = SyncerBackend.live(self.chain_spec, 42) s.register_filter('foo') @@ -107,15 +106,46 @@ class TestDatabase(TestBase): self.assertEqual(len(s), 1) resumed_id = s[0].object_id self.assertEqual(resumed_id, original_id + 1) + self.assertEqual(s[0].get(), ((42, 0), 0)) + def test_backend_resume_when_completed(self): + s = SyncerBackend.live(self.chain_spec, 42) + + s = SyncerBackend.resume(self.chain_spec, 666) + s[0].set(666, 0) + + s = SyncerBackend.resume(self.chain_spec, 666) + self.assertEqual(len(s), 0) + + def test_backend_resume_several(self): s = SyncerBackend.live(self.chain_spec, 42) s.set(43, 13) + s = SyncerBackend.resume(self.chain_spec, 666) + SyncerBackend.live(self.chain_spec, 666) s[0].set(123, 2) + + logg.debug('>>>>>') s = SyncerBackend.resume(self.chain_spec, 1024) + SyncerBackend.live(self.chain_spec, 1024) + s[0].connect() + logg.debug('syncer 1 {}'.format(s[0].db_object)) + s[0].disconnect() + s[1].connect() + logg.debug('syncer 2 {}'.format(s[1].db_object)) + s[1].disconnect() + + + self.assertEqual(len(s), 2) + self.assertEqual(s[0].target(), (666, 0)) + self.assertEqual(s[0].get(), ((123, 2), 0)) + self.assertEqual(s[1].target(), (1024, 0)) + self.assertEqual(s[1].get(), ((666, 0), 0)) + + if __name__ == '__main__': unittest.main() From b69005682c3a48f9ddf0f0c10e965ee65bdbf273 Mon Sep 17 00:00:00 2001 From: nolash Date: Mon, 22 Feb 2021 20:54:27 +0100 Subject: [PATCH 05/19] Correct flags on resume --- chainsyncer/backend.py | 10 ++++++++-- chainsyncer/db/models/filter.py | 13 ++++++------- tests/test_database.py | 25 +++++++++++++++---------- 3 files changed, 29 insertions(+), 19 deletions(-) diff --git a/chainsyncer/backend.py b/chainsyncer/backend.py index 4cfde9d..60d4597 100644 --- a/chainsyncer/backend.py +++ b/chainsyncer/backend.py @@ -95,6 +95,7 @@ class SyncerBackend: """ self.connect() pair = self.db_object.set(block_height, tx_height) + self.db_object_filter.clear() (filter_state, count, digest)= self.db_object_filter.cursor() self.disconnect() return (pair, filter_state,) @@ -204,8 +205,6 @@ class SyncerBackend: syncers.append(s) last_live_id = BlockchainSync.get_last(session=session) - logg.debug('last_live_id {}'.format(last_live_id)) - logg.debug('higesst {} {}'.format(highest_unsynced_block, highest_unsynced_tx)) if last_live_id != None: q = session.query(BlockchainSync) @@ -279,6 +278,13 @@ class SyncerBackend: self.disconnect() + def complete_filter(self, n): + self.connect() + self.db_object_filter.set(n) + self.disconnect() + + + class MemBackend: def __init__(self, chain_spec, object_id): diff --git a/chainsyncer/db/models/filter.py b/chainsyncer/db/models/filter.py index 93cf4c2..44b6d5f 100644 --- a/chainsyncer/db/models/filter.py +++ b/chainsyncer/db/models/filter.py @@ -32,7 +32,7 @@ class BlockchainSyncFilter(SessionBase): if flags == None: flags = bytearray(0) else: # TODO: handle bytes too - bytecount = int((count - 1) / 8 + 1) + 1 + bytecount = int((count - 1) / 8 + 1) flags = flags.to_bytes(bytecount, 'big') self.flags_start = flags self.flags = flags @@ -49,7 +49,6 @@ class BlockchainSyncFilter(SessionBase): old_byte_count = int((self.count - 1) / 8 + 1) new_byte_count = int((self.count) / 8 + 1) - logg.debug('old new {} {}'.format(old_byte_count, new_byte_count)) if old_byte_count != new_byte_count: self.flags = bytearray(1) + self.flags self.count += 1 @@ -72,7 +71,7 @@ class BlockchainSyncFilter(SessionBase): def clear(self): - self.flags = 0 + self.flags = bytearray(len(self.flags)) def set(self, n): @@ -80,10 +79,10 @@ class BlockchainSyncFilter(SessionBase): raise IndexError('bit flag out of range') b = 1 << (n % 8) - i = int((n - 1) / 8 + 1) - if self.flags[i] & b > 0: - SessionBase.release_session(session) + i = int(n / 8) + byte_idx = len(self.flags)-1-i + if (self.flags[byte_idx] & b) > 0: raise AttributeError('Filter bit already set') flags = bytearray(self.flags) - flags[i] |= b + flags[byte_idx] |= b self.flags = flags diff --git a/tests/test_database.py b/tests/test_database.py index 9797b18..f568407 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -76,7 +76,6 @@ class TestDatabase(TestBase): s.set(42, 13) s = SyncerBackend.first(self.chain_spec) - logg.debug('start {}'.format(s)) self.assertEqual(s.get(), ((42,13), 0)) @@ -127,17 +126,8 @@ class TestDatabase(TestBase): SyncerBackend.live(self.chain_spec, 666) s[0].set(123, 2) - logg.debug('>>>>>') s = SyncerBackend.resume(self.chain_spec, 1024) SyncerBackend.live(self.chain_spec, 1024) - s[0].connect() - logg.debug('syncer 1 {}'.format(s[0].db_object)) - s[0].disconnect() - s[1].connect() - logg.debug('syncer 2 {}'.format(s[1].db_object)) - s[1].disconnect() - - self.assertEqual(len(s), 2) self.assertEqual(s[0].target(), (666, 0)) @@ -146,6 +136,21 @@ class TestDatabase(TestBase): self.assertEqual(s[1].get(), ((666, 0), 0)) + def test_backend_resume_filter(self): + s = SyncerBackend.live(self.chain_spec, 42) + s.register_filter('foo') + s.register_filter('bar') + s.register_filter('baz') + + s.set(43, 13) + s.complete_filter(0) + s.complete_filter(2) + + s = SyncerBackend.resume(self.chain_spec, 666) + (pair, flags) = s[0].get() + + self.assertEqual(flags, 5) + if __name__ == '__main__': unittest.main() From d837602394e0c9f4a5451119faa93788e94022ab Mon Sep 17 00:00:00 2001 From: nolash Date: Mon, 22 Feb 2021 22:23:30 +0100 Subject: [PATCH 06/19] Handle new return format from backend getters --- chainsyncer/db/models/sync.py | 1 + chainsyncer/driver.py | 10 +++++++--- chainsyncer/filter.py | 4 ++-- setup.cfg | 2 +- sql/postgresql/1.sql | 16 +--------------- sql/postgresql/2.sql | 12 ++++++++++++ sql/sqlite/2.sql | 2 +- 7 files changed, 25 insertions(+), 22 deletions(-) create mode 100644 sql/postgresql/2.sql diff --git a/chainsyncer/db/models/sync.py b/chainsyncer/db/models/sync.py index ab6ab88..d1d8ff5 100644 --- a/chainsyncer/db/models/sync.py +++ b/chainsyncer/db/models/sync.py @@ -51,6 +51,7 @@ class BlockchainSync(SessionBase): o = q.first() if o == None: + SessionBase.release_session(session) return None sync_id = o.id diff --git a/chainsyncer/driver.py b/chainsyncer/driver.py index 2309eea..41c1024 100644 --- a/chainsyncer/driver.py +++ b/chainsyncer/driver.py @@ -44,6 +44,7 @@ class Syncer: def add_filter(self, f): self.filter.add(f) + self.backend.register_filter(str(f)) class BlockPollSyncer(Syncer): @@ -53,7 +54,7 @@ class BlockPollSyncer(Syncer): def loop(self, interval, conn): - g = self.backend.get() + (g, flags) = self.backend.get() last_tx = g[1] last_block = g[0] self.progress_callback(last_block, last_tx, 'loop started') @@ -63,7 +64,8 @@ class BlockPollSyncer(Syncer): while True: try: block = self.get(conn) - except Exception: + except Exception as e: + logg.debug('erro {}'.format(e)) break last_block = block.number self.process(conn, block) @@ -97,7 +99,9 @@ class HeadSyncer(BlockPollSyncer): def get(self, conn): - (block_number, tx_number) = self.backend.get() + #(block_number, tx_number) = self.backend.get() + (height, flags) = self.backend.get() + block_number = height[0] block_hash = [] o = block_by_number(block_number) r = conn.do(o) diff --git a/chainsyncer/filter.py b/chainsyncer/filter.py index 263bbe5..b6f24c1 100644 --- a/chainsyncer/filter.py +++ b/chainsyncer/filter.py @@ -21,7 +21,7 @@ class SyncFilter: def add(self, fltr): if getattr(fltr, 'filter') == None: raise ValueError('filter object must implement have method filter') - logg.debug('added filter {}'.format(str(fltr))) + logg.debug('added filter "{}"'.format(str(fltr))) self.filters.append(fltr) @@ -38,7 +38,7 @@ class SyncFilter: i += 1 logg.debug('applying filter {}'.format(str(f))) f.filter(conn, block, tx, self.backend.db_session) - self.backend.set_filter() + self.backend.complete_filter(i) self.backend.disconnect() diff --git a/setup.cfg b/setup.cfg index 04005e0..94fda9f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.0.1a10 +version = 0.0.1a11 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no diff --git a/sql/postgresql/1.sql b/sql/postgresql/1.sql index 17a9794..7487761 100644 --- a/sql/postgresql/1.sql +++ b/sql/postgresql/1.sql @@ -1,4 +1,4 @@ -DROP TABLE chain_sync; +DROP TABLE IF EXISTS chain_sync CASCADE; CREATE TABLE IF NOT EXISTS chain_sync ( id serial primary key not null, blockchain varchar not null, @@ -6,21 +6,7 @@ CREATE TABLE IF NOT EXISTS chain_sync ( tx_start int not null default 0, block_cursor int not null default 0, tx_cursor int not null default 0, - flags bytea not null, - num_flags int not null, block_target int default null, date_created timestamp not null, date_updated timestamp default null ); - -DROP TABLE chain_sync_filter; -CREATE TABLE IF NOT EXISTS chain_sync_filter ( - id serial primary key not null, - chain_sync_id int not null, - flags bytea default null, - count int not null default 0, - digest char(64) not null default '0000000000000000000000000000000000000000000000000000000000000000', - CONSTRAINT fk_chain_sync - FOREIGN KEY(chain_sync_id) - REFERENCES chain_sync(id) -); diff --git a/sql/postgresql/2.sql b/sql/postgresql/2.sql new file mode 100644 index 0000000..b26d95d --- /dev/null +++ b/sql/postgresql/2.sql @@ -0,0 +1,12 @@ +DROP TABLE IF EXISTS chain_sync_filter; +CREATE TABLE IF NOT EXISTS chain_sync_filter ( + id serial primary key not null, + chain_sync_id integer not null, + flags bytea default null, + flags_start bytea default null, + count integer not null default 0, + digest bytea not null, + CONSTRAINT fk_chain_sync + FOREIGN KEY(chain_sync_id) + REFERENCES chain_sync(id) +); diff --git a/sql/sqlite/2.sql b/sql/sqlite/2.sql index 9e42802..e366748 100644 --- a/sql/sqlite/2.sql +++ b/sql/sqlite/2.sql @@ -4,7 +4,7 @@ CREATE TABLE IF NOT EXISTS chain_sync_filter ( flags bytea default null, flags_start bytea default null, count integer not null default 0, - digest char(64) not null default '0000000000000000000000000000000000000000000000000000000000000000', + digest bytea not null, CONSTRAINT fk_chain_sync FOREIGN KEY(chain_sync_id) REFERENCES chain_sync(id) From a4ad3feb2f17005c5758a30cc80a21b6f7f23ffa Mon Sep 17 00:00:00 2001 From: nolash Date: Mon, 22 Feb 2021 23:38:42 +0100 Subject: [PATCH 07/19] Add graceful shutdown --- chainsyncer/driver.py | 23 ++++++++++++++++++++++- chainsyncer/filter.py | 3 ++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/chainsyncer/driver.py b/chainsyncer/driver.py index 41c1024..6f8d9b5 100644 --- a/chainsyncer/driver.py +++ b/chainsyncer/driver.py @@ -2,12 +2,14 @@ import uuid import logging import time +import signal # external imports from chainlib.eth.block import ( block_by_number, Block, ) +from chainlib.eth.tx import receipt # local imports from chainsyncer.filter import SyncFilter @@ -23,6 +25,7 @@ class Syncer: running_global = True yield_delay=0.005 + signal_set = False def __init__(self, backend, loop_callback=noop_callback, progress_callback=noop_callback): self.cursor = None @@ -31,6 +34,22 @@ class Syncer: self.filter = SyncFilter(backend) self.progress_callback = progress_callback self.loop_callback = loop_callback + if not Syncer.signal_set: + signal.signal(signal.SIGINT, Syncer.__sig_terminate) + signal.signal(signal.SIGTERM, Syncer.__sig_terminate) + Syncer.signal_set = True + + + @staticmethod + def __sig_terminate(sig, frame): + logg.warning('got signal {}'.format(sig)) + Syncer.terminate() + + + @staticmethod + def terminate(): + logg.info('termination requested!') + Syncer.running_global = False def chain(self): @@ -61,7 +80,7 @@ class BlockPollSyncer(Syncer): while self.running and Syncer.running_global: if self.loop_callback != None: self.loop_callback(last_block, last_tx) - while True: + while True and Syncer.running_global: try: block = self.get(conn) except Exception as e: @@ -89,6 +108,8 @@ class HeadSyncer(BlockPollSyncer): while True: try: tx = block.tx(i) + rcpt = conn.do(receipt(tx.hash)) + tx.apply_receipt(rcpt) self.progress_callback(block.number, i, 'processing {}'.format(repr(tx))) self.backend.set(block.number, i) self.filter.apply(conn, block, tx) diff --git a/chainsyncer/filter.py b/chainsyncer/filter.py index b6f24c1..81159b4 100644 --- a/chainsyncer/filter.py +++ b/chainsyncer/filter.py @@ -39,7 +39,8 @@ class SyncFilter: logg.debug('applying filter {}'.format(str(f))) f.filter(conn, block, tx, self.backend.db_session) self.backend.complete_filter(i) - self.backend.disconnect() + if session != None: + self.backend.disconnect() class NoopFilter: From 0e65dc1ef421f0192119ccfeb647efd30966c4d4 Mon Sep 17 00:00:00 2001 From: nolash Date: Mon, 22 Feb 2021 23:40:23 +0100 Subject: [PATCH 08/19] Bump version --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 94fda9f..334fae5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.0.1a11 +version = 0.0.1a12 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no From c79224cf7b70f6c0c586045ea58784a0b4507d72 Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 23 Feb 2021 06:26:15 +0100 Subject: [PATCH 09/19] Add sqlalchemy migrations --- MANIFEST.in | 2 +- chainsyncer/db/migrations/sqlalchemy.py | 36 +++++++++++++++++++++++++ setup.cfg | 6 +++++ 3 files changed, 43 insertions(+), 1 deletion(-) create mode 100644 chainsyncer/db/migrations/sqlalchemy.py diff --git a/MANIFEST.in b/MANIFEST.in index 854c928..ffe1ce6 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1 +1 @@ -include requirements.txt LICENSE.txt +include requirements.txt LICENSE.txt sql/**/* diff --git a/chainsyncer/db/migrations/sqlalchemy.py b/chainsyncer/db/migrations/sqlalchemy.py new file mode 100644 index 0000000..d9cbeac --- /dev/null +++ b/chainsyncer/db/migrations/sqlalchemy.py @@ -0,0 +1,36 @@ +from alembic import op +import sqlalchemy as sa + +def chainsyncer_upgrade(major, minor, patch): + r0_0_1_u() + +def chainsyncer_downgrade(major, minor, patch): + r0_0_1_d() + +def r0_0_1_u(): + op.create_table( + 'chain_sync', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('blockchain', sa.String, nullable=False, unique=True), + sa.Column('block_start', sa.Integer, nullable=False, default=0), + sa.Column('tx_start', sa.Integer, nullable=False, default=0), + sa.Column('block_cursor', sa.Integer, nullable=False, default=0), + sa.Column('tx_cursor', sa.Integer, nullable=False, default=0), + sa.Column('block_target', sa.Integer, nullable=True), + sa.Column('date_created', sa.DateTime, nullable=False), + sa.Column('date_updated', sa.DateTime), + ) + + op.create_table( + 'chain_sync_filter', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('chain_sync_id', sa.Integer, sa.ForeignKey('chain_sync.id'), nullable=True), + sa.Column('flags', sa.BLOB, nullable=True), + sa.Column('flags_start', sa.BLOB, nullable=True), + sa.Column('count', sa.Integer, nullable=False, default=0), + sa.Column('digest', sa.BLOB, nullable=False), + ) + +def r0_0_1_d(): + op.drop_table('chain_sync_filter') + op.drop_table('chain_sync') diff --git a/setup.cfg b/setup.cfg index 334fae5..258ade6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -21,13 +21,19 @@ licence_files = LICENSE.txt [options] +include_package_data = True python_requires = >= 3.6 packages = chainsyncer chainsyncer.db + chainsyncer.db.migrations chainsyncer.db.models chainsyncer.runnable +[options.package_data] +* = + sql/* + #[options.entry_points] #console_scripts = # blocksync-celery = chainsyncer.runnable.tracker:main From 4b8d031b6c824566bc2d0e4ca962729665d426b5 Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 23 Feb 2021 06:39:52 +0100 Subject: [PATCH 10/19] Change to largebinary type in migrations --- chainsyncer/db/migrations/sqlalchemy.py | 6 +++--- requirements.txt | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/chainsyncer/db/migrations/sqlalchemy.py b/chainsyncer/db/migrations/sqlalchemy.py index d9cbeac..2d50f99 100644 --- a/chainsyncer/db/migrations/sqlalchemy.py +++ b/chainsyncer/db/migrations/sqlalchemy.py @@ -25,10 +25,10 @@ def r0_0_1_u(): 'chain_sync_filter', sa.Column('id', sa.Integer, primary_key=True), sa.Column('chain_sync_id', sa.Integer, sa.ForeignKey('chain_sync.id'), nullable=True), - sa.Column('flags', sa.BLOB, nullable=True), - sa.Column('flags_start', sa.BLOB, nullable=True), + sa.Column('flags', sa.LargeBinary, nullable=True), + sa.Column('flags_start', sa.LargeBinary, nullable=True), sa.Column('count', sa.Integer, nullable=False, default=0), - sa.Column('digest', sa.BLOB, nullable=False), + sa.Column('digest', sa.String(64), nullable=False), ) def r0_0_1_d(): diff --git a/requirements.txt b/requirements.txt index d8ee20b..27f43a9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,4 @@ SQLAlchemy==1.3.20 confini~=0.3.6b2 semver==2.13.0 hexathon~=0.0.1a3 -chainlib~=0.0.1a15 +chainlib~=0.0.1a17 From 9bd61f71bbd150271cde48bb2cb1f84eb9eb5b97 Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 23 Feb 2021 08:24:57 +0100 Subject: [PATCH 11/19] Use hex for digest --- chainsyncer/db/models/filter.py | 14 +++++++------- setup.cfg | 2 +- sql/postgresql/2.sql | 2 +- sql/sqlite/2.sql | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/chainsyncer/db/models/filter.py b/chainsyncer/db/models/filter.py index 44b6d5f..b9e7c57 100644 --- a/chainsyncer/db/models/filter.py +++ b/chainsyncer/db/models/filter.py @@ -3,14 +3,14 @@ import logging import hashlib # external imports -from sqlalchemy import Column, String, Integer, BLOB, ForeignKey +from sqlalchemy import Column, String, Integer, LargeBinary, ForeignKey from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method # local imports from .base import SessionBase from .sync import BlockchainSync -zero_digest = bytearray(32) +zero_digest = bytes(32).hex() logg = logging.getLogger(__name__) @@ -19,9 +19,9 @@ class BlockchainSyncFilter(SessionBase): __tablename__ = 'chain_sync_filter' chain_sync_id = Column(Integer, ForeignKey('chain_sync.id')) - flags_start = Column(BLOB) - flags = Column(BLOB) - digest = Column(BLOB) + flags_start = Column(LargeBinary) + flags = Column(LargeBinary) + digest = Column(String(64)) count = Column(Integer) @@ -42,7 +42,7 @@ class BlockchainSyncFilter(SessionBase): def add(self, name): h = hashlib.new('sha256') - h.update(self.digest) + h.update(bytes.fromhex(self.digest)) h.update(name.encode('utf-8')) z = h.digest() @@ -52,7 +52,7 @@ class BlockchainSyncFilter(SessionBase): if old_byte_count != new_byte_count: self.flags = bytearray(1) + self.flags self.count += 1 - self.digest = z + self.digest = z.hex() def start(self): diff --git a/setup.cfg b/setup.cfg index 258ade6..f726757 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.0.1a12 +version = 0.0.1a14 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no diff --git a/sql/postgresql/2.sql b/sql/postgresql/2.sql index b26d95d..ff940d3 100644 --- a/sql/postgresql/2.sql +++ b/sql/postgresql/2.sql @@ -5,7 +5,7 @@ CREATE TABLE IF NOT EXISTS chain_sync_filter ( flags bytea default null, flags_start bytea default null, count integer not null default 0, - digest bytea not null, + digest char(64) not null, CONSTRAINT fk_chain_sync FOREIGN KEY(chain_sync_id) REFERENCES chain_sync(id) diff --git a/sql/sqlite/2.sql b/sql/sqlite/2.sql index e366748..90be6b0 100644 --- a/sql/sqlite/2.sql +++ b/sql/sqlite/2.sql @@ -4,7 +4,7 @@ CREATE TABLE IF NOT EXISTS chain_sync_filter ( flags bytea default null, flags_start bytea default null, count integer not null default 0, - digest bytea not null, + digest char(64) not null, CONSTRAINT fk_chain_sync FOREIGN KEY(chain_sync_id) REFERENCES chain_sync(id) From ea73d0082ec37d3278c1bfd7135f83cc96b5b9e6 Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 23 Feb 2021 08:46:14 +0100 Subject: [PATCH 12/19] Remove redundant postgres connections in backend resume method --- chainsyncer/backend.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/chainsyncer/backend.py b/chainsyncer/backend.py index 60d4597..0dddce1 100644 --- a/chainsyncer/backend.py +++ b/chainsyncer/backend.py @@ -199,11 +199,16 @@ class SyncerBackend: o = q.get(object_id) (highest_unsynced_block, highest_unsynced_index) = o.cursor() - for object_id in BlockchainSync.get_unsynced(session=session): + object_ids = BlockchainSync.get_unsynced(session=session) + session.close() + + for object_id in object_ids: logg.debug('block syncer resume added previously unsynced sync entry id {}'.format(object_id)) s = SyncerBackend(chain_spec, object_id) syncers.append(s) + session = SessionBase.create_session() + last_live_id = BlockchainSync.get_last(session=session) if last_live_id != None: From caabe288a71b90099eddf3a1c5b47ed849c25532 Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 23 Feb 2021 08:46:51 +0100 Subject: [PATCH 13/19] Bump version --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index f726757..1fb77ad 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.0.1a14 +version = 0.0.1a15 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no From c599a80a6c9dabf8a8d8c6dbbbd1f03776dff8c5 Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 23 Feb 2021 22:56:12 +0100 Subject: [PATCH 14/19] Remove unique chain row constraint --- chainsyncer/backend.py | 14 ++++-- chainsyncer/db/migrations/sqlalchemy.py | 2 +- chainsyncer/db/models/sync.py | 1 + chainsyncer/driver.py | 58 +++++++++++++++++++++---- chainsyncer/error.py | 5 ++- setup.cfg | 2 +- 6 files changed, 66 insertions(+), 16 deletions(-) diff --git a/chainsyncer/backend.py b/chainsyncer/backend.py index 0dddce1..7f4ddc1 100644 --- a/chainsyncer/backend.py +++ b/chainsyncer/backend.py @@ -10,7 +10,7 @@ from chainsyncer.db.models.sync import BlockchainSync from chainsyncer.db.models.filter import BlockchainSyncFilter from chainsyncer.db.models.base import SessionBase -logg = logging.getLogger() +logg = logging.getLogger(__name__) class SyncerBackend: @@ -203,8 +203,8 @@ class SyncerBackend: session.close() for object_id in object_ids: - logg.debug('block syncer resume added previously unsynced sync entry id {}'.format(object_id)) s = SyncerBackend(chain_spec, object_id) + logg.debug('resume unfinished {}'.format(s)) syncers.append(s) session = SessionBase.create_session() @@ -237,9 +237,10 @@ class SyncerBackend: session.add(of) session.commit() - syncers.append(SyncerBackend(chain_spec, object_id)) + backend = SyncerBackend(chain_spec, object_id) + syncers.append(backend) - logg.debug('block syncer resume added new sync entry from previous run id {}, start{}:{} target {}'.format(object_id, block_resume, tx_resume, block_height)) + logg.debug('last live session resume {}'.format(backend)) session.close() @@ -287,6 +288,11 @@ class SyncerBackend: self.connect() self.db_object_filter.set(n) self.disconnect() + + + + def __str__(self): + return "syncerbackend chain {} start {} target {}".format(self.chain(), self.start(), self.target()) diff --git a/chainsyncer/db/migrations/sqlalchemy.py b/chainsyncer/db/migrations/sqlalchemy.py index 2d50f99..0b78c1e 100644 --- a/chainsyncer/db/migrations/sqlalchemy.py +++ b/chainsyncer/db/migrations/sqlalchemy.py @@ -11,7 +11,7 @@ def r0_0_1_u(): op.create_table( 'chain_sync', sa.Column('id', sa.Integer, primary_key=True), - sa.Column('blockchain', sa.String, nullable=False, unique=True), + sa.Column('blockchain', sa.String, nullable=False), sa.Column('block_start', sa.Integer, nullable=False, default=0), sa.Column('tx_start', sa.Integer, nullable=False, default=0), sa.Column('block_cursor', sa.Integer, nullable=False, default=0), diff --git a/chainsyncer/db/models/sync.py b/chainsyncer/db/models/sync.py index d1d8ff5..036c821 100644 --- a/chainsyncer/db/models/sync.py +++ b/chainsyncer/db/models/sync.py @@ -126,6 +126,7 @@ class BlockchainSync(SessionBase): """ self.block_cursor = block_height self.tx_cursor = tx_height + self.date_updated = datetime.datetime.utcnow() return (self.block_cursor, self.tx_cursor,) diff --git a/chainsyncer/driver.py b/chainsyncer/driver.py index 6f8d9b5..0a6ca3c 100644 --- a/chainsyncer/driver.py +++ b/chainsyncer/driver.py @@ -13,12 +13,16 @@ from chainlib.eth.tx import receipt # local imports from chainsyncer.filter import SyncFilter +from chainsyncer.error import ( + SyncDone, + NoBlockForYou, + ) -logg = logging.getLogger() +logg = logging.getLogger(__name__) def noop_callback(block_number, tx_index, s=None): - logg.debug('({},{}) {}'.format(block_number, tx_index, s)) + logg.debug('noop callback ({},{}) {}'.format(block_number, tx_index, s)) class Syncer: @@ -83,8 +87,10 @@ class BlockPollSyncer(Syncer): while True and Syncer.running_global: try: block = self.get(conn) - except Exception as e: - logg.debug('erro {}'.format(e)) + except SyncDone as e: + logg.info('sync done: {}'.format(e)) + return self.backend.get() + except NoBlockForYou as e: break last_block = block.number self.process(conn, block) @@ -97,10 +103,6 @@ class BlockPollSyncer(Syncer): class HeadSyncer(BlockPollSyncer): - def __init__(self, backend, loop_callback=noop_callback, progress_callback=noop_callback): - super(HeadSyncer, self).__init__(backend, loop_callback, progress_callback) - - def process(self, conn, block): logg.debug('process block {}'.format(block)) i = 0 @@ -120,13 +122,51 @@ class HeadSyncer(BlockPollSyncer): def get(self, conn): - #(block_number, tx_number) = self.backend.get() (height, flags) = self.backend.get() block_number = height[0] block_hash = [] o = block_by_number(block_number) r = conn.do(o) + if r == None: + raise NoBlockForYou() b = Block(r) logg.debug('get {}'.format(b)) return b + + + def __str__(self): + return '[headsyncer] {}'.format(str(self.backend)) + + +class HistorySyncer(HeadSyncer): + + def __init__(self, backend, loop_callback=noop_callback, progress_callback=noop_callback): + super(HeadSyncer, self).__init__(backend, loop_callback, progress_callback) + self.block_target = None + (block_number, flags) = self.backend.target() + if block_number == None: + raise AttributeError('backend has no future target. Use HeadSyner instead') + self.block_target = block_number + + + def get(self, conn): + (height, flags) = self.backend.get() + if self.block_target < height[0]: + raise SyncDone(self.block_target) + block_number = height[0] + block_hash = [] + o = block_by_number(block_number) + r = conn.do(o) + if r == None: + raise NoBlockForYou() + b = Block(r) + logg.debug('get {}'.format(b)) + + return b + + + def __str__(self): + return '[historysyncer] {}'.format(str(self.backend)) + + diff --git a/chainsyncer/error.py b/chainsyncer/error.py index 9d14d45..33b865a 100644 --- a/chainsyncer/error.py +++ b/chainsyncer/error.py @@ -1,8 +1,11 @@ -class LoopDone(Exception): +class SyncDone(Exception): """Exception raised when a syncing is complete. """ pass +class NoBlockForYou(Exception): + pass + class RequestError(Exception): pass diff --git a/setup.cfg b/setup.cfg index 1fb77ad..e6414b9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.0.1a15 +version = 0.0.1a16 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no From 70873e4aecae393be416510b91585d8c2548b0db Mon Sep 17 00:00:00 2001 From: nolash Date: Wed, 24 Feb 2021 09:32:56 +0100 Subject: [PATCH 15/19] Temporarily remove redundant connection in complete_filter, bump version --- chainsyncer/backend.py | 4 ++-- setup.cfg | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/chainsyncer/backend.py b/chainsyncer/backend.py index 7f4ddc1..193b49c 100644 --- a/chainsyncer/backend.py +++ b/chainsyncer/backend.py @@ -50,6 +50,8 @@ class SyncerBackend: if self.db_object == None: raise ValueError('sync entry with id {} not found'.format(self.object_id)) + return self.db_session + def disconnect(self): """Commits state of sync to backend. @@ -285,9 +287,7 @@ class SyncerBackend: def complete_filter(self, n): - self.connect() self.db_object_filter.set(n) - self.disconnect() diff --git a/setup.cfg b/setup.cfg index e6414b9..f22165a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.0.1a16 +version = 0.0.1a17 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no From d9196f5ff5427f7f666927f4b2b693c757991f72 Mon Sep 17 00:00:00 2001 From: nolash Date: Wed, 24 Feb 2021 12:47:18 +0100 Subject: [PATCH 16/19] Bump version --- chainsyncer/driver.py | 8 ++++++-- requirements.txt | 2 +- setup.cfg | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/chainsyncer/driver.py b/chainsyncer/driver.py index 0a6ca3c..b072b98 100644 --- a/chainsyncer/driver.py +++ b/chainsyncer/driver.py @@ -5,6 +5,7 @@ import time import signal # external imports +import sqlalchemy from chainlib.eth.block import ( block_by_number, Block, @@ -92,6 +93,10 @@ class BlockPollSyncer(Syncer): return self.backend.get() except NoBlockForYou as e: break +# TODO: To properly handle this, ensure that previous request is rolled back +# except sqlalchemy.exc.OperationalError as e: +# logg.error('database error: {}'.format(e)) +# break last_block = block.number self.process(conn, block) start_tx = 0 @@ -116,6 +121,7 @@ class HeadSyncer(BlockPollSyncer): self.backend.set(block.number, i) self.filter.apply(conn, block, tx) except IndexError as e: + logg.debug('index error syncer rcpt get {}'.format(e)) self.backend.set(block.number + 1, 0) break i += 1 @@ -130,7 +136,6 @@ class HeadSyncer(BlockPollSyncer): if r == None: raise NoBlockForYou() b = Block(r) - logg.debug('get {}'.format(b)) return b @@ -161,7 +166,6 @@ class HistorySyncer(HeadSyncer): if r == None: raise NoBlockForYou() b = Block(r) - logg.debug('get {}'.format(b)) return b diff --git a/requirements.txt b/requirements.txt index 27f43a9..c7cba4f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,4 @@ SQLAlchemy==1.3.20 confini~=0.3.6b2 semver==2.13.0 hexathon~=0.0.1a3 -chainlib~=0.0.1a17 +chainlib~=0.0.1a18 diff --git a/setup.cfg b/setup.cfg index f22165a..d50f13b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.0.1a17 +version = 0.0.1a18 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no From d2fe51dd43810728874172eba51c86067b9976ad Mon Sep 17 00:00:00 2001 From: nolash Date: Mon, 1 Mar 2021 20:26:30 +0100 Subject: [PATCH 17/19] Add filter flag methods on membackend --- chainsyncer/backend.py | 15 ++++++++++++++- setup.cfg | 2 +- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/chainsyncer/backend.py b/chainsyncer/backend.py index 193b49c..79dc69e 100644 --- a/chainsyncer/backend.py +++ b/chainsyncer/backend.py @@ -322,4 +322,17 @@ class MemBackend: def get(self): - return (self.block_height, self.tx_height) + return ((self.block_height, self.tx_height), self.flags) + + + def register_filter(self, name): + pass + + + def complete_filter(self, n): + pass + + + def __str__(self): + return "syncer membackend chain {} cursor".format(self.get()) + diff --git a/setup.cfg b/setup.cfg index d50f13b..8ad482f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.0.1a18 +version = 0.0.1a19 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no From 997097366410e321431bcd3befe6c13d024b580f Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 23 Mar 2021 16:55:52 +0100 Subject: [PATCH 18/19] Backend session detail --- chainsyncer/filter.py | 2 +- setup.cfg | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/chainsyncer/filter.py b/chainsyncer/filter.py index 81159b4..9ea460c 100644 --- a/chainsyncer/filter.py +++ b/chainsyncer/filter.py @@ -37,7 +37,7 @@ class SyncFilter: for f in self.filters: i += 1 logg.debug('applying filter {}'.format(str(f))) - f.filter(conn, block, tx, self.backend.db_session) + f.filter(conn, block, tx, session) self.backend.complete_filter(i) if session != None: self.backend.disconnect() diff --git a/setup.cfg b/setup.cfg index 8ad482f..53a8de0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.0.1a19 +version = 0.0.1a20 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no From ad7c780dbc44bfc3b9e6274f7cf5208777833754 Mon Sep 17 00:00:00 2001 From: nolash Date: Sun, 4 Apr 2021 14:41:09 +0200 Subject: [PATCH 19/19] Upgrade deps --- requirements.txt | 6 +++--- setup.cfg | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/requirements.txt b/requirements.txt index c7cba4f..4d34603 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ psycopg2==2.8.6 SQLAlchemy==1.3.20 -confini~=0.3.6b2 +confini~=0.3.6rc3 semver==2.13.0 -hexathon~=0.0.1a3 -chainlib~=0.0.1a18 +hexathon~=0.0.1a7 +chainlib~=0.0.2a1 diff --git a/setup.cfg b/setup.cfg index 53a8de0..5f1a9f6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.0.1a20 +version = 0.0.1a21 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no