diff --git a/.gitignore b/.gitignore index 558f498..7ebd175 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ gmon.out build/ dist/ *.sqlite +old/ diff --git a/chainsyncer/driver/__init__.py b/chainsyncer/driver/__init__.py new file mode 100644 index 0000000..4e3fffc --- /dev/null +++ b/chainsyncer/driver/__init__.py @@ -0,0 +1 @@ +from .base import SyncDriver diff --git a/chainsyncer/driver.py b/chainsyncer/driver/base.py similarity index 91% rename from chainsyncer/driver.py rename to chainsyncer/driver/base.py index f266edb..3bb23c1 100644 --- a/chainsyncer/driver.py +++ b/chainsyncer/driver/base.py @@ -94,11 +94,12 @@ class SyncDriver: def loop(self, conn, item, interval=1): logg.debug('started loop') - tx_start = item.tx_cursor while self.running and SyncDriver.running_global: self.last_start = time.clock_gettime_ns(self.clock_id) + if self.pre_callback != None: self.pre_callback() + while True and self.running: try: block = self.get(conn, item) @@ -111,27 +112,26 @@ class SyncDriver: self.block_callback(block, None) try: - self.process(conn, item, block, tx_start) + self.process(conn, item, block) except IndexError: item.next(advance_block=True) - tx_start = 0 time.sleep(self.yield_delay) + + if self.store.target > -1 and block.number >= self.store.target: + self.running = False + if self.post_callback != None: self.post_callback() - logg.debug('fooo') - if self.store.target > -1 and block.number >= self.store.target: - self.running = False self.idle(interval) def process_single(self, conn, block, tx): - logg.debug('single') self.session.filter(conn, block, tx) - def process(self, conn, block, tx_start): + def process(self, conn, item, block): raise NotImplementedError() diff --git a/chainsyncer/driver/chain_interface.py b/chainsyncer/driver/chain_interface.py new file mode 100644 index 0000000..1b53ebd --- /dev/null +++ b/chainsyncer/driver/chain_interface.py @@ -0,0 +1,57 @@ +# external imports +from chainlib.error import RPCException + +# local imports +from chainsyncer.error import NoBlockForYou +from chainsyncer.driver import SyncDriver + + +class ChainInterfaceDriver(SyncDriver): + + def __init__(self, store, chain_interface, offset=0, target=-1, pre_callback=None, post_callback=None, block_callback=None, idle_callback=None): + super(ChainInterfaceDriver, self).__init__(store, offset=offset, target=target, pre_callback=pre_callback, post_callback=post_callback, block_callback=block_callback, idle_callback=idle_callback) + self.chain_interface = chain_interface + + + def get(self, conn, item): + """Retrieve the block currently defined by the syncer cursor from the RPC provider. + + :param conn: RPC connection + :type conn: chainlib.connectin.RPCConnection + :raises NoBlockForYou: Block at the given height does not exist + :rtype: chainlib.block.Block + :returns: Block object + """ + o = self.chain_interface.block_by_number(item.cursor) + try: + r = conn.do(o) + except RPCException: + r = None + if r == None: + raise NoBlockForYou() + b = self.chain_interface.block_from_src(r) + b.txs = b.txs[item.tx_cursor:] + + return b + + + def process(self, conn, item, block): + tx_src = None + i = item.tx_cursor + while True: + # handle block objects regardless of whether the tx data is embedded or not + try: + tx = block.tx(i) + except AttributeError: + tx_hash = block.txs[i] + o = self.chain_interface.tx_by_hash(tx_hash, block=block) + r = conn.do(o) + #tx = self.chain_interface.tx_from_src(tx_src, block=block) + + rcpt = conn.do(self.chain_interface.tx_receipt(tx.hash)) + if rcpt != None: + tx.apply_receipt(self.chain_interface.src_normalize(rcpt)) + + self.process_single(conn, block, tx) + + i += 1 diff --git a/chainsyncer/unittest/base.py b/chainsyncer/unittest/base.py index 33c9a5c..ea21ad7 100644 --- a/chainsyncer/unittest/base.py +++ b/chainsyncer/unittest/base.py @@ -121,7 +121,7 @@ class MockBlock: :param i: Transaction index :type i: int """ - return MockTx(i, self.txs[i]) + return MockTx(i, self.txs[i].hash) class MockStore(State): @@ -170,7 +170,7 @@ class MockFilter: r = True self.brk -= 1 self.contents.append((block.number, tx.index, tx.hash,)) - logg.debug('filter {} result {} block {}'.format(self.common_name(), r, block.number)) + logg.debug('filter {} result {} block {} tx {} {}'.format(self.common_name(), r, block.number, tx.index, tx.hash)) return r @@ -202,8 +202,8 @@ class MockDriver(SyncDriver): raise NoBlockForYou() - def process(self, conn, item, block, tx_start): - i = tx_start + def process(self, conn, item, block): + i = item.tx_cursor while self.running: if self.interrupt != None: if self.interrupt[0] == block.number and self.interrupt[1] == i: @@ -216,3 +216,54 @@ class MockDriver(SyncDriver): self.process_single(conn, block, tx) item.next() i += 1 + + +class MockChainInterface: + + def block_by_number(self, number): + return ('block_by_number', number,) + + + def tx_by_hash(self, hsh): + return ('tx_by_hash', hsh,) + + + def block_from_src(self, src): + return src + + + def src_normalize(self, src): + return src + + + def tx_receipt(self, hsh): + return ('receipt', hsh,) + + +class MockChainInterfaceConn(MockConn): + + def __init__(self, interface): + self.ifc = interface + self.blocks = {} + self.txs = {} + + + def add_block(self, block): + logg.debug('add block {} {} with {} txs'.format(block.number, block.hash, len(block.txs))) + self.blocks[block.number] = block + for tx in block.txs: + self.txs[tx.hash] = tx + + + def do(self, o): + m = getattr(self, 'handle_' + o[0]) + return m(o[1]) + + + def handle_block_by_number(self, number): + return self.blocks[number] + + + + def handle_receipt(self, hsh): + return {} diff --git a/setup.cfg b/setup.cfg index 4cd862d..c5cd2ec 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.2.0 +version = 0.3.0 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no diff --git a/tests/test_driver.py b/tests/test_driver.py new file mode 100644 index 0000000..80d61a7 --- /dev/null +++ b/tests/test_driver.py @@ -0,0 +1,61 @@ +# standard imports +import unittest +import tempfile +import shutil +import logging +import stat +import os + +# local imports +from chainsyncer.store.fs import SyncFsStore +from chainsyncer.session import SyncSession +from chainsyncer.error import ( + LockError, + FilterDone, + IncompleteFilterError, + SyncDone, + ) +from chainsyncer.unittest import ( + MockBlockGenerator, + MockFilter, + MockChainInterfaceConn, + MockTx, + MockBlock, + MockChainInterface, + MockFilterError, + ) +from chainsyncer.driver.chain_interface import ChainInterfaceDriver + +logging.basicConfig(level=logging.DEBUG) +logg = logging.getLogger() + + +class TestFilter(unittest.TestCase): + + def setUp(self): + self.path = tempfile.mkdtemp() + self.store = SyncFsStore(self.path) + self.ifc = MockChainInterface() + self.conn = MockChainInterfaceConn(self.ifc) + + + def tearDown(self): + shutil.rmtree(self.path) + + + def test_driver(self): + generator = MockBlockGenerator() + generator.generate([1, 2], driver=self.conn) + + drv = ChainInterfaceDriver(self.store, self.ifc, target=1) + + fltr_one = MockFilter('foo') + self.store.register(fltr_one) + with self.assertRaises(SyncDone): + drv.run(self.conn) + + self.assertEqual(len(fltr_one.contents), 3) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_session.py b/tests/test_session.py index 3ceeed9..b02e956 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -117,7 +117,6 @@ class TestFilter(unittest.TestCase): fltr_three = MockFilter('baz') self.store.register(fltr_three) - store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler) with self.assertRaises(SyncDone):