2021-02-01 18:12:51 +01:00
|
|
|
# standard imports
|
|
|
|
import os
|
|
|
|
import sys
|
|
|
|
import logging
|
|
|
|
import time
|
|
|
|
import argparse
|
|
|
|
import sys
|
|
|
|
import re
|
|
|
|
import datetime
|
|
|
|
|
2021-08-17 08:46:51 +02:00
|
|
|
# external imports
|
2021-02-01 18:12:51 +01:00
|
|
|
import celery
|
2021-03-29 15:27:53 +02:00
|
|
|
from cic_eth_registry import CICRegistry
|
|
|
|
from chainlib.chain import ChainSpec
|
2021-03-06 18:55:51 +01:00
|
|
|
from chainlib.eth.tx import unpack
|
2021-03-29 15:27:53 +02:00
|
|
|
from chainlib.connection import RPCConnection
|
2021-03-06 18:55:51 +01:00
|
|
|
from hexathon import strip_0x
|
2021-04-04 14:40:59 +02:00
|
|
|
from chainqueue.db.enum import (
|
|
|
|
StatusEnum,
|
|
|
|
StatusBits,
|
|
|
|
)
|
|
|
|
from chainqueue.error import NotLocalTxError
|
2021-06-03 19:22:47 +02:00
|
|
|
from chainqueue.sql.state import set_reserved
|
2021-02-01 18:12:51 +01:00
|
|
|
|
|
|
|
# local imports
|
2021-08-17 08:46:51 +02:00
|
|
|
import cic_eth.cli
|
2021-02-01 18:12:51 +01:00
|
|
|
from cic_eth.db import SessionBase
|
|
|
|
from cic_eth.db.enum import LockEnum
|
|
|
|
from cic_eth.db import dsn_from_config
|
2021-04-04 14:40:59 +02:00
|
|
|
from cic_eth.queue.query import get_upcoming_tx
|
2021-02-01 18:12:51 +01:00
|
|
|
from cic_eth.admin.ctrl import lock_send
|
|
|
|
from cic_eth.eth.tx import send as task_tx_send
|
2021-03-01 21:15:17 +01:00
|
|
|
from cic_eth.error import (
|
|
|
|
PermanentTxError,
|
|
|
|
TemporaryTxError,
|
|
|
|
)
|
2021-02-01 18:12:51 +01:00
|
|
|
|
|
|
|
# Root logger, emitting warnings and above unless reconfigured later.
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

# Command line handling: the standard read flags plus the local sync and
# task flag groups provided by cic_eth.cli.
arg_flags = cic_eth.cli.argflag_std_read
local_arg_flags = (
    cic_eth.cli.argflag_local_sync
    | cic_eth.cli.argflag_local_task
)
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args()

# Configuration is built from the parsed arguments and both flag sets.
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags)

# connect to celery
celery_app = cic_eth.cli.CeleryApp.from_config(config)

# connect to database
dsn = dsn_from_config(config)
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))

# Chain the dispatcher operates on, taken from the CHAIN_SPEC setting.
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))

# set up rpc
rpc = cic_eth.cli.RPC.from_config(config)
conn = rpc.get_default()

