2021-04-02 15:16:27 +02:00
# standard imports
2021-02-01 18:12:51 +01:00
import os
import sys
import logging
import argparse
import re
2021-04-02 15:16:27 +02:00
# external imports
2021-02-01 18:12:51 +01:00
import confini
import celery
2021-03-29 15:27:53 +02:00
from cic_eth_registry import CICRegistry
from chainlib . chain import ChainSpec
2021-04-02 15:16:27 +02:00
from chainlib . connection import RPCConnection
2021-04-05 17:07:09 +02:00
from chainsyncer . filter import SyncFilter
2021-02-01 18:12:51 +01:00
2021-04-02 15:16:27 +02:00
# local imports
2021-02-01 18:12:51 +01:00
from cic_eth . db import dsn_from_config
from cic_eth . db import SessionBase
from cic_eth . admin . ctrl import lock_send
2021-04-05 17:07:09 +02:00
from cic_eth . db . enum import LockEnum
from cic_eth . runnable . daemons . filters . straggler import StragglerFilter
from cic_eth . sync . retry import RetrySyncer
2021-04-06 21:01:50 +02:00
from cic_eth . stat import init_chain_stat
2021-02-01 18:12:51 +01:00
# Bootstrap for the retry daemon: set up logging, parse CLI flags, load the
# confini configuration, then wire up celery, the RPC connection registry and
# the database session. All of this runs at import time, before main().
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

# Default configuration root; can be replaced with -c.
config_dir = '/usr/local/etc/cic-eth'

# Help and description text is kept exactly as found; only option strings,
# dest names and defaults (which argparse and the rest of the script match on)
# are written without surrounding whitespace.
argparser = argparse.ArgumentParser(description=' daemon that monitors transactions in new blocks ')
argparser.add_argument('-p', '--provider', dest='p', type=str, help=' rpc provider ')
argparser.add_argument('-c', type=str, default=config_dir, help=' config root to use ')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help=' chain spec ')
argparser.add_argument('--batch-size', dest='batch_size', type=int, default=50, help=' max amount of txs to resend per iteration ')
argparser.add_argument('--retry-delay', dest='retry_delay', type=int, help=' seconds to wait for retrying a transaction that is marked as sent ')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help=' environment prefix for variables to overwrite configuration ')
argparser.add_argument('-q', type=str, default='cic-eth', help=' celery queue to submit transaction tasks to ')
argparser.add_argument('-v', help=' be verbose ', action='store_true')
argparser.add_argument('-vv', help=' be more verbose ', action='store_true')
args = argparser.parse_args(sys.argv[1:])

# Test the more verbose flag first so that passing both -v and -vv yields
# DEBUG rather than being capped at INFO.
if args.vv:
    logg.setLevel(logging.DEBUG)
elif args.v:
    logg.setLevel(logging.INFO)

# Load configuration from the chosen directory, creating it if it is missing.
config_dir = args.c
os.makedirs(config_dir, mode=0o777, exist_ok=True)
config = confini.Config(config_dir, args.env_prefix)
config.process()

# override args
args_override = {
    'ETH_PROVIDER': args.p,
    'CIC_CHAIN_SPEC': args.i,
    'CIC_TX_RETRY_DELAY': args.retry_delay,
}
config.dict_override(args_override, ' cli flag ')

# Censor password entries before the configuration is written to the log.
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug(' config loaded from {} : \n {} '.format(config_dir, config))

# Stored after the debug dump above; main() reads it back under the same key.
config.add(args.batch_size, '_BATCH_SIZE', True)

app = celery.Celery(
    backend=config.get('CELERY_RESULT_URL'),
    broker=config.get('CELERY_BROKER_URL'),
)
queue = args.q

chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
# main() connects using the same tag.
RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default')

dsn = dsn_from_config(config)
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))


def main():
    """Build the retry syncer, attach the straggler filter and run its loop.

    Reads the module-level ``config``, ``chain_spec`` and ``queue`` prepared
    during bootstrap. The loop interval is taken from ``SYNCER_LOOP_INTERVAL``
    when that setting has a value; otherwise it falls back to the block
    average reported by ``init_chain_stat`` for the connected node.
    """
    conn = RPCConnection.connect(chain_spec, 'default')

    straggler_delay = int(config.get('CIC_TX_RETRY_DELAY'))

    loop_interval = config.get('SYNCER_LOOP_INTERVAL')
    if loop_interval is None:
        stat = init_chain_stat(conn)
        loop_interval = stat.block_average()

    syncer = RetrySyncer(
        conn,
        chain_spec,
        straggler_delay,
        batch_size=config.get('_BATCH_SIZE'),
    )
    # NOTE(review): presumably resets the syncer backend's position to the
    # start before looping - confirm against the backend implementation.
    syncer.backend.set(0, 0)

    fltr = StragglerFilter(chain_spec, queue=queue)
    syncer.add_filter(fltr)

    # The interval may be a config string or a computed number; the loop is
    # always handed an int.
    syncer.loop(int(loop_interval), conn)
2021-02-01 18:12:51 +01:00
# Run the daemon only when this file is executed as a script. The comparison
# must be against the exact string '__main__'; a padded literal never matches.
if __name__ == '__main__':
    main()