2021-02-01 18:12:51 +01:00
# standard imports
import sys
import os
import logging
import argparse
import tempfile
import re
import urllib
import websocket
2021-07-02 21:25:47 +02:00
import stat
import importlib
2021-02-01 18:12:51 +01:00
2021-03-29 15:27:53 +02:00
# external imports
2021-02-01 18:12:51 +01:00
import celery
import confini
2021-04-24 19:53:45 +02:00
from chainlib . connection import (
RPCConnection ,
ConnType ,
)
from chainlib . eth . connection import (
EthUnixSignerConnection ,
EthHTTPSignerConnection ,
)
2021-03-29 15:27:53 +02:00
from chainlib . chain import ChainSpec
2021-04-04 14:40:59 +02:00
from chainqueue . db . models . otx import Otx
2021-04-23 23:02:51 +02:00
from cic_eth_registry . error import UnknownContractError
2021-05-19 08:59:42 +02:00
from cic_eth_registry . erc20 import ERC20Token
2021-04-21 19:34:13 +02:00
import liveness . linux
2021-02-01 18:12:51 +01:00
2021-04-23 23:02:51 +02:00
2021-02-01 18:12:51 +01:00
# local imports
2021-04-04 14:40:59 +02:00
from cic_eth . eth import (
erc20 ,
tx ,
account ,
nonce ,
gas ,
)
from cic_eth . admin import (
debug ,
ctrl ,
2021-05-15 06:36:54 +02:00
token ,
2021-04-04 14:40:59 +02:00
)
from cic_eth . queue import (
query ,
balance ,
state ,
tx ,
lock ,
time ,
)
from cic_eth . callbacks import (
Callback ,
http ,
2021-04-16 22:24:07 +02:00
noop ,
2021-04-04 14:40:59 +02:00
#tcp,
redis ,
)
2021-02-01 18:12:51 +01:00
from cic_eth . db . models . base import SessionBase
from cic_eth . db import dsn_from_config
2021-03-04 16:06:14 +01:00
from cic_eth . ext import tx
2021-03-30 18:33:09 +02:00
from cic_eth . registry import (
connect as connect_registry ,
connect_declarator ,
connect_token_registry ,
)
2021-04-24 08:14:24 +02:00
from cic_eth . task import BaseTask
2021-02-01 18:12:51 +01:00
2021-04-21 19:34:13 +02:00
2021-02-01 18:12:51 +01:00
# The root logger starts at WARNING; the -v / -vv flags parsed below raise it.
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

# Directory containing this script.
script_dir = os.path.dirname(os.path.realpath(__file__))

# Default location of the configuration, used when -c is not given.
config_dir = os.path.join('/usr/local/etc/cic-eth')

argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('-q', type=str, default='cic-eth', help='queue name for worker tasks')
argparser.add_argument('-r', type=str, help='CIC registry address')
argparser.add_argument('--default-token-symbol', dest='default_token_symbol', type=str, help='Symbol of default token to use')
argparser.add_argument('--trace-queue-status', default=None, dest='trace_queue_status', action='store_true', help='set to perist all queue entry status changes to storage')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--aux-all', action='store_true', help='include tasks from all submodules from the aux module path')
argparser.add_argument('--aux', action='append', type=str, default=[], help='add single submodule from the aux module path')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()

# logg is the root logger, so this adjusts the level for the whole process.
if args.vv:
    logg.setLevel(logging.DEBUG)
elif args.v:
    logg.setLevel(logging.INFO)

config = confini.Config(args.c, args.env_prefix)
config.process()

# override args
# Values given as cli flags are handed to config.dict_override below.
args_override = {
    'CIC_CHAIN_SPEC': args.i,
    'CIC_REGISTRY_ADDRESS': args.r,
    'CIC_DEFAULT_TOKEN_SYMBOL': args.default_token_symbol,
    'ETH_PROVIDER': args.p,
    'TASKS_TRACE_QUEUE_STATUS': args.trace_queue_status,
}
# The queue name is stored in the config under the _CELERY_QUEUE key.
config.add(args.q, '_CELERY_QUEUE', True)
config.dict_override(args_override, 'cli flag')
# NOTE(review): presumably masks these values when the config is printed below - confirm against confini.
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(args.c, config))