# DispatchSyncer.loop keeps polling for as long as this flag is truthy.
run = True
|
|
|
|
|
2021-08-17 08:46:51 +02:00
|
|
|
|
2021-02-01 18:12:51 +01:00
|
|
|
class DispatchSyncer:
    """Poll for queued transactions and dispatch them through celery.

    ``loop`` repeatedly fetches upcoming transactions with the
    ``StatusBits.QUEUED`` status for the configured chain and passes them
    to ``process``, which reserves each transaction and schedules a lock
    check task linked to a send task.

    :param chain_spec: Chain spec the dispatcher operates on. It is passed
        to ``unpack``, ``set_reserved``, ``get_upcoming_tx`` and
        ``RPCConnection.connect``, and serialized with ``asdict()`` for the
        celery task arguments.
    """

    # Seconds to sleep between polls when the previous poll returned
    # transactions; otherwise the interval given to loop() is used.
    yield_delay = 0.0005

    def __init__(self, chain_spec):
        self.chain_spec = chain_spec
        # Database session; only set while process() is running.
        self.session = None

    def chain(self):
        """Return the chain spec this dispatcher was created with."""
        return self.chain_spec

    def process(self, w3, txs):
        """Reserve every transaction in ``txs`` and schedule its tasks.

        Each raw transaction is decoded with ``unpack`` and ``set_reserved``
        is called for its hash, followed by a commit. When ``set_reserved``
        raises ``NotLocalTxError`` the session is rolled back and the
        transaction is skipped. Otherwise a
        ``cic_eth.admin.ctrl.check_lock`` signature linked to
        ``cic_eth.eth.tx.send`` is applied asynchronously on the queue named
        by the ``CELERY_QUEUE`` setting.

        The database session is opened at the start of the call and is
        always closed before returning, also when an exception propagates.

        :param w3: Connection object handed in by ``loop``; not used here.
        :param txs: Mapping whose values are hex encoded raw transactions
            (an optional ``0x`` prefix is stripped). Keys are only used for
            logging - presumably transaction hashes; verify against
            ``get_upcoming_tx``.
        """
        logg.debug('processing {} txs {}'.format(len(txs), list(txs.keys())))
        self.session = SessionBase.create_session()
        try:
            for k, tx_raw in txs.items():
                tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw))
                tx = unpack(tx_raw_bytes, self.chain_spec)

                try:
                    set_reserved(self.chain_spec, tx['hash'], session=self.session)
                    self.session.commit()
                except NotLocalTxError:
                    logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
                    self.session.rollback()
                    continue

                s_check = celery.signature(
                    'cic_eth.admin.ctrl.check_lock',
                    [
                        [tx_raw],
                        self.chain_spec.asdict(),
                        LockEnum.QUEUE,
                        tx['from'],
                    ],
                    queue=config.get('CELERY_QUEUE'),
                )
                s_send = celery.signature(
                    'cic_eth.eth.tx.send',
                    [
                        self.chain_spec.asdict(),
                    ],
                    queue=config.get('CELERY_QUEUE'),
                )
                # The send task is linked so it is called once check_lock
                # has completed.
                s_check.link(s_send)
                s_check.apply_async()
                logg.info('processed {}'.format(k))
        finally:
            # Release the session on every exit path, not only on success.
            self.session.close()
            self.session = None

    def loop(self, interval):
        """Poll for queued transactions until the module ``run`` flag clears.

        A ``ConnectionError`` raised while connecting or processing is
        logged and the loop continues with the next poll.

        :param interval: Seconds to sleep after a poll that returned no
            transactions.
        """
        while run:
            utxs = get_upcoming_tx(self.chain_spec, StatusBits.QUEUED)
            # Work on a copy of the query result.
            txs = dict(utxs)
            try:
                conn = RPCConnection.connect(self.chain_spec, 'default')
                self.process(conn, txs)
            except ConnectionError as e:
                # process() releases its own session; this guard is kept as
                # a safety net should a session still be attached.
                if self.session is not None:
                    self.session.close()
                    self.session = None
                logg.error('connection to node failed: {}'.format(e))

            # Poll again almost immediately while there is work, otherwise
            # wait for the full interval.
            if len(utxs) > 0:
                time.sleep(self.yield_delay)
            else:
                time.sleep(interval)
|
2021-02-01 18:12:51 +01:00
|
|
|
|
|
|
|
|
|
|
|
def main():
    """Run the dispatcher loop, then exit with status 0.

    The dispatcher is created for the module-level chain spec and polls at
    the interval given by the DISPATCHER_LOOP_INTERVAL setting.
    """
    dispatcher = DispatchSyncer(chain_spec)
    poll_interval = float(config.get('DISPATCHER_LOOP_INTERVAL'))
    dispatcher.loop(poll_interval)

    sys.exit(0)
|
|
|
|
|
|
|
|
|
|
|
|
# Start the dispatcher only when this file is executed as a script.
if __name__ == '__main__':
    main()
|