2021-02-01 18:12:51 +01:00
# standard imports
import os
import sys
import logging
import time
import argparse
import sys
import re
import datetime
# third-party imports
import confini
import celery
2021-03-29 15:27:53 +02:00
from cic_eth_registry import CICRegistry
from chainlib . chain import ChainSpec
2021-03-06 18:55:51 +01:00
from chainlib . eth . tx import unpack
2021-03-29 15:27:53 +02:00
from chainlib . connection import RPCConnection
2021-03-06 18:55:51 +01:00
from hexathon import strip_0x
2021-04-04 14:40:59 +02:00
from chainqueue . db . enum import (
StatusEnum ,
StatusBits ,
)
from chainqueue . error import NotLocalTxError
2021-06-03 19:22:47 +02:00
from chainqueue . sql . state import set_reserved
2021-02-01 18:12:51 +01:00
# local imports
import cic_eth
from cic_eth . db import SessionBase
from cic_eth . db . enum import LockEnum
from cic_eth . db import dsn_from_config
2021-04-04 14:40:59 +02:00
from cic_eth . queue . query import get_upcoming_tx
2021-02-01 18:12:51 +01:00
from cic_eth . admin . ctrl import lock_send
from cic_eth . eth . tx import send as task_tx_send
2021-03-01 21:15:17 +01:00
from cic_eth . error import (
PermanentTxError ,
TemporaryTxError ,
)
2021-02-01 18:12:51 +01:00
logging . basicConfig ( level = logging . WARNING )
logg = logging . getLogger ( )
config_dir = os . path . join ( ' /usr/local/etc/cic-eth ' )
argparser = argparse . ArgumentParser ( description = ' daemon that monitors transactions in new blocks ' )
2021-03-29 15:27:53 +02:00
argparser . add_argument ( ' -p ' , ' --provider ' , default = ' http://localhost:8545 ' , dest = ' p ' , type = str , help = ' rpc provider ' )
2021-02-01 18:12:51 +01:00
argparser . add_argument ( ' -c ' , type = str , default = config_dir , help = ' config root to use ' )
2021-03-29 15:27:53 +02:00
argparser . add_argument ( ' -i ' , ' --chain-spec ' , dest = ' i ' , type = str , help = ' chain spec ' )
2021-02-01 18:12:51 +01:00
argparser . add_argument ( ' --env-prefix ' , default = os . environ . get ( ' CONFINI_ENV_PREFIX ' ) , dest = ' env_prefix ' , type = str , help = ' environment prefix for variables to overwrite configuration ' )
argparser . add_argument ( ' -q ' , type = str , default = ' cic-eth ' , help = ' celery queue to submit transaction tasks to ' )
argparser . add_argument ( ' -v ' , help = ' be verbose ' , action = ' store_true ' )
argparser . add_argument ( ' -vv ' , help = ' be more verbose ' , action = ' store_true ' )
args = argparser . parse_args ( sys . argv [ 1 : ] )
if args . v == True :
logging . getLogger ( ) . setLevel ( logging . INFO )
elif args . vv == True :
logging . getLogger ( ) . setLevel ( logging . DEBUG )
config_dir = os . path . join ( args . c )
os . makedirs ( config_dir , 0o777 , True )
config = confini . Config ( config_dir , args . env_prefix )
config . process ( )
# override args
2021-03-29 15:27:53 +02:00
args_override = {
' CIC_CHAIN_SPEC ' : getattr ( args , ' i ' ) ,
' ETH_PROVIDER ' : getattr ( args , ' p ' ) ,
}
config . dict_override ( args_override , ' cli flag ' )
2021-02-01 18:12:51 +01:00
config . censor ( ' PASSWORD ' , ' DATABASE ' )
config . censor ( ' PASSWORD ' , ' SSL ' )
logg . debug ( ' config loaded from {} : \n {} ' . format ( config_dir , config ) )
app = celery . Celery ( backend = config . get ( ' CELERY_RESULT_URL ' ) , broker = config . get ( ' CELERY_BROKER_URL ' ) )
queue = args . q
dsn = dsn_from_config ( config )
2021-02-12 03:27:29 +01:00
SessionBase . connect ( dsn , debug = config . true ( ' DATABASE_DEBUG ' ) )
2021-02-01 18:12:51 +01:00
2021-03-29 15:27:53 +02:00
chain_spec = ChainSpec . from_chain_str ( config . get ( ' CIC_CHAIN_SPEC ' ) )
2021-02-01 18:12:51 +01:00
2021-03-29 15:27:53 +02:00
RPCConnection . register_location ( config . get ( ' ETH_PROVIDER ' ) , chain_spec , tag = ' default ' )
2021-02-01 18:12:51 +01:00
run = True
class DispatchSyncer :
2021-04-01 22:55:39 +02:00
yield_delay = 0.0005
2021-02-21 16:41:37 +01:00
2021-02-01 18:12:51 +01:00
def __init__ ( self , chain_spec ) :
self . chain_spec = chain_spec
2021-07-14 14:43:51 +02:00
self . session = None
2021-02-01 18:12:51 +01:00
def chain ( self ) :
return self . chain_spec
def process ( self , w3 , txs ) :
c = len ( txs . keys ( ) )
logg . debug ( ' processing {} txs {} ' . format ( c , list ( txs . keys ( ) ) ) )
chain_str = str ( self . chain_spec )
2021-07-14 14:43:51 +02:00
self . session = SessionBase . create_session ( )
2021-02-01 18:12:51 +01:00
for k in txs . keys ( ) :
tx_raw = txs [ k ]
2021-03-06 18:55:51 +01:00
tx_raw_bytes = bytes . fromhex ( strip_0x ( tx_raw ) )
2021-04-04 14:40:59 +02:00
tx = unpack ( tx_raw_bytes , self . chain_spec )
2021-03-01 21:15:17 +01:00
try :
2021-07-14 23:17:32 +02:00
set_reserved ( self . chain_spec , tx [ ' hash ' ] , session = self . session )
2021-07-14 14:43:51 +02:00
self . session . commit ( )
2021-03-01 21:15:17 +01:00
except NotLocalTxError as e :
logg . warning ( ' dispatcher was triggered with non-local tx {} ' . format ( tx [ ' hash ' ] ) )
2021-07-14 14:43:51 +02:00
self . session . rollback ( )
2021-03-06 18:55:51 +01:00
continue
2021-02-01 18:12:51 +01:00
s_check = celery . signature (
' cic_eth.admin.ctrl.check_lock ' ,
[
[ tx_raw ] ,
2021-03-29 15:27:53 +02:00
self . chain_spec . asdict ( ) ,
2021-02-01 18:12:51 +01:00
LockEnum . QUEUE ,
tx [ ' from ' ] ,
] ,
queue = queue ,
)
s_send = celery . signature (
' cic_eth.eth.tx.send ' ,
[
2021-03-29 15:27:53 +02:00
self . chain_spec . asdict ( ) ,
2021-02-01 18:12:51 +01:00
] ,
queue = queue ,
)
s_check . link ( s_send )
t = s_check . apply_async ( )
2021-03-01 21:15:17 +01:00
logg . info ( ' processed {} ' . format ( k ) )
2021-07-14 14:43:51 +02:00
self . session . close ( )
self . session = None
2021-02-01 18:12:51 +01:00
2021-07-11 15:10:57 +02:00
def loop ( self , interval ) :
2021-02-01 18:12:51 +01:00
while run :
txs = { }
2021-03-01 21:15:17 +01:00
typ = StatusBits . QUEUED
2021-04-04 14:40:59 +02:00
utxs = get_upcoming_tx ( self . chain_spec , typ )
2021-02-01 18:12:51 +01:00
for k in utxs . keys ( ) :
txs [ k ] = utxs [ k ]
2021-07-11 15:10:57 +02:00
try :
conn = RPCConnection . connect ( self . chain_spec , ' default ' )
self . process ( conn , txs )
except ConnectionError as e :
2021-07-14 14:43:51 +02:00
if self . session != None :
self . session . close ( )
self . session = None
2021-07-11 15:10:57 +02:00
logg . error ( ' connection to node failed: {} ' . format ( e ) )
2021-02-01 18:12:51 +01:00
2021-02-21 16:41:37 +01:00
if len ( utxs ) > 0 :
time . sleep ( self . yield_delay )
else :
time . sleep ( interval )
2021-02-01 18:12:51 +01:00
def main ( ) :
syncer = DispatchSyncer ( chain_spec )
2021-07-11 15:10:57 +02:00
syncer . loop ( float ( config . get ( ' DISPATCHER_LOOP_INTERVAL ' ) ) )
2021-02-01 18:12:51 +01:00
sys . exit ( 0 )
if __name__ == ' __main__ ' :
main ( )