# CIC_HEALTH_MODULES is a comma separated string; fall back to an empty list when unset.
health_modules = config.get('CIC_HEALTH_MODULES', [])
if len(health_modules) > 0:
    health_modules = health_modules.split(',')
logg.debug('health mods {}'.format(health_modules))
2021-07-02 21:25:47 +02:00
2021-02-01 18:12:51 +01:00
# connect to database
dsn = dsn_from_config(config)
SessionBase.connect(
    dsn,
    pool_size=int(config.get('DATABASE_POOL_SIZE')),
    debug=config.true('DATABASE_DEBUG'),
)

# set up celery
current_app = celery.Celery(__name__)

# Broker settings: a broker url starting with "file" additionally gets
# freshly created temporary directories as transport folders.
broker = config.get('CELERY_BROKER_URL')
broker_is_file = broker.startswith('file')
conf_update = {
    'broker_url': broker,
}
if broker_is_file:
    bq = tempfile.mkdtemp()
    bp = tempfile.mkdtemp()
    conf_update['broker_transport_options'] = {
        # the same directory is used for both incoming and outgoing data
        'data_folder_in': bq,
        'data_folder_out': bq,
        'data_folder_processed': bp,
    }
if config.true('CELERY_DEBUG'):
    conf_update['result_extended'] = True
current_app.conf.update(conf_update)
if broker_is_file:
    logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))

# Result backend: a url starting with "file" is replaced by a file:// url
# pointing at a freshly created temporary directory.
result = config.get('CELERY_RESULT_URL')
if result.startswith('file'):
    rq = tempfile.mkdtemp()
    current_app.conf.update({
        'result_backend': 'file://{}'.format(rq),
    })
    logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
else:
    current_app.conf.update({
        'result_backend': result,
    })
2021-03-29 15:27:53 +02:00
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))

# Signer connection constructors, one per connection type, all under the 'signer' tag.
RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, 'signer')
RPCConnection.register_constructor(ConnType.HTTP, EthHTTPSignerConnection, 'signer')
RPCConnection.register_constructor(ConnType.HTTP_SSL, EthHTTPSignerConnection, 'signer')
# Locations for the 'default' (provider) and 'signer' tags on this chain spec.
RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default')
RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 'signer')

Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS')

#import cic_eth.checks.gas
#if not cic_eth.checks.gas.health(config=config):
#    raise RuntimeError()
liveness.linux.load(health_modules, rundir=config.get('CIC_RUN_DIR'), config=config, unit='cic-eth-tasker')

