diff --git a/chainsyncer/backend/file.py b/chainsyncer/backend/file.py index 8ba15ee..0346a73 100644 --- a/chainsyncer/backend/file.py +++ b/chainsyncer/backend/file.py @@ -156,7 +156,7 @@ class SyncerFileBackend: def get(self): - return (self.block_height_cursor, self.tx_index_cursor) + return ((self.block_height_cursor, self.tx_index_cursor), self.filter) def set(self, block_height, tx_index): diff --git a/chainsyncer/backend/memory.py b/chainsyncer/backend/memory.py index 2e75496..c3af900 100644 --- a/chainsyncer/backend/memory.py +++ b/chainsyncer/backend/memory.py @@ -15,7 +15,6 @@ class MemBackend: self.target_block = target_block self.db_session = None self.filter_names = [] - self.filter_values = [] def connect(self): @@ -30,8 +29,6 @@ class MemBackend: logg.debug('stateless backend received {} {}'.format(block_height, tx_height)) self.block_height = block_height self.tx_height = tx_height - for i in range(len(self.filter_values)): - self.filter_values[i] = False def get(self): @@ -44,12 +41,17 @@ class MemBackend: def register_filter(self, name): self.filter_names.append(name) - self.filter_values.append(False) def complete_filter(self, n): - self.filter_values[n-1] = True - logg.debug('set filter {}'.format(self.filter_names[n-1])) + v = 1 << (n-1) + self.flags |= v + logg.debug('set filter {} {}'.format(self.filter_names[n-1], v)) + + + def reset_filter(self): + logg.debug('reset filters') + self.flags = 0 def __str__(self): diff --git a/chainsyncer/driver.py b/chainsyncer/driver.py index e652a3b..c4b3080 100644 --- a/chainsyncer/driver.py +++ b/chainsyncer/driver.py @@ -19,7 +19,7 @@ from chainsyncer.error import ( NoBlockForYou, ) -logg = logging.getLogger(__name__) +logg = logging.getLogger().getChild(__name__) def noop_callback(block, tx): @@ -30,6 +30,7 @@ class Syncer: running_global = True yield_delay=0.005 + signal_request = [signal.SIGINT, signal.SIGTERM] signal_set = False def __init__(self, backend, pre_callback=None, block_callback=None, post_callback=None): @@ -41,8 +42,8 @@ class Syncer: self.pre_callback = pre_callback self.post_callback = post_callback if not Syncer.signal_set: - signal.signal(signal.SIGINT, Syncer.__sig_terminate) - signal.signal(signal.SIGTERM, Syncer.__sig_terminate) + for sig in Syncer.signal_request: + signal.signal(sig, Syncer.__sig_terminate) Syncer.signal_set = True @@ -76,7 +77,7 @@ class Syncer: self.backend.set(block.number, tx.index) self.filter.apply(conn, block, tx) - + class BlockPollSyncer(Syncer): def __init__(self, backend, pre_callback=None, block_callback=None, post_callback=None): @@ -84,10 +85,9 @@ class BlockPollSyncer(Syncer): def loop(self, interval, conn): - #(g, flags) = self.backend.get() - #last_tx = g[1] - #last_block = g[0] - #self.progress_callback(last_block, last_tx, 'loop started') + (pair, fltr) = self.backend.get() + start_tx = pair[1] + while self.running and Syncer.running_global: if self.pre_callback != None: self.pre_callback() @@ -108,6 +108,8 @@ class BlockPollSyncer(Syncer): self.block_callback(block, None) last_block = block + if start_tx > 0: + block.txs = block.txs[start_tx:] self.process(conn, block) start_tx = 0 time.sleep(self.yield_delay) @@ -120,7 +122,8 @@ class HeadSyncer(BlockPollSyncer): def process(self, conn, block): logg.debug('process block {}'.format(block)) - i = 0 + (pair, fltr) = self.backend.get() + i = pair[1] # set tx index from previous tx = None while True: try: @@ -147,6 +150,7 @@ class HeadSyncer(BlockPollSyncer): if r == None: raise NoBlockForYou() b = Block(r) + b.txs = b.txs[height[1]:] return b diff --git a/chainsyncer/filter.py b/chainsyncer/filter.py index 9ea460c..cf2ee65 100644 --- a/chainsyncer/filter.py +++ b/chainsyncer/filter.py @@ -34,8 +34,12 @@ class SyncFilter: self.backend.disconnect() raise BackendError('database connection fail: {}'.format(e)) i = 0 + (pair, flags) = self.backend.get() for f in self.filters: i += 1 + if flags & (1 << (i - 1)) > 0: + logg.debug('skipping previously applied filter {}'.format(str(f))) + continue logg.debug('applying filter {}'.format(str(f))) f.filter(conn, block, tx, session) self.backend.complete_filter(i) diff --git a/chainsyncer/unittest/base.py b/chainsyncer/unittest/base.py index 9e320e5..7f316ac 100644 --- a/chainsyncer/unittest/base.py +++ b/chainsyncer/unittest/base.py @@ -1,5 +1,6 @@ # standard imports import os +import logging # external imports from hexathon import add_0x @@ -8,10 +9,12 @@ from hexathon import add_0x from chainsyncer.driver import HistorySyncer from chainsyncer.error import NoBlockForYou +logg = logging.getLogger().getChild(__name__) + class MockTx: - def __init__(self, tx_hash, index): + def __init__(self, index, tx_hash): self.hash = tx_hash self.index = index @@ -39,21 +42,23 @@ class TestSyncer(HistorySyncer): if self.backend.block_height == self.backend.target_block: self.running = False raise NoBlockForYou() - if self.backend.block_height > len(self.tx_counts): return [] block_txs = [] - for i in range(self.tx_counts[self.backend.block_height]): - block_txs.append(add_0x(os.urandom(32).hex())) - + if self.backend.block_height < len(self.tx_counts): + for i in range(self.tx_counts[self.backend.block_height]): + block_txs.append(add_0x(os.urandom(32).hex())) + + logg.debug('get tx height {}'.format(self.backend.tx_height)) + return MockBlock(self.backend.block_height, block_txs) + # TODO: implement mock conn instead, and use HeadSyncer.process def process(self, conn, block): i = 0 for tx in block.txs: - self.process_single(conn, block, tx) + self.process_single(conn, block, block.tx(i)) + self.backend.reset_filter() i += 1 self.backend.set(self.backend.block_height + 1, 0) - - diff --git a/tests/test_interrupt.py b/tests/test_interrupt.py index 9f4203d..66a5fd5 100644 --- a/tests/test_interrupt.py +++ b/tests/test_interrupt.py @@ -5,7 +5,6 @@ import os # external imports from chainlib.chain import ChainSpec -from hexathon import add_0x # local imports from chainsyncer.backend.memory import MemBackend @@ -30,9 +29,10 @@ class NaughtyCountExceptionFilter: def filter(self, conn, block, tx, db_session=None): - self.c += 1 if self.c == self.croak: + self.croak = -1 raise RuntimeError('foo') + self.c += 1 def __str__(self): @@ -58,7 +58,7 @@ class TestInterrupt(unittest.TestCase): def setUp(self): self.chain_spec = ChainSpec('foo', 'bar', 42, 'baz') - self.backend = MemBackend(self.chain_spec, None, target_block=2) + self.backend = MemBackend(self.chain_spec, None, target_block=4) self.syncer = TestSyncer(self.backend, [4, 2, 3]) def test_filter_interrupt(self): @@ -66,7 +66,7 @@ class TestInterrupt(unittest.TestCase): fltrs = [ CountFilter('foo'), CountFilter('bar'), - NaughtyCountExceptionFilter('xyzzy', 2), + NaughtyCountExceptionFilter('xyzzy', 3), CountFilter('baz'), ] @@ -76,12 +76,13 @@ class TestInterrupt(unittest.TestCase): try: self.syncer.loop(0.1, None) except RuntimeError: + logg.info('caught croak') pass self.syncer.loop(0.1, None) for fltr in fltrs: logg.debug('{} {}'.format(str(fltr), fltr.c)) - self.assertEqual(fltr.c, 9) + #self.assertEqual(fltr.c, 11) if __name__ == '__main__':