chaind-eth/chaind/eth/runnable/tasker.py
2022-04-10 17:13:49 +00:00

138 lines
4.3 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# standard imports
import os
import logging
import signal
# external imports
import chainlib.eth.cli
from chaind.session import SessionController
from chaind.setup import Environment
from chaind.error import (
NothingToDoError,
ClientGoneError,
ClientBlockError,
ClientInputError,
)
from chainqueue import (
Store,
Status,
)
from chainqueue.error import DuplicateTxError
from chainqueue.store.fs import (
IndexStore,
CounterStore,
)
from chainqueue.cache import CacheTokenTx
from chainlib.encode import TxHexNormalizer
from chainlib.chain import ChainSpec
from chaind.adapters.fs import ChaindFsAdapter
# local imports
from chaind.eth.dispatch import EthDispatcher
from chaind.eth.cache import EthCacheTx
# Configure the root logger; WARNING is the baseline level set here.
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

# Base configuration directory, resolved relative to this script's real path.
script_dir = os.path.dirname(os.path.realpath(__file__))
config_dir = os.path.join(script_dir, '..', 'data', 'config')

env = Environment(domain='eth', env=os.environ)

# Standard chainlib read flags plus the session-specific options below.
arg_flags = chainlib.eth.cli.argflag_std_read
argparser = chainlib.eth.cli.ArgumentParser(arg_flags)
argparser.add_argument('--data-dir', type=str, help='data directory')
argparser.add_argument('--runtime-dir', type=str, help='runtime directory')
argparser.add_argument('--session-id', dest='session_id', type=str, help='session identifier')
argparser.add_argument('--dispatch-delay', dest='dispatch_delay', type=float, help='socket timeout before processing queue')
args = argparser.parse_args()

# Presumably maps each extra argparse attribute to the config key it
# overrides -- confirm against chainlib's Config.from_args.
extra_args = {
    'runtime_dir': 'SESSION_RUNTIME_DIR',
    'data_dir': 'SESSION_DATA_DIR',
    'session_id': 'SESSION_ID',
    'dispatch_delay': 'SESSION_DISPATCH_DELAY',
}
config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir)

logg.debug('session id {} {}'.format(type(config.get('SESSION_ID')), config.get('SESSION_ID')))

# Any session setting still unset after argument/config parsing falls back to
# the value held by the Environment object.
if config.get('SESSION_ID') is None:
    config.add(env.session, 'SESSION_ID', exists_ok=True)
if config.get('SESSION_RUNTIME_DIR') is None:
    config.add(env.runtime_dir, 'SESSION_RUNTIME_DIR', exists_ok=True)
if config.get('SESSION_DATA_DIR') is None:
    config.add(env.data_dir, 'SESSION_DATA_DIR', exists_ok=True)

# Without an explicit socket path, derive one as
# <runtime dir>/<session id>/chaind.sock.
if not config.get('SESSION_SOCKET_PATH'):
    socket_path = os.path.join(config.get('SESSION_RUNTIME_DIR'), config.get('SESSION_ID'), 'chaind.sock')
    config.add(socket_path, 'SESSION_SOCKET_PATH', True)

logg.debug('config loaded:\n{}'.format(config))
def process_outgoing(chain_spec, adapter, rpc, limit=100):
    """Dispatch every transaction the adapter reports as upcoming.

    :param chain_spec: Chain spec; used here only in the log output.
    :param adapter: Object providing ``upcoming()`` and ``dispatch(tx_hash)``.
    :param rpc: RPC connection; used here only in the log output.
    :param limit: Accepted for interface compatibility; not used by this function.
    :returns: Number of transactions for which ``adapter.dispatch`` returned a truthy value.
    :rtype: int
    """
    pending = adapter.upcoming()
    logg.info('process {} {} {}'.format(chain_spec, adapter, rpc))
    logg.info('upcoming {}'.format(pending))
    # Count only the dispatch attempts that report success.
    return sum(1 for tx_hash in pending if adapter.dispatch(tx_hash))
# Chain spec and RPC connection are both resolved from the loaded config.
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
rpc = chainlib.eth.cli.Rpc()
conn = rpc.connect_by_config(config)

# NOTE(review): tx_normalizer and token_cache_store are not referenced again
# in this part of the file -- confirm whether they are still needed.
tx_normalizer = TxHexNormalizer().tx_hash
token_cache_store = CacheTokenTx(chain_spec, normalizer=tx_normalizer)

# Filesystem-backed queue adapter that hands transactions to the dispatcher.
dispatcher = EthDispatcher(conn)
queue_adapter = ChaindFsAdapter(chain_spec, config.get('SESSION_DATA_DIR'), EthCacheTx, dispatcher)

ctrl = SessionController(config, queue_adapter, process_outgoing)

# Route both interrupt and termination signals to the controller's shutdown.
for _signo in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signo, ctrl.shutdown)
def main():
    """Run the session loop until ``ctrl.get()`` raises ``ClientGoneError``.

    Each iteration asks the session controller for client input. When there
    is none, the controller processes the queue using the RPC connection.
    When there is input, its hex representation is stored through the queue
    adapter, enqueued if storing succeeded, and a result code (0 for success,
    1 for a duplicate transaction) is sent back on the client socket.
    """
    while True:
        v = None
        client_socket = None
        try:
            (client_socket, v) = ctrl.get()
        except ClientGoneError:
            break
        except (ClientBlockError, ClientInputError):
            # Nothing usable was received this round; poll again.
            continue
        except NothingToDoError:
            # Fall through with v still None so the queue gets processed.
            pass

        if v is None:
            ctrl.process(conn)
            continue

        tx_hex = v.hex()
        result_data = None
        r = 0  # no error
        try:
            result_data = queue_adapter.put(tx_hex)
        except DuplicateTxError as e:
            logg.error('tx already exists: {}'.format(e))
            r = 1
        except ValueError as e:
            # NOTE(review): no response is sent to the client on this path --
            # confirm that this is intended.
            logg.error('adapter rejected input {}: "{}"'.format(tx_hex, e))
            continue
        else:
            # Only newly stored transactions are enqueued.
            queue_adapter.enqueue(result_data)

        ctrl.respond_put(client_socket, r, extra_data=result_data)
# Start the session loop only when this file is executed as a script.
if __name__ == '__main__':
    main()