2021-02-01 18:12:51 +01:00
# standard imports
import sys
import os
import logging
import argparse
import tempfile
import re
import urllib
import websocket
2021-07-08 17:28:04 +02:00
import stat
import importlib
2021-02-01 18:12:51 +01:00
2021-03-29 15:27:53 +02:00
# external imports
2021-02-01 18:12:51 +01:00
import celery
import confini
2021-04-24 19:53:45 +02:00
from chainlib . connection import (
RPCConnection ,
ConnType ,
)
from chainlib . eth . connection import (
EthUnixSignerConnection ,
EthHTTPSignerConnection ,
)
2021-08-17 08:46:51 +02:00
from chainlib . eth . address import to_checksum_address
2021-03-29 15:27:53 +02:00
from chainlib . chain import ChainSpec
2021-04-04 14:40:59 +02:00
from chainqueue . db . models . otx import Otx
2021-04-23 23:02:51 +02:00
from cic_eth_registry . error import UnknownContractError
2021-05-19 08:59:42 +02:00
from cic_eth_registry . erc20 import ERC20Token
2021-08-17 08:46:51 +02:00
from hexathon import add_0x
2021-04-21 19:34:13 +02:00
import liveness . linux
2021-02-01 18:12:51 +01:00
2021-04-23 23:02:51 +02:00
2021-02-01 18:12:51 +01:00
# local imports
2021-08-17 08:46:51 +02:00
import cic_eth . cli
2021-04-04 14:40:59 +02:00
from cic_eth . eth import (
erc20 ,
tx ,
account ,
nonce ,
gas ,
)
from cic_eth . admin import (
debug ,
ctrl ,
2021-05-15 06:36:54 +02:00
token ,
2021-04-04 14:40:59 +02:00
)
from cic_eth . queue import (
query ,
balance ,
state ,
tx ,
lock ,
time ,
)
from cic_eth . callbacks import (
Callback ,
http ,
2021-04-16 22:24:07 +02:00
noop ,
2021-04-04 14:40:59 +02:00
#tcp,
redis ,
)
2021-02-01 18:12:51 +01:00
from cic_eth . db . models . base import SessionBase
from cic_eth . db import dsn_from_config
2021-03-04 16:06:14 +01:00
from cic_eth . ext import tx
2021-03-30 18:33:09 +02:00
from cic_eth . registry import (
connect as connect_registry ,
connect_declarator ,
connect_token_registry ,
)
2021-12-22 19:24:05 +01:00
from cic_eth . task import (
BaseTask ,
CriticalWeb3Task ,
)
2021-02-01 18:12:51 +01:00
# Root logger for this script; starts out at WARNING level.
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

# Command line set-up: flag sets provided by cic_eth.cli plus the options
# that only apply to this script.
arg_flags = cic_eth.cli.argflag_std_read
local_arg_flags = cic_eth.cli.argflag_local_task
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.process_local_flags(local_arg_flags)
# default=None (instead of the store_true default of False) leaves the value
# unset when the flag is not given on the command line.
argparser.add_argument('--trace-queue-status', default=None, dest='trace_queue_status', action='store_true', help='set to perist all queue entry status changes to storage')
argparser.add_argument('--aux-all', action='store_true', help='include tasks from all submodules from the aux module path')
argparser.add_argument('--min-fee-price', dest='min_fee_price', type=int, help='set minimum fee price for transactions, in wei')
# May be given several times; each occurrence appends one module name.
argparser.add_argument('--aux', action='append', type=str, default=[], help='add single submodule from the aux module path')
args = argparser.parse_args()
2021-08-17 08:46:51 +02:00
# process config
# Presumably maps the argparse destinations added by this script to the config
# keys they override (None meaning no explicit key).
# NOTE(review): extra_args is built here but is never handed to
# Config.from_args below, so --trace-queue-status and --min-fee-price may not
# reach the config at all -- confirm against cic_eth.cli.Config.from_args.
extra_args = {
    'aux_all': None,
    'aux': None,
    'trace_queue_status': 'TASKS_TRACE_QUEUE_STATUS',
    'min_fee_price': 'ETH_MIN_FEE_PRICE',
}
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags)

# connect to celery
celery_app = cic_eth.cli.CeleryApp.from_config(config)

# set up rpc
rpc = cic_eth.cli.RPC.from_config(config, use_signer=True)
conn = rpc.get_default()

# connect to database
dsn = dsn_from_config(config)
SessionBase.connect(
    dsn,
    pool_size=int(config.get('DATABASE_POOL_SIZE')),
    debug=config.true('DATABASE_DEBUG'),
)