rpc = RPCConnection.connect(chain_spec, 'default')
try:
    registry = connect_registry(rpc, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
except UnknownContractError as e:
    logg.exception('Registry contract connection failed for {}: {}'.format(config.get('CIC_REGISTRY_ADDRESS'), e))
    sys.exit(1)
logg.info('connected contract registry {}'.format(config.get('CIC_REGISTRY_ADDRESS')))

# CIC_TRUST_ADDRESS is a comma separated list. Surrounding whitespace and
# empty entries (e.g. from a trailing comma) are dropped so that only usable
# addresses are handed to connect_declarator, and a value that contains no
# address at all is rejected the same way as a missing one.
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
trusted_addresses = []
if trusted_addresses_src is not None:
    trusted_addresses = [a.strip() for a in trusted_addresses_src.split(',') if a.strip()]
if len(trusted_addresses) == 0:
    logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
    sys.exit(1)
for address in trusted_addresses:
    logg.info('using trusted address {}'.format(address))

connect_declarator(rpc, chain_spec, trusted_addresses)
connect_token_registry(rpc, chain_spec)
# detect aux
# TODO: move to separate file
def _stat_aux_init(aux_name):
    """Stat the __init__.py of aux submodule aux_name below aux_dir.

    Raises FileNotFoundError when the file does not exist.
    """
    os.stat(os.path.join(aux_dir, aux_name, '__init__.py'))


aux_dir = os.path.join(script_dir, '..', '..', 'aux')
aux = []
if args.aux_all:
    # Scan the aux directory and pick up every sub-directory holding an __init__.py
    if args.aux:
        logg.warning('--aux-all is set so --aux will have no effect')
    for v in os.listdir(aux_dir):
        if v.startswith('.'):
            logg.debug('dotfile, skip {}'.format(v))
            continue
        if not stat.S_ISDIR(os.stat(os.path.join(aux_dir, v)).st_mode):
            logg.debug('not a dir, skip {}'.format(v))
            continue
        try:
            _stat_aux_init(v)
        except FileNotFoundError:
            logg.debug('__init__.py not found, skip {}'.format(v))
            continue
        aux.append(v)
elif args.aux:
    # Explicitly requested submodules must exist; a missing one is fatal.
    for v in args.aux:
        try:
            _stat_aux_init(v)
        except FileNotFoundError:
            logg.critical('cannot find explicity requested aux module {}'.format(v))
            sys.exit(1)
        logg.info('aux module {} found in path'.format(v))
        aux.append(v)

# Import every selected submodule and call its setup() with the rpc connection and config.
for v in aux:
    mod = importlib.import_module('cic_eth.aux.' + v)
    mod.setup(rpc, config)
2021-02-01 18:12:51 +01:00
def main():
    """Populate the BaseTask defaults and run the celery worker.

    Builds the celery worker argument vector from the parsed command line
    flags (log level, and the -q value used for both -Q and -n), resolves the
    default token through the registry and stores its symbol, address,
    decimals and name on BaseTask, then runs the worker.

    liveness.linux.set() is called before the worker starts and
    liveness.linux.reset() is always called afterwards, also when
    worker_main() leaves through an exception or SystemExit.
    """
    argv = ['worker']
    if args.vv:
        argv.append('--loglevel=DEBUG')
    elif args.v:
        argv.append('--loglevel=INFO')
    # the queue name is passed as both the -Q and the -n value
    argv.extend(['-Q', args.q, '-n', args.q])

#    if config.true('SSL_ENABLE_CLIENT'):
#        Callback.ssl = True
#        Callback.ssl_cert_file = config.get('SSL_CERT_FILE')
#        Callback.ssl_key_file = config.get('SSL_KEY_FILE')
#        Callback.ssl_password = config.get('SSL_PASSWORD')
#
#    if config.get('SSL_CA_FILE') != '':
#        Callback.ssl_ca_file = config.get('SSL_CA_FILE')

    rpc = RPCConnection.connect(chain_spec, 'default')

    # Resolve the default token once and publish its properties on BaseTask.
    BaseTask.default_token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL')
    BaseTask.default_token_address = registry.by_name(BaseTask.default_token_symbol)
    default_token = ERC20Token(chain_spec, rpc, BaseTask.default_token_address)
    default_token.load(rpc)
    BaseTask.default_token_decimals = default_token.decimals
    BaseTask.default_token_name = default_token.name

    run_dir = config.get('CIC_RUN_DIR')
    BaseTask.run_dir = run_dir
    logg.info('default token set to {} {}'.format(BaseTask.default_token_symbol, BaseTask.default_token_address))

    liveness.linux.set(rundir=run_dir)
    try:
        current_app.worker_main(argv)
    finally:
        # Without the finally clause the reset would be skipped whenever the
        # worker terminates by raising, leaving the state from set() in place.
        liveness.linux.reset(rundir=run_dir)
2021-02-01 18:12:51 +01:00
2021-04-04 14:40:59 +02:00
@celery.signals.eventlet_pool_postshutdown.connect
def shutdown(sender=None, headers=None, body=None, **kwargs):
    """Log a warning when the eventlet pool post-shutdown signal is received.

    The signal arguments are accepted but not used.
    """
    logg.warning('in shudown event hook')
2021-02-01 18:12:51 +01:00
# Start the worker only when this file is executed as a script.
if __name__ == '__main__':
    main()