Use latest block only for syncer getter in retrier

This commit is contained in:
nolash 2021-03-30 13:01:46 +02:00
parent 2e5ceac4d0
commit 6f6882e770
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746

View File

@ -13,9 +13,14 @@ from cic_eth_registry import CICRegistry
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.eth.tx import unpack from chainlib.eth.tx import unpack
from chainlib.connection import RPCConnection from chainlib.connection import RPCConnection
from chainlib.eth.block import block_latest from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
from chainsyncer.driver import HeadSyncer from chainsyncer.driver import HeadSyncer
from chainsyncer.backend import MemBackend from chainsyncer.backend import MemBackend
from chainsyncer.error import NoBlockForYou
# local imports # local imports
from cic_eth.db import dsn_from_config from cic_eth.db import dsn_from_config
@ -38,7 +43,7 @@ argparser = argparse.ArgumentParser(description='daemon that monitors transactio
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('--retry-delay', dest='retry_delay', type=str, help='seconds to wait for retrying a transaction that is marked as sent') argparser.add_argument('--retry-delay', dest='retry_delay', type=int, help='seconds to wait for retrying a transaction that is marked as sent')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('-v', help='be verbose', action='store_true') argparser.add_argument('-v', help='be verbose', action='store_true')
@ -217,6 +222,19 @@ class RetrySyncer(HeadSyncer):
self.conn = conn self.conn = conn
def get(self, conn):
o = block_latest()
r = conn.do(o)
(pair, flags) = self.backend.get()
n = int(r, 16)
if n == pair[0]:
raise NoBlockForYou('block {} already checked'.format(n))
o = block_by_number(n)
r = conn.do(o)
b = Block(r)
return b
def process(self, conn, block): def process(self, conn, block):
before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds) before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds)
stalled_txs = get_status_tx( stalled_txs = get_status_tx(
@ -227,15 +245,15 @@ class RetrySyncer(HeadSyncer):
) )
for tx in stalled_txs: for tx in stalled_txs:
self.filter.apply(self.conn, block, tx) self.filter.apply(self.conn, block, tx)
self.backend.set(block.number + 1, 0) self.backend.set(block.number, 0)
def main(): def main():
o = block_latest() #o = block_latest()
conn = RPCConnection.connect(chain_spec, 'default') conn = RPCConnection.connect(chain_spec, 'default')
block = conn.do(o) #block = conn.do(o)
syncer = RetrySyncer(conn, chain_spec, straggler_delay) syncer = RetrySyncer(conn, chain_spec, straggler_delay)
syncer.backend.set(int(block, 16), 0) syncer.backend.set(0, 0)
syncer.add_filter(FooFilter(chain_spec, queue=queue)) syncer.add_filter(FooFilter(chain_spec, queue=queue))
syncer.loop(float(straggler_delay), conn) syncer.loop(float(straggler_delay), conn)