From 6f6882e7704f25fd630e4c1214418e0b43c80d0e Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 30 Mar 2021 13:01:46 +0200 Subject: [PATCH] Use latest block only for syncer getter in retrier --- .../cic-eth/cic_eth/runnable/daemons/retry.py | 30 +++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/retry.py b/apps/cic-eth/cic_eth/runnable/daemons/retry.py index 834e1afe..8c9deec4 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/retry.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/retry.py @@ -13,9 +13,14 @@ from cic_eth_registry import CICRegistry from chainlib.chain import ChainSpec from chainlib.eth.tx import unpack from chainlib.connection import RPCConnection -from chainlib.eth.block import block_latest +from chainlib.eth.block import ( + block_latest, + block_by_number, + Block, + ) from chainsyncer.driver import HeadSyncer from chainsyncer.backend import MemBackend +from chainsyncer.error import NoBlockForYou # local imports from cic_eth.db import dsn_from_config @@ -38,7 +43,7 @@ argparser = argparse.ArgumentParser(description='daemon that monitors transactio argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') -argparser.add_argument('--retry-delay', dest='retry_delay', type=str, help='seconds to wait for retrying a transaction that is marked as sent') +argparser.add_argument('--retry-delay', dest='retry_delay', type=int, help='seconds to wait for retrying a transaction that is marked as sent') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') argparser.add_argument('-v', help='be verbose', action='store_true') @@ -217,6 +222,19 @@ class RetrySyncer(HeadSyncer): self.conn = conn + def get(self, conn): + o = block_latest() + r = conn.do(o) + (pair, flags) = self.backend.get() + n = int(r, 16) + if n == pair[0]: + raise NoBlockForYou('block {} already checked'.format(n)) + o = block_by_number(n) + r = conn.do(o) + b = Block(r) + return b + + def process(self, conn, block): before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds) stalled_txs = get_status_tx( @@ -227,15 +245,15 @@ class RetrySyncer(HeadSyncer): ) for tx in stalled_txs: self.filter.apply(self.conn, block, tx) - self.backend.set(block.number + 1, 0) + self.backend.set(block.number, 0) def main(): - o = block_latest() + #o = block_latest() conn = RPCConnection.connect(chain_spec, 'default') - block = conn.do(o) + #block = conn.do(o) syncer = RetrySyncer(conn, chain_spec, straggler_delay) - syncer.backend.set(int(block, 16), 0) + syncer.backend.set(0, 0) syncer.add_filter(FooFilter(chain_spec, queue=queue)) syncer.loop(float(straggler_delay), conn)