# Toggle queue status tracing on the Otx model from the config flag.
Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS')
2021-08-17 08:46:51 +02:00
# execute health checks
# TODO: health should be separate service with endpoint that can be queried
health_modules = config.get('CIC_HEALTH_MODULES', [])
if not health_modules:
    # Covers None and the empty string as well as an empty list, so that the
    # loader below always receives a list.
    health_modules = []
elif isinstance(health_modules, str):
    # Config value is a comma-separated list of module names.
    health_modules = health_modules.split(',')
logg.debug('health mods {}'.format(health_modules))
liveness.linux.load(health_modules, rundir=config.get('CIC_RUN_DIR'), config=config, unit='cic-eth-tasker')
2021-02-01 18:12:51 +01:00
2021-08-17 08:46:51 +02:00
# set up chain provisions
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
registry = None
try:
    registry = connect_registry(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
except UnknownContractError as e:
    # Nothing can be resolved without the registry, so terminate the process.
    logg.exception('Registry contract connection failed for {}: {}'.format(config.get('CIC_REGISTRY_ADDRESS'), e))
    sys.exit(1)
logg.info('connected contract registry {}'.format(config.get('CIC_REGISTRY_ADDRESS')))

# Trusted addresses are given as a comma-separated list. An empty value is
# rejected the same way as a missing one, since it declares no address either.
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if not trusted_addresses_src:
    logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
    sys.exit(1)
trusted_addresses = trusted_addresses_src.split(',')
for i, address in enumerate(trusted_addresses):
    # Addresses are only converted to checksum form when the _UNSAFE config
    # flag is set; otherwise they are used exactly as configured.
    if config.get('_UNSAFE'):
        trusted_addresses[i] = to_checksum_address(address)
    # Log the value that is actually going to be used.
    logg.info('using trusted address {}'.format(trusted_addresses[i]))
connect_declarator(conn, chain_spec, trusted_addresses)
connect_token_registry(conn, chain_spec)
2021-07-08 17:28:04 +02:00
2021-08-17 08:46:51 +02:00
# detect auxiliary task modules (plugins)
# TODO: move to separate file
def _is_aux_module(aux_dir, name):
    """Return True if aux_dir/name contains an __init__.py file."""
    return os.path.isfile(os.path.join(aux_dir, name, '__init__.py'))


def _discover_aux_modules(search_paths):
    """Return the names of every aux module found below the search paths.

    Each search path is checked for a ``cic_eth_aux`` directory; every
    non-hidden sub-directory of it that contains an ``__init__.py`` counts as
    a module. A name found in more than one path is only returned once.
    """
    found = []
    for p in search_paths:
        logg.debug('checking for aux modules in {}'.format(p))
        aux_dir = os.path.join(p, 'cic_eth_aux')
        try:
            entries = os.listdir(aux_dir)
        except (FileNotFoundError, NotADirectoryError):
            # NotADirectoryError occurs when the search path entry is a
            # regular file (for example an archive) rather than a directory.
            logg.debug('no aux module found in {}'.format(aux_dir))
            continue
        for v in entries:
            if v[:1] == '.':
                logg.debug('dotfile, skip {}'.format(v))
                continue
            if not os.path.isdir(os.path.join(aux_dir, v)):
                logg.debug('not a dir, skip {}'.format(v))
                continue
            if not _is_aux_module(aux_dir, v):
                logg.debug('__init__.py not found, skip {}'.format(v))
                continue
            if v in found:
                # Already seen in an earlier path; avoid setting it up twice.
                continue
            found.append(v)
            logg.debug('found module {} in {}'.format(v, aux_dir))
    return found


def _locate_requested_aux_modules(names, search_paths):
    """Return the explicitly requested aux module names after verifying them.

    Every name must exist as ``cic_eth_aux/<name>/__init__.py`` in at least
    one of the search paths. The process exits with status 1 on the first
    name that cannot be found in any path.
    """
    located = []
    for v in names:
        if v in located:
            continue
        found_dir = None
        for p in search_paths:
            aux_dir = os.path.join(p, 'cic_eth_aux')
            if _is_aux_module(aux_dir, v):
                found_dir = aux_dir
                break
            logg.debug('cannot find explicity requested aux module {} in path {}'.format(v, aux_dir))
        if found_dir is None:
            logg.critical('excplicity requested aux module {} not found in any path'.format(v))
            sys.exit(1)
        logg.info('aux module {} found in path {}'.format(v, found_dir))
        located.append(v)
    return located


aux = []
if args.aux_all:
    if args.aux:
        logg.warning('--aux-all is set so --aux will have no effect')
    aux = _discover_aux_modules(sys.path)
elif args.aux:
    aux = _locate_requested_aux_modules(args.aux, sys.path)
2021-10-20 17:02:36 +02:00
# Resolve the default token. A configured symbol is looked up in the registry
# directly; without one, the registry's 'DefaultToken' entry is used and the
# symbol is read back from the token contract and stored in the config.
default_token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL')
default_token_address = None
if default_token_symbol:
    default_token_address = registry.by_name(default_token_symbol)
else:
    default_token_address = registry.by_name('DefaultToken')
    c = ERC20Token(chain_spec, conn, default_token_address)
    default_token_symbol = c.symbol
    logg.info('found default token {} address {}'.format(default_token_symbol, default_token_address))
    config.add(default_token_symbol, 'CIC_DEFAULT_TOKEN_SYMBOL', exists_ok=True)
2021-07-08 17:28:04 +02:00
# Import every selected aux module and let it initialise itself through its
# aux_setup(conn, config) hook.
for v in aux:
    mname = 'cic_eth_aux.' + v
    mod = importlib.import_module(mname)
    mod.aux_setup(conn, config)
    logg.info('loaded aux module {}'.format(mname))
2021-02-01 18:12:51 +01:00
def main():
    """Configure the task base classes and run the celery worker.

    Builds the worker command line from the effective log level and the
    CELERY_QUEUE config value (used both as queue and as node name), copies
    the default token, trusted address and gas settings onto BaseTask and
    CriticalWeb3Task, and then hands control to the celery worker. The
    liveness state set before the worker starts is reset again when the
    worker returns, raises or exits.
    """
    queue_name = config.get('CELERY_QUEUE')
    log_level_name = logging.getLevelName(logg.getEffectiveLevel())
    argv = [
        'worker',
        '--loglevel=' + log_level_name,
        '-Q',
        queue_name,
        '-n',
        queue_name,
    ]

    # TODO: More elegant way of setting queue-wide settings
    BaseTask.default_token_symbol = default_token_symbol
    BaseTask.default_token_address = default_token_address
    default_token = ERC20Token(chain_spec, conn, add_0x(BaseTask.default_token_address))
    default_token.load(conn)
    BaseTask.default_token_decimals = default_token.decimals
    BaseTask.default_token_name = default_token.name
    BaseTask.trusted_addresses = trusted_addresses

    # All gas amounts are multiples of the configured minimum unit count.
    gas_minimum_units = int(config.get('ETH_GAS_HOLDER_MINIMUM_UNITS'))
    CriticalWeb3Task.safe_gas_refill_amount = gas_minimum_units * int(config.get('ETH_GAS_HOLDER_REFILL_UNITS'))
    CriticalWeb3Task.safe_gas_threshold_amount = gas_minimum_units * int(config.get('ETH_GAS_HOLDER_REFILL_THRESHOLD'))
    CriticalWeb3Task.safe_gas_gifter_balance = gas_minimum_units * int(config.get('ETH_GAS_GIFTER_REFILL_BUFFER'))

    # When a minimum fee price is configured, scale the amounts by it.
    min_fee_price = config.get('ETH_MIN_FEE_PRICE')
    if min_fee_price:
        BaseTask.min_fee_price = int(min_fee_price)
        CriticalWeb3Task.safe_gas_threshold_amount *= BaseTask.min_fee_price
        CriticalWeb3Task.safe_gas_refill_amount *= BaseTask.min_fee_price
        CriticalWeb3Task.safe_gas_gifter_balance *= BaseTask.min_fee_price

    run_dir = config.get('CIC_RUN_DIR')
    BaseTask.run_dir = run_dir
    logg.info('default token set to {} {}'.format(BaseTask.default_token_symbol, BaseTask.default_token_address))

    liveness.linux.set(rundir=run_dir)
    try:
        celery_app.worker_main(argv)
    finally:
        # Reset the liveness state even if the worker raises or exits, so
        # that no stale state is left behind in the run directory.
        liveness.linux.reset(rundir=run_dir)
2021-02-01 18:12:51 +01:00
2021-04-04 14:40:59 +02:00
@celery.signals.eventlet_pool_postshutdown.connect
def shutdown(sender=None, headers=None, body=None, **kwargs):
    """Handler connected to celery's eventlet_pool_postshutdown signal.

    Only writes a warning-level log line; none of the arguments are used.
    """
    message = ' in shutdown event hook '
    logg.warning(message)
2021-04-04 14:40:59 +02:00
2021-02-01 18:12:51 +01:00
# Only start the worker when executed as a script, not when imported.
if __name__ == '__main__':
    main()