From 305a1f760b3159314a1fecf60ebece7f0fc02907 Mon Sep 17 00:00:00 2001 From: nolash Date: Mon, 22 Mar 2021 22:10:52 +0100 Subject: [PATCH] WIP Rehabilitate runnables --- apps/cic-eth/cic_eth/eth/erc20.py | 68 ------ apps/cic-eth/cic_eth/eth/meta.py | 71 +++++++ .../cic_eth/runnable/daemons/dispatcher.py | 34 +-- .../runnable/daemons/filters/callback.py | 98 +++++---- .../cic_eth/runnable/daemons/filters/gas.py | 5 +- .../runnable/daemons/filters/register.py | 10 +- .../cic_eth/runnable/daemons/filters/tx.py | 2 +- .../cic-eth/cic_eth/runnable/daemons/retry.py | 82 ++++--- .../cic_eth/runnable/daemons/tasker.py | 2 +- apps/cic-eth/cic_eth/sync/__init__.py | 1 - apps/cic-eth/cic_eth/sync/backend.py | 201 ------------------ apps/cic-eth/cic_eth/sync/base.py | 51 ----- apps/cic-eth/cic_eth/sync/error.py | 4 - apps/cic-eth/cic_eth/sync/head.py | 51 ----- apps/cic-eth/cic_eth/sync/history.py | 74 ------- apps/cic-eth/cic_eth/sync/mempool.py | 50 ----- apps/cic-eth/cic_eth/sync/mined.py | 109 ---------- apps/cic-eth/cic_eth/sync/retry.py | 75 ------- 18 files changed, 192 insertions(+), 796 deletions(-) create mode 100644 apps/cic-eth/cic_eth/eth/meta.py delete mode 100644 apps/cic-eth/cic_eth/sync/__init__.py delete mode 100644 apps/cic-eth/cic_eth/sync/backend.py delete mode 100644 apps/cic-eth/cic_eth/sync/base.py delete mode 100644 apps/cic-eth/cic_eth/sync/error.py delete mode 100644 apps/cic-eth/cic_eth/sync/head.py delete mode 100644 apps/cic-eth/cic_eth/sync/history.py delete mode 100644 apps/cic-eth/cic_eth/sync/mempool.py delete mode 100644 apps/cic-eth/cic_eth/sync/mined.py delete mode 100644 apps/cic-eth/cic_eth/sync/retry.py diff --git a/apps/cic-eth/cic_eth/eth/erc20.py b/apps/cic-eth/cic_eth/eth/erc20.py index 818e3bff..b0ff0268 100644 --- a/apps/cic-eth/cic_eth/eth/erc20.py +++ b/apps/cic-eth/cic_eth/eth/erc20.py @@ -7,7 +7,6 @@ import requests import web3 from chainlib.eth.constant import ZERO_ADDRESS from chainlib.chain import ChainSpec -from chainlib.status import Status as TxStatus from chainlib.connection import RPCConnection from chainlib.eth.erc20 import ERC20 from chainlib.eth.tx import ( @@ -298,70 +297,3 @@ def cache_approve_data( session.close() return (tx_hash_hex, cache_id) - -#class ExtendedTx: -# -# _default_decimals = 6 -# -# def __init__(self, tx_hash, chain_spec): -# self._chain_spec = chain_spec -# self.chain = str(chain_spec) -# self.hash = tx_hash -# self.sender = None -# self.sender_label = None -# self.recipient = None -# self.recipient_label = None -# self.source_token_value = 0 -# self.destination_token_value = 0 -# self.source_token = ZERO_ADDRESS -# self.destination_token = ZERO_ADDRESS -# self.source_token_symbol = '' -# self.destination_token_symbol = '' -# self.source_token_decimals = ExtendedTx._default_decimals -# self.destination_token_decimals = ExtendedTx._default_decimals -# self.status = TxStatus.PENDING.name -# self.status_code = TxStatus.PENDING.value -# -# -# def set_actors(self, sender, recipient, trusted_declarator_addresses=None): -# self.sender = sender -# self.recipient = recipient -# if trusted_declarator_addresses != None: -# self.sender_label = translate_address(sender, trusted_declarator_addresses, self.chain) -# self.recipient_label = translate_address(recipient, trusted_declarator_addresses, self.chain) -# -# -# def set_tokens(self, source, source_value, destination=None, destination_value=None): -# c = RpcClient(self._chain_spec) -# registry = safe_registry(c.w3) -# if destination == None: -# destination = source -# if destination_value == None: -# destination_value = source_value -# st = registry.get_address(self._chain_spec, source) -# dt = registry.get_address(self._chain_spec, destination) -# self.source_token = source -# self.source_token_symbol = st.symbol() -# self.source_token_decimals = st.decimals() -# self.source_token_value = source_value -# self.destination_token = destination -# self.destination_token_symbol = dt.symbol() -# self.destination_token_decimals = dt.decimals() -# self.destination_token_value = destination_value -# -# -# def set_status(self, n): -# if n: -# self.status = TxStatus.ERROR.name -# else: -# self.status = TxStatus.SUCCESS.name -# self.status_code = n -# -# -# def to_dict(self): -# o = {} -# for attr in dir(self): -# if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'set_status', 'to_dict']: -# continue -# o[attr] = getattr(self, attr) -# return o diff --git a/apps/cic-eth/cic_eth/eth/meta.py b/apps/cic-eth/cic_eth/eth/meta.py new file mode 100644 index 00000000..e6f446d3 --- /dev/null +++ b/apps/cic-eth/cic_eth/eth/meta.py @@ -0,0 +1,71 @@ +# extended imports +from chainlib.eth.constant import ZERO_ADDRESS +from chainlib.status import Status as TxStatus + + +class ExtendedTx: + + _default_decimals = 6 + + def __init__(self, rpc, tx_hash, chain_spec): + self.rpc = rpc + self.chain_spec = chain_spec + self.hash = tx_hash + self.sender = None + self.sender_label = None + self.recipient = None + self.recipient_label = None + self.source_token_value = 0 + self.destination_token_value = 0 + self.source_token = ZERO_ADDRESS + self.destination_token = ZERO_ADDRESS + self.source_token_symbol = '' + self.destination_token_symbol = '' + self.source_token_decimals = ExtendedTx._default_decimals + self.destination_token_decimals = ExtendedTx._default_decimals + self.status = TxStatus.PENDING.name + self.status_code = TxStatus.PENDING.value + + + def set_actors(self, sender, recipient, trusted_declarator_addresses=None): + self.sender = sender + self.recipient = recipient + if trusted_declarator_addresses != None: + self.sender_label = translate_address(sender, trusted_declarator_addresses, self.chain_spec) + self.recipient_label = translate_address(recipient, trusted_declarator_addresses, self.chain_spec) + + + def set_tokens(self, source, source_value, destination=None, destination_value=None): + if destination == None: + destination = source + if destination_value == None: + destination_value = source_value + st = ERC20Token(self.rpc, source) + dt = ERC20Token(self.rpc, destination) + self.source_token = source + self.source_token_symbol = st.symbol + self.source_token_name = st.name + self.source_token_decimals = st.decimals + self.source_token_value = source_value + self.destination_token = destination + self.destination_token_symbol = dt.symbol + self.destination_token_name = dt.name + self.destination_token_decimals = dt.decimals + self.destination_token_value = destination_value + + + def set_status(self, n): + if n: + self.status = TxStatus.ERROR.name + else: + self.status = TxStatus.SUCCESS.name + self.status_code = n + + + def to_dict(self): + o = {} + for attr in dir(self): + if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'set_status', 'to_dict']: + continue + o[attr] = getattr(self, attr) + return o diff --git a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py index 7f5cf7c2..928ae2d4 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py @@ -11,11 +11,10 @@ import datetime # third-party imports import confini import celery -import web3 -from web3 import HTTPProvider, WebsocketProvider -from cic_registry import CICRegistry -from cic_registry.chain import ChainSpec +from cic_eth_registry import CICRegistry +from chainlib.chain import ChainSpec from chainlib.eth.tx import unpack +from chainsyncer.error import SyncDone from hexathon import strip_0x # local imports @@ -31,7 +30,6 @@ from cic_eth.queue.tx import ( set_dequeue, ) from cic_eth.admin.ctrl import lock_send -from cic_eth.sync.error import LoopDone from cic_eth.eth.tx import send as task_tx_send from cic_eth.error import ( PermanentTxError, @@ -51,6 +49,7 @@ logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL) config_dir = os.path.join('/usr/local/etc/cic-eth') argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks') +argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') @@ -79,21 +78,9 @@ queue = args.q dsn = dsn_from_config(config) SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG')) +chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) -re_websocket = re.compile('^wss?://') -re_http = re.compile('^https?://') -blockchain_provider = config.get('ETH_PROVIDER') -if re.match(re_websocket, blockchain_provider) != None: - blockchain_provider = WebsocketProvider(blockchain_provider) -elif re.match(re_http, blockchain_provider) != None: - blockchain_provider = HTTPProvider(blockchain_provider) -else: - raise ValueError('unknown provider url {}'.format(blockchain_provider)) - -def web3_constructor(): - w3 = web3.Web3(blockchain_provider) - return (blockchain_provider, w3) -RpcClient.set_constructor(web3_constructor) +RPCConnection.registry_location(args.p, chain_spec, tag='default') run = True @@ -165,17 +152,10 @@ class DispatchSyncer: def main(): - - chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) - c = RpcClient(chain_spec) - - CICRegistry.init(c.w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec) - CICRegistry.add_path(config.get('ETH_ABI_DIR')) - syncer = DispatchSyncer(chain_spec) try: syncer.loop(c.w3, float(config.get('DISPATCHER_LOOP_INTERVAL'))) - except LoopDone as e: + except SyncDone as e: sys.stderr.write("dispatcher done at block {}\n".format(e)) sys.exit(0) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py index 7fce9d39..570f7ee9 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py @@ -2,29 +2,19 @@ import logging # third-party imports -import web3 import celery -from cic_registry.error import UnknownContractError +from cic_eth_registry.error import UnknownContractError from chainlib.status import Status as TxStatus -from chainlib.eth.address import to_checksum +from chainlib.eth.address import to_checksum_address +from chainlib.eth.error import RequestMismatchException from chainlib.eth.constant import ZERO_ADDRESS from hexathon import strip_0x # local imports from .base import SyncFilter -from cic_eth.eth.token import ( - unpack_transfer, - unpack_transferfrom, - ) -from cic_eth.eth.account import unpack_gift -from cic_eth.eth.token import ExtendedTx -from .base import SyncFilter +from cic_eth.eth.meta import ExtendedTx -logg = logging.getLogger(__name__) - -transfer_method_signature = 'a9059cbb' # keccak256(transfer(address,uint256)) -transferfrom_method_signature = '23b872dd' # keccak256(transferFrom(address,address,uint256)) -giveto_method_signature = '63e4bff4' # keccak256(giveTo(address)) +logg = logging.getLogger().getLogger__name__) class CallbackFilter(SyncFilter): @@ -63,38 +53,65 @@ class CallbackFilter(SyncFilter): return s + def parse_transfer(self, tx): + r = ERC20.parse_transfer_request(tx.payload) + transfer_data = {} + transfer_data['to'] = r[0] + transfer_data['value'] = r[1] + transfer_data['from'] = tx['from'] + transfer_data['token_address'] = tx['to'] + return ('transfer', transfer_data) + + + def parse_transferfrom(self, tx): + r = ERC20.parse_transfer_request(tx.payload) + transfer_data = unpack_transferfrom(tx.payload) + transfer_data['from'] = r[0] + transfer_data['to'] = r[1] + transfer_data['value'] = r[2] + transfer_data['token_address'] = tx['to'] + return ('transferfrom', transfer_data) + + + def parse_giftto(self, tx): + # TODO: broken + transfer_data = unpack_gift(tx.payload) + transfer_data['from'] = tx.inputs[0] + transfer_data['value'] = 0 + transfer_data['token_address'] = ZERO_ADDRESS + # TODO: would be better to query the gift amount from the block state + for l in tx.logs: + topics = l['topics'] + logg.debug('topixx {}'.format(topics)) + if strip_0x(topics[0]) == '45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa': + #transfer_data['value'] = web3.Web3.toInt(hexstr=strip_0x(l['data'])) + transfer_data['value'] = int.from_bytes(bytes.fromhex(strip_0x(l_data))) + #token_address_bytes = topics[2][32-20:] + token_address = strip_0x(topics[2])[64-40:] + transfer_data['token_address'] = to_checksum_address(token_address) + return ('tokengift', transfer_data) + + def parse_data(self, tx): transfer_type = None transfer_data = None + # TODO: what's with the mix of attributes and dict keys logg.debug('have payload {}'.format(tx.payload)) method_signature = tx.payload[:8] logg.debug('tx status {}'.format(tx.status)) - if method_signature == transfer_method_signature: - transfer_data = unpack_transfer(tx.payload) - transfer_data['from'] = tx['from'] - transfer_data['token_address'] = tx['to'] - elif method_signature == transferfrom_method_signature: - transfer_type = 'transferfrom' - transfer_data = unpack_transferfrom(tx.payload) - transfer_data['token_address'] = tx['to'] + for parser in [ + parse_transfer, + parse_transfeffrom, + parse_giftto, + ]: + try: + (transfer_type, transfer_data) = parser(tx) + break + except RequestMismatchException: + continue - # TODO: do not rely on logs here - elif method_signature == giveto_method_signature: - transfer_type = 'tokengift' - transfer_data = unpack_gift(tx.payload) - transfer_data['from'] = tx.inputs[0] - transfer_data['value'] = 0 - transfer_data['token_address'] = ZERO_ADDRESS - for l in tx.logs: - topics = l['topics'] - logg.debug('topixx {}'.format(topics)) - if strip_0x(topics[0]) == '45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa': - transfer_data['value'] = web3.Web3.toInt(hexstr=strip_0x(l['data'])) - #token_address_bytes = topics[2][32-20:] - token_address = strip_0x(topics[2])[64-40:] - transfer_data['token_address'] = to_checksum(token_address) logg.debug('resolved method {}'.format(transfer_type)) @@ -105,8 +122,6 @@ class CallbackFilter(SyncFilter): def filter(self, conn, block, tx, db_session=None): - chain_str = str(self.chain_spec) - transfer_data = None transfer_type = None try: @@ -122,11 +137,10 @@ class CallbackFilter(SyncFilter): logg.debug('checking callbacks filter input {}'.format(tx.payload[:8])) if transfer_data != None: - logg.debug('wtfoo {}'.format(transfer_data)) token_symbol = None result = None try: - tokentx = ExtendedTx(tx.hash, self.chain_spec) + tokentx = ExtendedTx(conn, tx.hash, self.chain_spec) tokentx.set_actors(transfer_data['from'], transfer_data['to'], self.trusted_addresses) tokentx.set_tokens(transfer_data['token_address'], transfer_data['value']) if transfer_data['status'] == 0: diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py index 3fee3d9f..0d5970d1 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py @@ -2,7 +2,6 @@ import logging # external imports -from cic_registry.chain import ChainSpec from hexathon import add_0x # local imports @@ -14,7 +13,7 @@ from cic_eth.queue.tx import get_paused_txs from cic_eth.eth.task import create_check_gas_and_send_task from .base import SyncFilter -logg = logging.getLogger(__name__) +logg = logging.getLogger().getLogger(__name__) class GasFilter(SyncFilter): @@ -47,7 +46,7 @@ class GasFilter(SyncFilter): if len(txs) > 0: s = create_check_gas_and_send_task( list(txs.values()), - str(self.chain_spec), + self.chain_spec.asdict(), r[0], 0, tx_hashes_hex=list(txs.keys()), diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py index 8d0e3124..40eef8ad 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py @@ -3,7 +3,7 @@ import logging # third-party imports import celery -from chainlib.eth.address import to_checksum +from chainlib.eth.address import to_checksum_address from hexathon import ( add_0x, strip_0x, @@ -12,9 +12,9 @@ from hexathon import ( # local imports from .base import SyncFilter -logg = logging.getLogger(__name__) +logg = logging.getLogger().getChild(__name__) -account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153fe9ddb8e84b665061acd' # keccak256(AccountAdded(address,uint256)) +account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153fe9ddb8e84b665061acd' class RegistrationFilter(SyncFilter): @@ -32,7 +32,7 @@ class RegistrationFilter(SyncFilter): # TODO: use abi conversion method instead address_hex = strip_0x(l['topics'][1])[64-40:] - address = to_checksum(add_0x(address_hex)) + address = to_checksum_address(add_0x(address_hex)) logg.info('request token gift to {}'.format(address)) s_nonce = celery.signature( 'cic_eth.eth.tx.reserve_nonce', @@ -44,7 +44,7 @@ class RegistrationFilter(SyncFilter): s_gift = celery.signature( 'cic_eth.eth.account.gift', [ - str(self.chain_spec), + self.chain_spec.asdict(), ], queue=self.queue, ) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py index c796d33c..52ec2532 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py @@ -13,7 +13,7 @@ from chainsyncer.db.models.base import SessionBase from chainlib.status import Status from .base import SyncFilter -logg = logging.getLogger(__name__) +logg = logging.getLogger().getChild(__name__) class TxFilter(SyncFilter): diff --git a/apps/cic-eth/cic_eth/runnable/daemons/retry.py b/apps/cic-eth/cic_eth/runnable/daemons/retry.py index 7abf07ff..d3774ca4 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/retry.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/retry.py @@ -8,9 +8,8 @@ import datetime import web3 import confini import celery -from web3 import HTTPProvider, WebsocketProvider -from cic_registry import CICRegistry -from cic_registry.chain import ChainSpec +from cic_eth_registry import CICRegistry +from chainlib.chain import ChainSpec from cic_eth.db import dsn_from_config from cic_eth.db import SessionBase @@ -25,19 +24,14 @@ from cic_eth.eth.util import unpack_signed_raw_tx_hex logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() -logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL) -logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL) -logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL) -logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL) - config_dir = os.path.join('/usr/local/etc/cic-eth') argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks') +argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') argparser.add_argument('--retry-delay', dest='retry_delay', type=str, help='seconds to wait for retrying a transaction that is marked as sent') -argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') argparser.add_argument('-v', help='be verbose', action='store_true') @@ -71,31 +65,15 @@ queue = args.q chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) +RPCConnection.registry_location(args.p, chain_spec, tag='default') + dsn = dsn_from_config(config) SessionBase.connect(dsn) - -re_websocket = re.compile('^wss?://') -re_http = re.compile('^https?://') -blockchain_provider = config.get('ETH_PROVIDER') -if re.match(re_websocket, blockchain_provider) != None: - blockchain_provider = WebsocketProvider(blockchain_provider) -elif re.match(re_http, blockchain_provider) != None: - blockchain_provider = HTTPProvider(blockchain_provider) -else: - raise ValueError('unknown provider url {}'.format(blockchain_provider)) - -def web3_constructor(): - w3 = web3.Web3(blockchain_provider) - return (blockchain_provider, w3) -RpcClient.set_constructor(web3_constructor) - - straggler_delay = int(config.get('CIC_TX_RETRY_DELAY')) # TODO: we already have the signed raw tx in get, so its a waste of cycles to get_tx here -def sendfail_filter(w3, tx_hash, rcpt, chain_str): - chain_spec = ChainSpec.from_chain_str(chain_str) +def sendfail_filter(w3, tx_hash, rcpt, chain_spec): tx_dict = get_tx(tx_hash) tx = unpack_signed_raw_tx_hex(tx_dict['signed_tx'], chain_spec.chain_id()) logg.debug('submitting tx {} for retry'.format(tx_hash)) @@ -137,7 +115,7 @@ def sendfail_filter(w3, tx_hash, rcpt, chain_str): # TODO: can we merely use the dispatcher instead? -def dispatch(chain_str): +def dispatch(conn, chain_spec): txs = get_status_tx(StatusEnum.RETRY, before=datetime.datetime.utcnow()) if len(txs) == 0: logg.debug('no retry state txs found') @@ -199,11 +177,49 @@ def dispatch(chain_str): # s_send.apply_async() -def main(): +class RetrySyncer(Syncer): - c = RpcClient(chain_spec) - CICRegistry.init(c.w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec) - CICRegistry.add_path(config.get('ETH_ABI_DIR')) + def __init__(self, chain_spec, stalled_grace_seconds, failed_grace_seconds=None, final_func=None): + self.chain_spec = chain_spec + if failed_grace_seconds == None: + failed_grace_seconds = stalled_grace_seconds + self.stalled_grace_seconds = stalled_grace_seconds + self.failed_grace_seconds = failed_grace_seconds + self.final_func = final_func + + + def get(self): +# before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.failed_grace_seconds) +# failed_txs = get_status_tx( +# StatusEnum.SENDFAIL.value, +# before=before, +# ) + before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds) + stalled_txs = get_status_tx( + StatusBits.IN_NETWORK.value, + not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE, + before=before, + ) + # return list(failed_txs.keys()) + list(stalled_txs.keys()) + return stalled_txs + + def process(self, conn, ref): + logg.debug('tx {}'.format(ref)) + for f in self.filter: + f(conn, ref, None, str(self.chain_spec)) + + + + def loop(self, interval): + while self.running and Syncer.running_global: + rpc = RPCConnection.connect(self.chain_spec, 'default') + for tx in self.get(): + self.process(rpc, tx) + if self.final_func != None: + self.final_func(rpc, self.chain_spec) + time.sleep(interval) + +def main(): syncer = RetrySyncer(chain_spec, straggler_delay, final_func=dispatch) syncer.filter.append(sendfail_filter) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py index fa13142d..7ead2f71 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py @@ -49,7 +49,7 @@ logg = logging.getLogger() config_dir = os.path.join('/usr/local/etc/cic-eth') argparser = argparse.ArgumentParser() -argparser.add_argument('-p', '--provider', dest='p', type=str, help='web3 provider') +argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') argparser.add_argument('-c', type=str, default=config_dir, help='config file') argparser.add_argument('-q', type=str, default='cic-eth', help='queue name for worker tasks') argparser.add_argument('-r', type=str, help='CIC registry address') diff --git a/apps/cic-eth/cic_eth/sync/__init__.py b/apps/cic-eth/cic_eth/sync/__init__.py deleted file mode 100644 index 325f58dc..00000000 --- a/apps/cic-eth/cic_eth/sync/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .base import Syncer diff --git a/apps/cic-eth/cic_eth/sync/backend.py b/apps/cic-eth/cic_eth/sync/backend.py deleted file mode 100644 index 56f08f30..00000000 --- a/apps/cic-eth/cic_eth/sync/backend.py +++ /dev/null @@ -1,201 +0,0 @@ -# standard imports -import logging - -# local imports -from cic_eth.db.models.sync import BlockchainSync -from cic_eth.db.models.base import SessionBase - -logg = logging.getLogger() - - -class SyncerBackend: - """Interface to block and transaction sync state. - - :param chain_spec: Chain spec for the chain that syncer is running for. - :type chain_spec: cic_registry.chain.ChainSpec - :param object_id: Unique id for the syncer session. - :type object_id: number - """ - def __init__(self, chain_spec, object_id): - self.db_session = None - self.db_object = None - self.chain_spec = chain_spec - self.object_id = object_id - self.connect() - self.disconnect() - - - def connect(self): - """Loads the state of the syncer session with the given id. - """ - if self.db_session == None: - self.db_session = SessionBase.create_session() - q = self.db_session.query(BlockchainSync) - q = q.filter(BlockchainSync.id==self.object_id) - self.db_object = q.first() - if self.db_object == None: - self.disconnect() - raise ValueError('sync entry with id {} not found'.format(self.object_id)) - return self.db_session - - - def disconnect(self): - """Commits state of sync to backend. - """ - if self.db_session != None: - self.db_session.add(self.db_object) - self.db_session.commit() - self.db_session.close() - self.db_session = None - - - def chain(self): - """Returns chain spec for syncer - - :returns: Chain spec - :rtype chain_spec: cic_registry.chain.ChainSpec - """ - return self.chain_spec - - - def get(self): - """Get the current state of the syncer cursor. - - :returns: Block and block transaction height, respectively - :rtype: tuple - """ - self.connect() - pair = self.db_object.cursor() - self.disconnect() - return pair - - - def set(self, block_height, tx_height): - """Update the state of the syncer cursor - :param block_height: Block height of cursor - :type block_height: number - :param tx_height: Block transaction height of cursor - :type tx_height: number - :returns: Block and block transaction height, respectively - :rtype: tuple - """ - self.connect() - pair = self.db_object.set(block_height, tx_height) - self.disconnect() - return pair - - - def start(self): - """Get the initial state of the syncer cursor. - - :returns: Initial block and block transaction height, respectively - :rtype: tuple - """ - self.connect() - pair = self.db_object.start() - self.disconnect() - return pair - - - def target(self): - """Get the target state (upper bound of sync) of the syncer cursor. - - :returns: Target block height - :rtype: number - """ - self.connect() - target = self.db_object.target() - self.disconnect() - return target - - - @staticmethod - def first(chain): - """Returns the model object of the most recent syncer in backend. - - :param chain: Chain spec of chain that syncer is running for. - :type chain: cic_registry.chain.ChainSpec - :returns: Last syncer object - :rtype: cic_eth.db.models.BlockchainSync - """ - return BlockchainSync.first(chain) - - - @staticmethod - def initial(chain, block_height): - """Creates a new syncer session and commit its initial state to backend. - - :param chain: Chain spec of chain that syncer is running for. - :type chain: cic_registry.chain.ChainSpec - :param block_height: Target block height - :type block_height: number - :returns: New syncer object - :rtype: cic_eth.db.models.BlockchainSync - """ - object_id = None - session = SessionBase.create_session() - o = BlockchainSync(chain, 0, 0, block_height) - session.add(o) - session.commit() - object_id = o.id - session.close() - - return SyncerBackend(chain, object_id) - - - @staticmethod - def resume(chain, block_height): - """Retrieves and returns all previously unfinished syncer sessions. - - - :param chain: Chain spec of chain that syncer is running for. - :type chain: cic_registry.chain.ChainSpec - :param block_height: Target block height - :type block_height: number - :returns: Syncer objects of unfinished syncs - :rtype: list of cic_eth.db.models.BlockchainSync - """ - syncers = [] - - session = SessionBase.create_session() - - object_id = None - - for object_id in BlockchainSync.get_unsynced(session=session): - logg.debug('block syncer resume added previously unsynced sync entry id {}'.format(object_id)) - syncers.append(SyncerBackend(chain, object_id)) - - (block_resume, tx_resume) = BlockchainSync.get_last_live_height(block_height, session=session) - if block_height != block_resume: - o = BlockchainSync(chain, block_resume, tx_resume, block_height) - session.add(o) - session.commit() - object_id = o.id - syncers.append(SyncerBackend(chain, object_id)) - logg.debug('block syncer resume added new sync entry from previous run id {}, start{}:{} target {}'.format(object_id, block_resume, tx_resume, block_height)) - - session.close() - - return syncers - - - @staticmethod - def live(chain, block_height): - """Creates a new open-ended syncer session starting at the given block height. - - :param chain: Chain spec of chain that syncer is running for. - :type chain: cic_registry.chain.ChainSpec - :param block_height: Target block height - :type block_height: number - :returns: "Live" syncer object - :rtype: cic_eth.db.models.BlockchainSync - """ - object_id = None - session = SessionBase.create_session() - o = BlockchainSync(chain, block_height, 0, None) - session.add(o) - session.commit() - object_id = o.id - session.close() - - return SyncerBackend(chain, object_id) diff --git a/apps/cic-eth/cic_eth/sync/base.py b/apps/cic-eth/cic_eth/sync/base.py deleted file mode 100644 index b678ccb7..00000000 --- a/apps/cic-eth/cic_eth/sync/base.py +++ /dev/null @@ -1,51 +0,0 @@ -# TODO: extend blocksync model -class Syncer: - """Base class and interface for implementing a block sync poller routine. - - :param bc_cache: Retrieves block cache cursors for chain head and latest processed block. - :type bc_cache: cic_eth.sync.SyncerBackend - """ - w3 = None - running_global = True - - def __init__(self, bc_cache): - self.cursor = None - self.bc_cache = bc_cache - self.filter = [] - self.running = True - - - def chain(self): - """Returns the string representation of the chain spec for the chain the syncer is running on. - - :returns: Chain spec string - :rtype: str - """ - return self.bc_cache.chain() - - - def get(self): - """Get latest unprocessed blocks. - - :returns: list of block hash strings - :rtype: list - """ - raise NotImplementedError() - - - def process(self, w3, ref): - """Process transactions in a single block. - - :param ref: Reference of object to process - :type ref: str, 0x-hex - """ - raise NotImplementedError() - - - def loop(self, interval): - """Entry point for syncer loop - - :param interval: Delay in seconds until next attempt if no new blocks are found. - :type interval: int - """ - raise NotImplementedError() diff --git a/apps/cic-eth/cic_eth/sync/error.py b/apps/cic-eth/cic_eth/sync/error.py deleted file mode 100644 index 1d2ff27b..00000000 --- a/apps/cic-eth/cic_eth/sync/error.py +++ /dev/null @@ -1,4 +0,0 @@ -class LoopDone(Exception): - """Exception raised when a syncing is complete. - """ - pass diff --git a/apps/cic-eth/cic_eth/sync/head.py b/apps/cic-eth/cic_eth/sync/head.py deleted file mode 100644 index f96a75b7..00000000 --- a/apps/cic-eth/cic_eth/sync/head.py +++ /dev/null @@ -1,51 +0,0 @@ -# standard imports -import logging - -# third-party imports -import web3 - -# local imports -from .mined import MinedSyncer -from .base import Syncer - -logg = logging.getLogger() - - -class HeadSyncer(MinedSyncer): - """Implements the get method in Syncer for retrieving every new mined block. - - :param bc_cache: Retrieves block cache cursors for chain head and latest processed block. - :type bc_cache: Object implementing methods from cic_eth.sync.SyncerBackend - """ - def __init__(self, bc_cache): - super(HeadSyncer, self).__init__(bc_cache) - # TODO: filter not returning all blocks, at least with ganache. kind of defeats the point, then - #self.w3_filter = rpc.w3.eth.filter({ - # 'fromBlock': block_offset, - # }) #'latest') - #self.bc_cache.set(block_offset, 0) - logg.debug('initialized head syncer with offset {}'.format(bc_cache.start())) - - """Implements Syncer.get - - :param w3: Web3 object - :type w3: web3.Web3 - :returns: Block hash of newly mined blocks. if any - :rtype: list of str, 0x-hex - """ - def get(self, w3): - # Of course, the filter doesn't return the same block dict format as getBlock() so we'll just waste some cycles getting the hashes instead. - #hashes = [] - #for block in self.w3_filter.get_new_entries(): - # hashes.append(block['blockHash']) - #logg.debug('blocks {}'.format(hashes)) - #return hashes - (block_number, tx_number) = self.bc_cache.get() - block_hash = [] - try: - block = w3.eth.getBlock(block_number) - block_hash.append(block.hash) - except web3.exceptions.BlockNotFound: - pass - - return block_hash diff --git a/apps/cic-eth/cic_eth/sync/history.py b/apps/cic-eth/cic_eth/sync/history.py deleted file mode 100644 index 6a500fd0..00000000 --- a/apps/cic-eth/cic_eth/sync/history.py +++ /dev/null @@ -1,74 +0,0 @@ -# standard imports -import logging - -# third-party imports -from web3.exceptions import BlockNotFound -from .error import LoopDone - -# local imports -from .mined import MinedSyncer -from .base import Syncer -from cic_eth.db.models.base import SessionBase - -logg = logging.getLogger() - - -class HistorySyncer(MinedSyncer): - """Implements the get method in Syncer for retrieving all blocks between last processed block before previous shutdown and block height at time of syncer start. - - :param bc_cache: Retrieves block cache cursors for chain head and latest processed block. - :type bc_cache: Object implementing methods from cic_eth.sync.SyncerBackend - :param mx: Maximum number of blocks to return in one call - :type mx: int - """ - def __init__(self, bc_cache, mx=500): - super(HistorySyncer, self).__init__(bc_cache) - self.max = mx - - self.target = bc_cache.target() - logg.info('History syncer target block number {}'.format(self.target)) - - session_offset = self.bc_cache.get() - - self.block_offset = session_offset[0] - self.tx_offset = session_offset[1] - logg.info('History syncer starting at {}:{}'.format(session_offset[0], session_offset[1])) - - self.filter = [] - - - """Implements Syncer.get - - BUG: Should also raise LoopDone when block array is empty after loop. - - :param w3: Web3 object - :type w3: web3.Web3 - :raises LoopDone: If a block is not found. - :return: Return a batch of blocks to process - :rtype: list of str, 0x-hex - """ - def get(self, w3): - sync_db = self.bc_cache - height = self.bc_cache.get() - logg.debug('height {}'.format(height)) - block_last = height[0] - tx_last = height[1] - if not self.running: - raise LoopDone((block_last, tx_last)) - b = [] - block_target = block_last + self.max - if block_target > self.target: - block_target = self.target - logg.debug('target {} last {} max {}'.format(block_target, block_last, self.max)) - for i in range(block_last, block_target): - if i == self.target: - logg.info('reached target {}, exiting'.format(i)) - self.running = False - break - bhash = w3.eth.getBlock(i).hash - b.append(bhash) - logg.debug('appending block {} {}'.format(i, bhash.hex())) - if block_last == block_target: - logg.info('aleady reached target {}, exiting'.format(self.target)) - self.running = False - return b diff --git a/apps/cic-eth/cic_eth/sync/mempool.py b/apps/cic-eth/cic_eth/sync/mempool.py deleted file mode 100644 index a3a62c6d..00000000 --- a/apps/cic-eth/cic_eth/sync/mempool.py +++ /dev/null @@ -1,50 +0,0 @@ -class MemPoolSyncer(Syncer): - - - def __init__(self, bc_cache): - raise NotImplementedError('incomplete, needs web3 tx to raw transaction conversion') - super(MemPoolSyncer, self).__init__(bc_cache) -# self.w3_filter = Syncer.w3.eth.filter('pending') -# for tx in tx_cache.txs: -# self.txs.append(tx) -# logg.debug('add tx {} to mempoolsyncer'.format(tx)) -# -# -# def get(self): -# return self.w3_filter.get_new_entries() -# -# -# def process(self, tx_hash): -# tx_hash_hex = tx_hash.hex() -# if tx_hash_hex in self.txs: -# logg.debug('syncer already watching {}, skipping'.format(tx_hash_hex)) -# tx = self.w3.eth.getTransaction(tx_hash_hex) -# serialized_tx = rlp.encode({ -# 'nonce': tx.nonce, -# 'from': getattr(tx, 'from'), -# }) -# logg.info('add {} to syncer: {}'.format(tx, serialized_tx)) -# otx = Otx( -# nonce=tx.nonce, -# address=getattr(tx, 'from'), -# tx_hash=tx_hash_hex, -# signed_tx=serialized_tx, -# ) -# Otx.session.add(otx) -# Otx.session.commit() -# -# -# def loop(self, interval): -# while Syncer.running: -# logg.debug('loop execute') -# txs = self.get() -# logg.debug('got txs {}'.format(txs)) -# for tx in txs: -# #block_number = self.process(block.hex()) -# self.process(tx) -# #if block_number > self.bc_cache.head(): -# # self.bc_cache.head(block_number) -# time.sleep(interval) -# logg.info("Syncer no longer set to run, gracefully exiting") - - diff --git a/apps/cic-eth/cic_eth/sync/mined.py b/apps/cic-eth/cic_eth/sync/mined.py deleted file mode 100644 index 362b223d..00000000 --- a/apps/cic-eth/cic_eth/sync/mined.py +++ /dev/null @@ -1,109 +0,0 @@ -# standard imports -import logging -import time - -# third-party imports -import celery - -# local impotes -from .base import Syncer -from cic_eth.queue.tx import set_final_status -from cic_eth.eth import RpcClient - -app = celery.current_app -logg = logging.getLogger() - - -class MinedSyncer(Syncer): - """Base implementation of block processor for mined blocks. - - Loops through all transactions, - - :param bc_cache: Retrieves block cache cursors for chain head and latest processed block. - :type bc_cache: Object implementing methods from cic_eth.sync.SyncerBackend - """ - - yield_delay = 0.005 - - def __init__(self, bc_cache): - super(MinedSyncer, self).__init__(bc_cache) - self.block_offset = 0 - self.tx_offset = 0 - - - def process(self, w3, ref): - """Processes transactions in a single block, advancing transaction (and block) cursor accordingly. - - :param w3: Web3 object - :type w3: web3.Web3 - :param ref: Block reference (hash) to process - :type ref: str, 0x-hex - :returns: Block number of next unprocessed block - :rtype: number - """ - b = w3.eth.getBlock(ref) - c = w3.eth.getBlockTransactionCount(ref) - s = 0 - if self.block_offset == b.number: - s = self.tx_offset - - logg.debug('processing {} (blocknumber {}, count {}, offset {})'.format(ref, b.number, c, s)) - - for i in range(s, c): - tx = w3.eth.getTransactionByBlock(ref, i) - tx_hash_hex = tx['hash'].hex() - rcpt = w3.eth.getTransactionReceipt(tx_hash_hex) - logg.debug('{}/{} processing tx {} from block {} {}'.format(i+1, c, tx_hash_hex, b.number, ref)) - ours = False - # TODO: ensure filter loop can complete on graceful shutdown - for f in self.filter: - #try: - session = self.bc_cache.connect() - task_uuid = f(w3, tx, rcpt, self.chain(), session) - #except Exception as e: - # logg.error('error in filter {} tx {}: {}'.format(f, tx_hash_hex, e)) - # continue - if task_uuid != None: - logg.debug('tx {} passed to celery task {}'.format(tx_hash_hex, task_uuid)) - s = celery.signature( - 'set_final_status', - [tx_hash_hex, rcpt['blockNumber'], not rcpt['status']], - ) - s.apply_async() - break - next_tx = i + 1 - if next_tx == c: - self.bc_cache.set(b.number+1, 0) - else: - self.bc_cache.set(b.number, next_tx) - if c == 0: - logg.info('synced block {} has no transactions'.format(b.number)) - #self.bc_cache.session(b.number+1, 0) - self.bc_cache.set(b.number+1, 0) - return b['number'] - - - - def loop(self, interval): - """Loop running until the "running" property of Syncer is set to False. - - Retrieves latest unprocessed blocks and processes them. - - :param interval: Delay in seconds until next attempt if no new blocks are found. - :type interval: int - """ - while self.running and Syncer.running_global: - self.bc_cache.connect() - c = RpcClient(self.chain()) - logg.debug('loop execute') - e = self.get(c.w3) - logg.debug('got blocks {}'.format(e)) - for block in e: - block_number = self.process(c.w3, block.hex()) - logg.debug('processed block {} {}'.format(block_number, block.hex())) - self.bc_cache.disconnect() - if len(e) > 0: - time.sleep(self.yield_delay) - else: - time.sleep(interval) - logg.info("Syncer no longer set to run, gracefully exiting") diff --git a/apps/cic-eth/cic_eth/sync/retry.py b/apps/cic-eth/cic_eth/sync/retry.py deleted file mode 100644 index 57bd7e99..00000000 --- a/apps/cic-eth/cic_eth/sync/retry.py +++ /dev/null @@ -1,75 +0,0 @@ -# standard imports -import logging -import datetime -import time - -# third-party imports -import celery - -# local imports -from .base import Syncer -from cic_eth.eth.rpc import RpcClient -from cic_eth.db.enum import ( - StatusEnum, - StatusBits, - ) -from cic_eth.queue.tx import get_status_tx - -logg = logging.getLogger() - -celery_app = celery.current_app - - -class noop_cache: - - def __init__(self, chain_spec): - self.chain_spec = chain_spec - - - def chain(self): - return self.chain_spec - - -class RetrySyncer(Syncer): - - def __init__(self, chain_spec, stalled_grace_seconds, failed_grace_seconds=None, final_func=None): - cache = noop_cache(chain_spec) - super(RetrySyncer, self).__init__(cache) - if failed_grace_seconds == None: - failed_grace_seconds = stalled_grace_seconds - self.stalled_grace_seconds = stalled_grace_seconds - self.failed_grace_seconds = failed_grace_seconds - self.final_func = final_func - - - def get(self, w3): -# before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.failed_grace_seconds) -# failed_txs = get_status_tx( -# StatusEnum.SENDFAIL.value, -# before=before, -# ) - before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds) - stalled_txs = get_status_tx( - StatusBits.IN_NETWORK.value, - not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE, - before=before, - ) - # return list(failed_txs.keys()) + list(stalled_txs.keys()) - return stalled_txs - - - def process(self, w3, ref): - logg.debug('tx {}'.format(ref)) - for f in self.filter: - f(w3, ref, None, str(self.chain())) - - - def loop(self, interval): - chain_str = str(self.chain()) - while self.running and Syncer.running_global: - c = RpcClient(self.chain()) - for tx in self.get(c.w3): - self.process(c.w3, tx) - if self.final_func != None: - self.final_func(chain_str) - time.sleep(interval)