2021-02-01 18:12:51 +01:00
# standard imports
import sys
import os
import logging
import argparse
import tempfile
import re
import urllib
import websocket
2021-03-29 15:27:53 +02:00
# external imports
2021-02-01 18:12:51 +01:00
import celery
import confini
2021-03-29 15:27:53 +02:00
from chainlib . connection import RPCConnection
from chainlib . eth . connection import EthUnixSignerConnection
from chainlib . chain import ChainSpec
2021-04-04 14:40:59 +02:00
from chainqueue . db . models . otx import Otx
2021-04-19 12:46:23 +02:00
import liveness . linux
2021-02-01 18:12:51 +01:00
# local imports
2021-04-04 14:40:59 +02:00
from cic_eth . eth import (
erc20 ,
tx ,
account ,
nonce ,
gas ,
)
from cic_eth . admin import (
debug ,
ctrl ,
)
from cic_eth . queue import (
query ,
balance ,
state ,
tx ,
lock ,
time ,
)
from cic_eth . callbacks import (
Callback ,
http ,
2021-04-16 22:24:07 +02:00
noop ,
2021-04-04 14:40:59 +02:00
#tcp,
redis ,
)
2021-02-01 18:12:51 +01:00
from cic_eth . db . models . base import SessionBase
from cic_eth . db import dsn_from_config
2021-03-04 16:06:14 +01:00
from cic_eth . ext import tx
2021-03-30 18:33:09 +02:00
from cic_eth . registry import (
connect as connect_registry ,
connect_declarator ,
connect_token_registry ,
)
2021-02-01 18:12:51 +01:00
2021-04-19 12:46:23 +02:00
2021-02-01 18:12:51 +01:00
logging . basicConfig ( level = logging . WARNING )
logg = logging . getLogger ( )
config_dir = os . path . join ( ' /usr/local/etc/cic-eth ' )
argparser = argparse . ArgumentParser ( )
2021-03-29 15:27:53 +02:00
argparser . add_argument ( ' -p ' , ' --provider ' , dest = ' p ' , type = str , help = ' rpc provider ' )
2021-02-01 18:12:51 +01:00
argparser . add_argument ( ' -c ' , type = str , default = config_dir , help = ' config file ' )
argparser . add_argument ( ' -q ' , type = str , default = ' cic-eth ' , help = ' queue name for worker tasks ' )
2021-02-21 16:41:37 +01:00
argparser . add_argument ( ' -r ' , type = str , help = ' CIC registry address ' )
2021-02-01 18:12:51 +01:00
argparser . add_argument ( ' --abi-dir ' , dest = ' abi_dir ' , type = str , help = ' Directory containing bytecode and abi ' )
argparser . add_argument ( ' --trace-queue-status ' , default = None , dest = ' trace_queue_status ' , action = ' store_true ' , help = ' set to perist all queue entry status changes to storage ' )
argparser . add_argument ( ' -i ' , ' --chain-spec ' , dest = ' i ' , type = str , help = ' chain spec ' )
argparser . add_argument ( ' --env-prefix ' , default = os . environ . get ( ' CONFINI_ENV_PREFIX ' ) , dest = ' env_prefix ' , type = str , help = ' environment prefix for variables to overwrite configuration ' )
argparser . add_argument ( ' -v ' , action = ' store_true ' , help = ' be verbose ' )
argparser . add_argument ( ' -vv ' , action = ' store_true ' , help = ' be more verbose ' )
args = argparser . parse_args ( )
if args . vv :
logging . getLogger ( ) . setLevel ( logging . DEBUG )
elif args . v :
logging . getLogger ( ) . setLevel ( logging . INFO )
config = confini . Config ( args . c , args . env_prefix )
config . process ( )
# override args
args_override = {
' CIC_CHAIN_SPEC ' : getattr ( args , ' i ' ) ,
2021-02-21 16:41:37 +01:00
' CIC_REGISTRY_ADDRESS ' : getattr ( args , ' r ' ) ,
2021-02-01 18:12:51 +01:00
' ETH_PROVIDER ' : getattr ( args , ' p ' ) ,
' TASKS_TRACE_QUEUE_STATUS ' : getattr ( args , ' trace_queue_status ' ) ,
}
2021-03-29 15:27:53 +02:00
config . add ( args . q , ' _CELERY_QUEUE ' , True )
2021-02-01 18:12:51 +01:00
config . dict_override ( args_override , ' cli flag ' )
config . censor ( ' PASSWORD ' , ' DATABASE ' )
config . censor ( ' PASSWORD ' , ' SSL ' )
logg . debug ( ' config loaded from {} : \n {} ' . format ( args . c , config ) )
2021-04-19 12:46:23 +02:00
health_modules = config . get ( ' CIC_HEALTH_MODULES ' , [ ] )
if len ( health_modules ) != 0 :
health_modules = health_modules . split ( ' , ' )
logg . debug ( ' health mods {} ' . format ( health_modules ) )
2021-02-01 18:12:51 +01:00
# connect to database
dsn = dsn_from_config ( config )
2021-04-09 15:00:15 +02:00
SessionBase . connect ( dsn , pool_size = int ( config . get ( ' DATABASE_POOL_SIZE ' ) ) , debug = config . true ( ' DATABASE_DEBUG ' ) )
2021-02-01 18:12:51 +01:00
# set up celery
current_app = celery . Celery ( __name__ )
broker = config . get ( ' CELERY_BROKER_URL ' )
if broker [ : 4 ] == ' file ' :
bq = tempfile . mkdtemp ( )
bp = tempfile . mkdtemp ( )
current_app . conf . update ( {
' broker_url ' : broker ,
' broker_transport_options ' : {
' data_folder_in ' : bq ,
' data_folder_out ' : bq ,
' data_folder_processed ' : bp ,
} ,
} ,
)
logg . warning ( ' celery broker dirs queue i/o {} processed {} , will NOT be deleted on shutdown ' . format ( bq , bp ) )
else :
current_app . conf . update ( {
' broker_url ' : broker ,
} )
result = config . get ( ' CELERY_RESULT_URL ' )
if result [ : 4 ] == ' file ' :
rq = tempfile . mkdtemp ( )
current_app . conf . update ( {
' result_backend ' : ' file:// {} ' . format ( rq ) ,
} )
logg . warning ( ' celery backend store dir {} created, will NOT be deleted on shutdown ' . format ( rq ) )
else :
current_app . conf . update ( {
' result_backend ' : result ,
} )
2021-03-29 15:27:53 +02:00
chain_spec = ChainSpec . from_chain_str ( config . get ( ' CIC_CHAIN_SPEC ' ) )
RPCConnection . register_location ( config . get ( ' ETH_PROVIDER ' ) , chain_spec , ' default ' )
RPCConnection . register_location ( config . get ( ' SIGNER_SOCKET_PATH ' ) , chain_spec , ' signer ' , constructor = EthUnixSignerConnection )
2021-02-01 18:12:51 +01:00
Otx . tracing = config . true ( ' TASKS_TRACE_QUEUE_STATUS ' )
2021-04-19 15:59:13 +02:00
liveness . linux . load ( health_modules )
2021-02-01 18:12:51 +01:00
def main ( ) :
argv = [ ' worker ' ]
if args . vv :
argv . append ( ' --loglevel=DEBUG ' )
elif args . v :
argv . append ( ' --loglevel=INFO ' )
argv . append ( ' -Q ' )
argv . append ( args . q )
argv . append ( ' -n ' )
argv . append ( args . q )
2021-03-29 15:27:53 +02:00
# if config.true('SSL_ENABLE_CLIENT'):
# Callback.ssl = True
# Callback.ssl_cert_file = config.get('SSL_CERT_FILE')
# Callback.ssl_key_file = config.get('SSL_KEY_FILE')
# Callback.ssl_password = config.get('SSL_PASSWORD')
#
# if config.get('SSL_CA_FILE') != '':
# Callback.ssl_ca_file = config.get('SSL_CA_FILE')
2021-02-01 18:12:51 +01:00
2021-03-29 15:27:53 +02:00
rpc = RPCConnection . connect ( chain_spec , ' default ' )
2021-03-30 18:33:09 +02:00
connect_registry ( rpc , chain_spec , config . get ( ' CIC_REGISTRY_ADDRESS ' ) )
2021-02-01 18:12:51 +01:00
2021-02-17 09:19:42 +01:00
trusted_addresses_src = config . get ( ' CIC_TRUST_ADDRESS ' )
if trusted_addresses_src == None :
logg . critical ( ' At least one trusted address must be declared in CIC_TRUST_ADDRESS ' )
sys . exit ( 1 )
trusted_addresses = trusted_addresses_src . split ( ' , ' )
for address in trusted_addresses :
logg . info ( ' using trusted address {} ' . format ( address ) )
2021-03-30 18:33:09 +02:00
connect_declarator ( rpc , chain_spec , trusted_addresses )
connect_token_registry ( rpc , chain_spec )
2021-04-19 12:46:23 +02:00
2021-04-19 15:59:13 +02:00
liveness . linux . set ( )
2021-02-01 18:12:51 +01:00
current_app . worker_main ( argv )
2021-04-19 15:59:13 +02:00
liveness . linux . reset ( )
2021-02-01 18:12:51 +01:00
2021-04-04 14:40:59 +02:00
@celery.signals.eventlet_pool_postshutdown.connect
def shutdown ( sender = None , headers = None , body = None , * * kwargs ) :
logg . warning ( ' in shudown event hook ' )
2021-02-01 18:12:51 +01:00
if __name__ == ' __main__ ' :
main ( )