From f154136dd3a430c2d1211d53b31bf026c321e97b Mon Sep 17 00:00:00 2001 From: Louis Holbrook Date: Wed, 17 Feb 2021 08:19:42 +0000 Subject: [PATCH] cic-eth: Address translator task --- apps/cic-cache | 2 +- apps/cic-eth/cic_eth/eth/token.py | 58 +++ apps/cic-eth/cic_eth/eth/util.py | 2 + apps/cic-eth/cic_eth/ext/__init__.py | 0 apps/cic-eth/cic_eth/ext/address.py | 43 ++ .../runnable/{ => daemons}/dispatcher.py | 0 .../runnable/daemons/filters/__init__.py | 4 + .../cic_eth/runnable/daemons/filters/base.py | 2 + .../runnable/daemons/filters/callback.py | 107 +++++ .../runnable/daemons/filters/convert.py | 61 +++ .../cic_eth/runnable/daemons/filters/gas.py | 54 +++ .../runnable/daemons/filters/register.py | 35 ++ .../cic_eth/runnable/daemons/filters/tx.py | 38 ++ .../cic_eth/runnable/daemons/manager.py | 207 +++++++++ .../cic_eth/runnable/{ => daemons}/retry.py | 0 .../cic_eth/runnable/{ => daemons}/tasker.py | 19 +- apps/cic-eth/cic_eth/runnable/manager.py | 410 ------------------ apps/cic-eth/config/cic.ini | 5 +- apps/cic-eth/config/test/cic.ini | 6 +- apps/cic-eth/docker/start_dispatcher.sh | 2 +- apps/cic-eth/docker/start_manager.sh | 2 +- apps/cic-eth/docker/start_retry.sh | 2 +- apps/cic-eth/docker/start_tasker.sh | 2 +- apps/cic-eth/requirements.txt | 4 +- apps/cic-eth/setup.cfg | 11 +- apps/cic-eth/tests/fixtures_registry.py | 44 +- .../tests/testdata/abi/Declarator.json | 1 + .../tests/unit/eth/test_extended_tx.py | 58 +++ apps/cic-eth/tests/unit/ext/test_address.py | 33 ++ apps/contract-migration/docker/Dockerfile | 2 +- apps/contract-migration/reset.sh | 2 + docker-compose.yml | 3 - 32 files changed, 775 insertions(+), 444 deletions(-) create mode 100644 apps/cic-eth/cic_eth/ext/__init__.py create mode 100644 apps/cic-eth/cic_eth/ext/address.py rename apps/cic-eth/cic_eth/runnable/{ => daemons}/dispatcher.py (100%) create mode 100644 apps/cic-eth/cic_eth/runnable/daemons/filters/__init__.py create mode 100644 apps/cic-eth/cic_eth/runnable/daemons/filters/base.py create mode 100644 apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py create mode 100644 apps/cic-eth/cic_eth/runnable/daemons/filters/convert.py create mode 100644 apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py create mode 100644 apps/cic-eth/cic_eth/runnable/daemons/filters/register.py create mode 100644 apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py create mode 100644 apps/cic-eth/cic_eth/runnable/daemons/manager.py rename apps/cic-eth/cic_eth/runnable/{ => daemons}/retry.py (100%) rename apps/cic-eth/cic_eth/runnable/{ => daemons}/tasker.py (93%) delete mode 100644 apps/cic-eth/cic_eth/runnable/manager.py create mode 100644 apps/cic-eth/tests/testdata/abi/Declarator.json create mode 100644 apps/cic-eth/tests/unit/eth/test_extended_tx.py create mode 100644 apps/cic-eth/tests/unit/ext/test_address.py diff --git a/apps/cic-cache b/apps/cic-cache index 06c5f0fb..d2cb3a45 160000 --- a/apps/cic-cache +++ b/apps/cic-cache @@ -1 +1 @@ -Subproject commit 06c5f0fb0dca5992a7ffb12874b5cb0c9fdcd706 +Subproject commit d2cb3a45558d7ca3a412c97c6aea794d9ac6c6f5 diff --git a/apps/cic-eth/cic_eth/eth/token.py b/apps/cic-eth/cic_eth/eth/token.py index fac767e9..5fd77c12 100644 --- a/apps/cic-eth/cic_eth/eth/token.py +++ b/apps/cic-eth/cic_eth/eth/token.py @@ -6,6 +6,7 @@ import celery import requests import web3 from cic_registry import CICRegistry +from cic_registry import zero_address from cic_registry.chain import ChainSpec # platform imports @@ -17,6 +18,7 @@ from cic_eth.eth.task import sign_and_register_tx from cic_eth.eth.task import create_check_gas_and_send_task from cic_eth.eth.factory import TxFactory from cic_eth.eth.util import unpack_signed_raw_tx +from cic_eth.ext.address import translate_address celery_app = celery.current_app logg = logging.getLogger() @@ -445,3 +447,59 @@ def cache_approve_data( cache_id = tx_cache.id session.close() return (tx_hash_hex, cache_id) + + +class ExtendedTx: + + _default_decimals = 6 + + def __init__(self, tx_hash, chain_spec): + self._chain_spec = chain_spec + self.chain = str(chain_spec) + self.hash = tx_hash + self.sender = None + self.sender_label = None + self.recipient = None + self.recipient_label = None + self.source_token_value = 0 + self.destination_token_value = 0 + self.source_token = zero_address + self.destination_token = zero_address + self.source_token_symbol = '' + self.destination_token_symbol = '' + self.source_token_decimals = ExtendedTx._default_decimals + self.destination_token_decimals = ExtendedTx._default_decimals + + + def set_actors(self, sender, recipient, trusted_declarator_addresses=None): + self.sender = sender + self.recipient = recipient + if trusted_declarator_addresses != None: + self.sender_label = translate_address(sender, trusted_declarator_addresses, self.chain) + self.recipient_label = translate_address(recipient, trusted_declarator_addresses, self.chain) + + + def set_tokens(self, source, source_value, destination=None, destination_value=None): + if destination == None: + destination = source + if destination_value == None: + destination_value = source_value + st = CICRegistry.get_address(self._chain_spec, source) + dt = CICRegistry.get_address(self._chain_spec, destination) + self.source_token = source + self.source_token_symbol = st.symbol() + self.source_token_decimals = st.decimals() + self.source_token_value = source_value + self.destination_token = destination + self.destination_token_symbol = dt.symbol() + self.destination_token_decimals = dt.decimals() + self.destination_token_value = destination_value + + + def to_dict(self): + o = {} + for attr in dir(self): + if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'to_dict']: + continue + o[attr] = getattr(self, attr) + return o diff --git a/apps/cic-eth/cic_eth/eth/util.py b/apps/cic-eth/cic_eth/eth/util.py index d90c3221..71683d82 100644 --- a/apps/cic-eth/cic_eth/eth/util.py +++ b/apps/cic-eth/cic_eth/eth/util.py @@ -104,3 +104,5 @@ def tx_hex_string(tx_hex, chain_id): tx_raw_bytes = bytes.fromhex(tx_hex) return tx_string(tx_raw_bytes, chain_id) + + diff --git a/apps/cic-eth/cic_eth/ext/__init__.py b/apps/cic-eth/cic_eth/ext/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/cic-eth/cic_eth/ext/address.py b/apps/cic-eth/cic_eth/ext/address.py new file mode 100644 index 00000000..e7f8dac0 --- /dev/null +++ b/apps/cic-eth/cic_eth/ext/address.py @@ -0,0 +1,43 @@ +# standard imports +import logging + +# third-party imports +import celery +from cic_registry.chain import ChainSpec +from cic_registry import CICRegistry + +celery_app = celery.current_app + +logg = logging.getLogger() + + +def translate_address(address, trusted_addresses, chain_spec): + for trusted_address in trusted_addresses: + o = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', 'Declarator') + fn = o.function('declaration') + declaration_hex = fn(trusted_address, address).call() + declaration_bytes = declaration_hex[0].rstrip(b'\x00') + declaration = None + try: + declaration = declaration_bytes.decode('utf-8', errors='strict') + except UnicodeDecodeError: + continue + return declaration + + +@celery_app.task() +def translate_tx_addresses(tx, trusted_addresses, chain_str): + + chain_spec = ChainSpec.from_chain_str(chain_str) + + declaration = None + if tx['sender_label'] == None: + declaration = translate_address(tx['sender'], trusted_addresses, chain_spec) + tx['sender_label'] = declaration + + declaration = None + if tx['recipient_label'] == None: + declaration = translate_address(tx['recipient'], trusted_addresses, chain_spec) + tx['recipient_label'] = declaration + + return tx diff --git a/apps/cic-eth/cic_eth/runnable/dispatcher.py b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py similarity index 100% rename from apps/cic-eth/cic_eth/runnable/dispatcher.py rename to apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/__init__.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/__init__.py new file mode 100644 index 00000000..feed42c9 --- /dev/null +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/__init__.py @@ -0,0 +1,4 @@ +from .callback import CallbackFilter +from .tx import TxFilter +from .gas import GasFilter +from .register import RegistrationFilter diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/base.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/base.py new file mode 100644 index 00000000..5a57a482 --- /dev/null +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/base.py @@ -0,0 +1,2 @@ +class SyncFilter: + pass diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py new file mode 100644 index 00000000..a43ce319 --- /dev/null +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py @@ -0,0 +1,107 @@ +# standard imports +import logging + +# third-party imports +import web3 +import celery +from cic_registry.error import UnknownContractError + +# local imports +from .base import SyncFilter +from cic_eth.eth.token import unpack_transfer +from cic_eth.eth.token import unpack_transferfrom +from cic_eth.eth.token import ExtendedTx +from .base import SyncFilter + +logg = logging.getLogger() + +transfer_method_signature = '0xa9059cbb' # keccak256(transfer(address,uint256)) +transferfrom_method_signature = '0x23b872dd' # keccak256(transferFrom(address,address,uint256)) +giveto_method_signature = '0x63e4bff4' # keccak256(giveTo(address)) + + +class CallbackFilter(SyncFilter): + + trusted_addresses = [] + + def __init__(self, method, queue): + self.queue = queue + self.method = method + + + def call_back(self, transfer_type, result): + s = celery.signature( + self.method, + [ + result, + transfer_type, + int(rcpt.status == 0), + ], + queue=tc.queue, + ) +# s_translate = celery.signature( +# 'cic_eth.ext.address.translate', +# [ +# result, +# self.trusted_addresses, +# chain_str, +# ], +# queue=self.queue, +# ) +# s_translate.link(s) +# s_translate.apply_async() + s.apply_async() + + + def parse_data(self, tx, rcpt): + transfer_type = 'transfer' + transfer_data = None + method_signature = tx.input[:10] + + if method_signature == transfer_method_signature: + transfer_data = unpack_transfer(tx.input) + transfer_data['from'] = tx['from'] + transfer_data['token_address'] = tx['to'] + + elif method_signature == transferfrom_method_signature: + transfer_type = 'transferfrom' + transfer_data = unpack_transferfrom(tx.input) + transfer_data['token_address'] = tx['to'] + + # TODO: do not rely on logs here + elif method_signature == giveto_method_signature: + transfer_type = 'tokengift' + transfer_data = unpack_gift(tx.input) + for l in rcpt.logs: + if l.topics[0].hex() == '0x45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa': + transfer_data['value'] = web3.Web3.toInt(hexstr=l.data) + token_address_bytes = l.topics[2][32-20:] + transfer_data['token_address'] = web3.Web3.toChecksumAddress(token_address_bytes.hex()) + transfer_data['from'] = rcpt.to + + return (transfer_type, transfer_data) + + + def filter(self, w3, tx, rcpt, chain_spec): + logg.debug('applying callback filter "{}:{}"'.format(self.queue, self.method)) + chain_str = str(chain_spec) + + transfer_data = self.parse_data(tx, rcpt) + + transfer_data = None + if len(tx.input) < 10: + logg.debug('callbacks filter data length not sufficient for method signature in tx {}, skipping'.format(tx['hash'])) + return + + logg.debug('checking callbacks filter input {}'.format(tx.input[:10])) + + if transfer_data != None: + token_symbol = None + result = None + try: + tokentx = ExtendedTx(self.chain_spec) + tokentx.set_actors(transfer_data['from'], transfer_data['to'], self.trusted_addresses) + tokentx.set_tokens(transfer_data['token_address'], transfer_data['value']) + self.call_back(tokentx.to_dict()) + except UnknownContractError: + logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tc.queue, tc.method, transfer_data['to'], tx.hash.hex())) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/convert.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/convert.py new file mode 100644 index 00000000..4d3bec41 --- /dev/null +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/convert.py @@ -0,0 +1,61 @@ + +#__convert_log_hash = '0x7154b38b5dd31bb3122436a96d4e09aba5b323ae1fd580025fab55074334c095' # keccak256(Conversion(address,address,address,uint256,uint256,address) + +#def parse_convert_log(w3, entry): +# data = entry.data[2:] +# from_amount = int(data[:64], 16) +# to_amount = int(data[64:128], 16) +# holder_address_hex_raw = '0x' + data[-40:] +# holder_address_hex = w3.toChecksumAddress(holder_address_hex_raw) +# o = { +# 'from_amount': from_amount, +# 'to_amount': to_amount, +# 'holder_address': holder_address_hex +# } +# logg.debug('parsed convert log {}'.format(o)) +# return o + + +#def convert_filter(w3, tx, rcpt, chain_spec): +# destination_token_address = None +# recipient_address = None +# amount = 0 +# for l in rcpt['logs']: +# event_topic_hex = l['topics'][0].hex() +# if event_topic_hex == __convert_log_hash: +# tx_hash_hex = tx['hash'].hex() +# try: +# convert_transfer = TxConvertTransfer.get(tx_hash_hex) +# except UnknownConvertError: +# logg.warning('skipping unknown convert tx {}'.format(tx_hash_hex)) +# continue +# if convert_transfer.transfer_tx_hash != None: +# logg.warning('convert tx {} cache record already has transfer hash {}, skipping'.format(tx_hash_hex, convert_transfer.transfer_hash)) +# continue +# recipient_address = convert_transfer.recipient_address +# logg.debug('found convert event {} recipient'.format(tx_hash_hex, recipient_address)) +# r = parse_convert_log(l) +# destination_token_address = l['topics'][3][-20:] +# +# if destination_token_address == zero_address or destination_token_address == None: +# return None +# +# destination_token_address_hex = destination_token_address.hex() +# s = celery.signature( +# 'cic_eth.eth.bancor.transfer_converted', +# [ +# [{ +# 'address': w3.toChecksumAddress(destination_token_address_hex), +# }], +# r['holder_address'], +# recipient_address, +# r['to_amount'], +# tx_hash_hex, +# str(chain_spec), +# ], +# queue=queue, +# ) +# logg.info('sending tx signature {}'.format(s)) +# t = s.apply_async() +# logg.debug('submitted transfer after convert task uuid {} {}'.format(t, t.successful())) +# return t diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py new file mode 100644 index 00000000..a3cf1d32 --- /dev/null +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py @@ -0,0 +1,54 @@ +# standard imports +import logging + +# third-party imports +from cic_registry.chain import ChainSpec + +# local imports +from cic_eth.db.models.base import SessionBase +from cic_eth.db.models.tx import TxCache +from cic_eth.db import Otx +from cic_eth.queue.tx import get_paused_txs +from cic_eth.eth.task import create_check_gas_and_send_task +from .base import SyncFilter + +logg = logging.getLogger() + + +class GasFilter(SyncFilter): + + def __init__(self, gas_provider): + self.gas_provider = gas_provider + + + def filter(self, w3, tx, rcpt, chain_str): + logg.debug('applying gas filter') + tx_hash_hex = tx.hash.hex() + if tx['value'] > 0: + logg.debug('gas refill tx {}'.format(tx_hash_hex)) + session = SessionBase.create_session() + q = session.query(TxCache.recipient) + q = q.join(Otx) + q = q.filter(Otx.tx_hash==tx_hash_hex) + r = q.first() + + session.close() + + if r == None: + logg.warning('unsolicited gas refill tx {}'.format(tx_hash_hex)) + return + + chain_spec = ChainSpec.from_chain_str(chain_str) + txs = get_paused_txs(StatusEnum.WAITFORGAS, r[0], chain_spec.chain_id()) + + if len(txs) > 0: + logg.info('resuming gas-in-waiting txs for {}: {}'.format(r[0], txs.keys())) + s = create_check_gas_and_send_task( + list(txs.values()), + str(chain_str), + r[0], + 0, + tx_hashes_hex=list(txs.keys()), + queue=queue, + ) + s.apply_async() diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py new file mode 100644 index 00000000..180cdca6 --- /dev/null +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py @@ -0,0 +1,35 @@ +# standard imports +import logging + +# third-party imports +import celery +from chainlib.eth.address import to_checksum + +# local imports +from .base import SyncFilter + +logg = logging.getLogger() + +account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153fe9ddb8e84b665061acd' # keccak256(AccountAdded(address,uint256)) + + +class RegistrationFilter(SyncFilter): + + def filter(self, w3, tx, rcpt, chain_spec): + logg.debug('applying registration filter') + registered_address = None + for l in rcpt['logs']: + event_topic_hex = l['topics'][0].hex() + if event_topic_hex == account_registry_add_log_hash: + address_bytes = l.topics[1][32-20:] + address = to_checksum(address_bytes.hex()) + logg.debug('request token gift to {}'.format(address)) + s = celery.signature( + 'cic_eth.eth.account.gift', + [ + address, + str(chain_spec), + ], + queue=queue, + ) + s.apply_async() diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py new file mode 100644 index 00000000..6a645ff3 --- /dev/null +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py @@ -0,0 +1,38 @@ +# standard imports +import logging + +# third-party imports +import celery + +# local imports +from cic_eth.db.models.otx import Otx +from .base import SyncFilter + +logg = logging.getLogger() + + +class TxFilter(SyncFilter): + + def __init__(self, queue): + self.queue = queue + + + def filter(self, w3, tx, rcpt, chain_spec): + logg.debug('applying tx filter') + tx_hash_hex = tx.hash.hex() + otx = Otx.load(tx_hash_hex) + if otx == None: + logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex)) + return None + logg.info('otx found {}'.format(otx.tx_hash)) + s = celery.siignature( + 'cic_eth.queue.tx.set_final_status', + [ + tx_hash_hex, + rcpt.blockNumber, + rcpt.status == 0, + ], + queue=self.queue, + ) + t = s.apply_async() + return t diff --git a/apps/cic-eth/cic_eth/runnable/daemons/manager.py b/apps/cic-eth/cic_eth/runnable/daemons/manager.py new file mode 100644 index 00000000..bb85c466 --- /dev/null +++ b/apps/cic-eth/cic_eth/runnable/daemons/manager.py @@ -0,0 +1,207 @@ +# standard imports +import os +import sys +import logging +import time +import argparse +import sys +import re + +# third-party imports +import confini +import celery +import rlp +import web3 +from web3 import HTTPProvider, WebsocketProvider +from cic_registry import CICRegistry +from cic_registry.chain import ChainSpec +from cic_registry import zero_address +from cic_registry.chain import ChainRegistry +from cic_registry.error import UnknownContractError +from cic_bancor.bancor import BancorRegistryClient + +# local imports +import cic_eth +from cic_eth.eth import RpcClient +from cic_eth.db import SessionBase +from cic_eth.db import Otx +from cic_eth.db import TxConvertTransfer +from cic_eth.db.models.tx import TxCache +from cic_eth.db.enum import StatusEnum +from cic_eth.db import dsn_from_config +from cic_eth.queue.tx import get_paused_txs +from cic_eth.sync import Syncer +from cic_eth.sync.error import LoopDone +from cic_eth.db.error import UnknownConvertError +from cic_eth.eth.util import unpack_signed_raw_tx +from cic_eth.eth.task import create_check_gas_and_send_task +from cic_eth.sync.backend import SyncerBackend +from cic_eth.eth.token import unpack_transfer +from cic_eth.eth.token import unpack_transferfrom +from cic_eth.eth.account import unpack_gift +from cic_eth.runnable.daemons.filters import ( + CallbackFilter, + GasFilter, + TxFilter, + RegistrationFilter, + ) + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() +logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL) +logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL) +logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL) +logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL) + + +config_dir = os.path.join('/usr/local/etc/cic-eth') + +argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks') +argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') +argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec') +argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi') +argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') +argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') +argparser.add_argument('-v', help='be verbose', action='store_true') +argparser.add_argument('-vv', help='be more verbose', action='store_true') +argparser.add_argument('mode', type=str, help='sync mode: (head|history)', default='head') +args = argparser.parse_args(sys.argv[1:]) + +if args.v == True: + logging.getLogger().setLevel(logging.INFO) +elif args.vv == True: + logging.getLogger().setLevel(logging.DEBUG) + +config_dir = os.path.join(args.c) +os.makedirs(config_dir, 0o777, True) +config = confini.Config(config_dir, args.env_prefix) +config.process() +# override args +args_override = { + 'ETH_ABI_DIR': getattr(args, 'abi_dir'), + 'CIC_CHAIN_SPEC': getattr(args, 'i'), + } +config.dict_override(args_override, 'cli flag') +config.censor('PASSWORD', 'DATABASE') +config.censor('PASSWORD', 'SSL') +logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) + +app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL')) + +queue = args.q + +chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) + + +re_websocket = re.compile('^wss?://') +re_http = re.compile('^https?://') +blockchain_provider = config.get('ETH_PROVIDER') +if re.match(re_websocket, blockchain_provider) != None: + blockchain_provider = WebsocketProvider(blockchain_provider) +elif re.match(re_http, blockchain_provider) != None: + blockchain_provider = HTTPProvider(blockchain_provider) +else: + raise ValueError('unknown provider url {}'.format(blockchain_provider)) + +def web3_constructor(): + w3 = web3.Web3(blockchain_provider) + return (blockchain_provider, w3) +RpcClient.set_constructor(web3_constructor) + +c = RpcClient(chain_spec) +CICRegistry.init(c.w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec) +CICRegistry.add_path(config.get('ETH_ABI_DIR')) +chain_registry = ChainRegistry(chain_spec) +CICRegistry.add_chain_registry(chain_registry, True) + +declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', interface='Declarator') + + +dsn = dsn_from_config(config) +SessionBase.connect(dsn) + + +def main(): + global chain_spec, c, queue + + if config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER') != None: + CICRegistry.add_role(chain_spec, config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER'), 'AccountRegistry', True) + + syncers = [] + block_offset = c.w3.eth.blockNumber + chain = str(chain_spec) + + if SyncerBackend.first(chain): + from cic_eth.sync.history import HistorySyncer + backend = SyncerBackend.initial(chain, block_offset) + syncer = HistorySyncer(backend) + syncers.append(syncer) + + if args.mode == 'head': + from cic_eth.sync.head import HeadSyncer + block_sync = SyncerBackend.live(chain, block_offset+1) + syncers.append(HeadSyncer(block_sync)) + elif args.mode == 'history': + from cic_eth.sync.history import HistorySyncer + backends = SyncerBackend.resume(chain, block_offset+1) + for backend in backends: + syncers.append(HistorySyncer(backend)) + if len(syncers) == 0: + logg.info('found no unsynced history. terminating') + sys.exit(0) + else: + sys.stderr.write("unknown mode '{}'\n".format(args.mode)) + sys.exit(1) + +# bancor_registry_contract = CICRegistry.get_contract(chain_spec, 'BancorRegistry', interface='Registry') +# bancor_chain_registry = CICRegistry.get_chain_registry(chain_spec) +# bancor_registry = BancorRegistryClient(c.w3, bancor_chain_registry, config.get('ETH_ABI_DIR')) +# bancor_registry.load() + + trusted_addresses_src = config.get('CIC_TRUST_ADDRESS') + if trusted_addresses_src == None: + logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS') + sys.exit(1) + trusted_addresses = trusted_addresses_src.split(',') + for address in trusted_addresses: + logg.info('using trusted address {}'.format(address)) + CallbackFilter.trusted_addresses = trusted_addresses + + callback_filters = [] + for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','): + task_split = cb.split(':') + task_queue = queue + if len(task_split) > 1: + task_queue = task_split[0] + callback_filter = CallbackFilter(task_split[1], task_queue) + callback_filters.append(callback_filter) + + tx_filter = TxFilter(queue) + + registration_filter = RegistrationFilter() + + gas_filter = GasFilter(c.gas_provider()) + + i = 0 + for syncer in syncers: + logg.debug('running syncer index {}'.format(i)) + syncer.filter.append(gas_filter.filter) + syncer.filter.append(registration_filter.filter) + # TODO: the two following filter functions break the filter loop if return uuid. Pro: less code executed. Con: Possibly unintuitive flow break + syncer.filter.append(tx_filter.filter) + #syncer.filter.append(convert_filter) + for cf in callback_filters: + syncer.filter.append(cf.filter) + + try: + syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL'))) + except LoopDone as e: + sys.stderr.write("sync '{}' done at block {}\n".format(args.mode, e)) + + i += 1 + + sys.exit(0) + + +if __name__ == '__main__': + main() diff --git a/apps/cic-eth/cic_eth/runnable/retry.py b/apps/cic-eth/cic_eth/runnable/daemons/retry.py similarity index 100% rename from apps/cic-eth/cic_eth/runnable/retry.py rename to apps/cic-eth/cic_eth/runnable/daemons/retry.py diff --git a/apps/cic-eth/cic_eth/runnable/tasker.py b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py similarity index 93% rename from apps/cic-eth/cic_eth/runnable/tasker.py rename to apps/cic-eth/cic_eth/runnable/daemons/tasker.py index 9e75166f..6eb343ce 100644 --- a/apps/cic-eth/cic_eth/runnable/tasker.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py @@ -220,15 +220,16 @@ def main(): if config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER') != None: CICRegistry.add_role(chain_spec, config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER'), 'AccountRegistry', True) - if config.get('CIC_DECLARATOR_ADDRESS') != None: - abi_path = os.path.join(config.get('ETH_ABI_DIR'), '{}.json'.format(interface)) - f = open(abi_path) - abi = json.load(abi_path) - f.close() - c = w3.eth.contract(abi=abi, address=address) - trusted_addresses = config.get('CIC_TRUSTED_ADDRESSES', []).split(',') - oracle = DeclaratorOracleAdapter(contract, trusted_addresses) - chain_registry.add_oracle(oracle) + declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', interface='Declarator') + trusted_addresses_src = config.get('CIC_TRUST_ADDRESS') + if trusted_addresses_src == None: + logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS') + sys.exit(1) + trusted_addresses = trusted_addresses_src.split(',') + for address in trusted_addresses: + logg.info('using trusted address {}'.format(address)) + oracle = DeclaratorOracleAdapter(declarator.contract, trusted_addresses) + chain_registry.add_oracle('naive_erc20_oracle', oracle) #chain_spec = CICRegistry.default_chain_spec diff --git a/apps/cic-eth/cic_eth/runnable/manager.py b/apps/cic-eth/cic_eth/runnable/manager.py deleted file mode 100644 index d3aac98f..00000000 --- a/apps/cic-eth/cic_eth/runnable/manager.py +++ /dev/null @@ -1,410 +0,0 @@ -# standard imports -import os -import sys -import logging -import time -import argparse -import sys -import re - -# third-party imports -import confini -import celery -import rlp -import web3 -from web3 import HTTPProvider, WebsocketProvider -from cic_registry import CICRegistry -from cic_registry.chain import ChainSpec -from cic_registry import zero_address -from cic_registry.chain import ChainRegistry -from cic_registry.error import UnknownContractError -from cic_bancor.bancor import BancorRegistryClient - -# local imports -import cic_eth -from cic_eth.eth import RpcClient -from cic_eth.db import SessionBase -from cic_eth.db import Otx -from cic_eth.db import TxConvertTransfer -from cic_eth.db.models.tx import TxCache -from cic_eth.db.enum import StatusEnum -from cic_eth.db import dsn_from_config -from cic_eth.queue.tx import get_paused_txs -from cic_eth.sync import Syncer -from cic_eth.sync.error import LoopDone -from cic_eth.db.error import UnknownConvertError -from cic_eth.eth.util import unpack_signed_raw_tx -from cic_eth.eth.task import create_check_gas_and_send_task -from cic_eth.sync.backend import SyncerBackend -from cic_eth.eth.token import unpack_transfer -from cic_eth.eth.token import unpack_transferfrom -from cic_eth.eth.account import unpack_gift - -logging.basicConfig(level=logging.WARNING) -logg = logging.getLogger() -logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL) -logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL) -logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL) -logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL) - - -config_dir = os.path.join('/usr/local/etc/cic-eth') - -argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks') -argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') -argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec') -argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi') -argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') -argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') -argparser.add_argument('-v', help='be verbose', action='store_true') -argparser.add_argument('-vv', help='be more verbose', action='store_true') -argparser.add_argument('mode', type=str, help='sync mode: (head|history)', default='head') -args = argparser.parse_args(sys.argv[1:]) - -if args.v == True: - logging.getLogger().setLevel(logging.INFO) -elif args.vv == True: - logging.getLogger().setLevel(logging.DEBUG) - -config_dir = os.path.join(args.c) -os.makedirs(config_dir, 0o777, True) -config = confini.Config(config_dir, args.env_prefix) -config.process() -# override args -args_override = { - 'ETH_ABI_DIR': getattr(args, 'abi_dir'), - 'CIC_CHAIN_SPEC': getattr(args, 'i'), - } -config.dict_override(args_override, 'cli flag') -config.censor('PASSWORD', 'DATABASE') -config.censor('PASSWORD', 'SSL') -logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) - -app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL')) - -queue = args.q - -dsn = dsn_from_config(config) -SessionBase.connect(dsn) - -# TODO: There is too much code in this file, split it up - -transfer_callbacks = [] -for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','): - task_split = cb.split(':') - task_queue = queue - if len(task_split) > 1: - task_queue = task_split[0] - task_pair = (task_split[1], task_queue) - transfer_callbacks.append(task_pair) - - -# TODO: move to contract registry -__convert_log_hash = '0x7154b38b5dd31bb3122436a96d4e09aba5b323ae1fd580025fab55074334c095' # keccak256(Conversion(address,address,address,uint256,uint256,address) -__account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153fe9ddb8e84b665061acd' # keccak256(AccountAdded(address,uint256)) - -__transfer_method_signature = '0xa9059cbb' # keccak256(transfer(address,uint256)) -__transferfrom_method_signature = '0x23b872dd' # keccak256(transferFrom(address,address,uint256)) -__giveto_method_signature = '0x63e4bff4' # keccak256(giveTo(address)) - -# TODO: move to bancor package -def parse_convert_log(w3, entry): - data = entry.data[2:] - from_amount = int(data[:64], 16) - to_amount = int(data[64:128], 16) - holder_address_hex_raw = '0x' + data[-40:] - holder_address_hex = w3.toChecksumAddress(holder_address_hex_raw) - o = { - 'from_amount': from_amount, - 'to_amount': to_amount, - 'holder_address': holder_address_hex - } - logg.debug('parsed convert log {}'.format(o)) - return o - - -def registration_filter(w3, tx, rcpt, chain_spec): - registered_address = None - for l in rcpt['logs']: - event_topic_hex = l['topics'][0].hex() - if event_topic_hex == __account_registry_add_log_hash: - address_bytes = l.topics[1][32-20:] - address = web3.Web3.toChecksumAddress(address_bytes.hex()) - logg.debug('request token gift to {}'.format(address)) - s = celery.signature( - 'cic_eth.eth.account.gift', - [ - address, - str(chain_spec), - ], - queue=queue, - ) - s.apply_async() - - -def convert_filter(w3, tx, rcpt, chain_spec): - destination_token_address = None - recipient_address = None - amount = 0 - for l in rcpt['logs']: - event_topic_hex = l['topics'][0].hex() - if event_topic_hex == __convert_log_hash: - tx_hash_hex = tx['hash'].hex() - try: - convert_transfer = TxConvertTransfer.get(tx_hash_hex) - except UnknownConvertError: - logg.warning('skipping unknown convert tx {}'.format(tx_hash_hex)) - continue - if convert_transfer.transfer_tx_hash != None: - logg.warning('convert tx {} cache record already has transfer hash {}, skipping'.format(tx_hash_hex, convert_transfer.transfer_hash)) - continue - recipient_address = convert_transfer.recipient_address - logg.debug('found convert event {} recipient'.format(tx_hash_hex, recipient_address)) - r = parse_convert_log(l) - destination_token_address = l['topics'][3][-20:] - - if destination_token_address == zero_address or destination_token_address == None: - return None - - destination_token_address_hex = destination_token_address.hex() - s = celery.signature( - 'cic_eth.eth.bancor.transfer_converted', - [ - [{ - 'address': w3.toChecksumAddress(destination_token_address_hex), - }], - r['holder_address'], - recipient_address, - r['to_amount'], - tx_hash_hex, - str(chain_spec), - ], - queue=queue, - ) - logg.info('sending tx signature {}'.format(s)) - t = s.apply_async() - logg.debug('submitted transfer after convert task uuid {} {}'.format(t, t.successful())) - return t - - -def tx_filter(w3, tx, rcpt, chain_spec): - tx_hash_hex = tx.hash.hex() - otx = Otx.load(tx_hash_hex) - if otx == None: - logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex)) - return None - logg.info('otx found {}'.format(otx.tx_hash)) - s = celery.signature( - 'cic_eth.queue.tx.set_final_status', - [ - tx_hash_hex, - rcpt.blockNumber, - rcpt.status == 0, - ], - queue=queue, - ) - t = s.apply_async() - return t - - -# TODO: replace with registry call instead -def get_token_symbol(w3, address): - #token = CICRegistry.get_address(CICRegistry.chain_spec, tx['to']) - logg.warning('token verification missing') - c = w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=address) - return c.functions.symbol().call() - - -# TODO: replace with registry call instead -def get_token_decimals(w3, address): - #token = CICRegistry.get_address(CICRegistry.chain_spec, tx['to']) - logg.warning('token verification missing') - c = w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=address) - return c.functions.decimals().call() - - -def callbacks_filter(w3, tx, rcpt, chain_spec): - transfer_data = None - if len(tx.input) < 10: - logg.debug('callbacks filter data length not sufficient for method signature in tx {}, skipping'.format(tx['hash'])) - return - - logg.debug('checking callbacks filter input {}'.format(tx.input[:10])) - - transfer_type = 'transfer' - method_signature = tx.input[:10] - if method_signature == __transfer_method_signature: - transfer_data = unpack_transfer(tx.input) - transfer_data['from'] = tx['from'] - transfer_data['token_address'] = tx['to'] - elif method_signature == __transferfrom_method_signature: - transfer_type = 'transferfrom' - transfer_data = unpack_transferfrom(tx.input) - transfer_data['token_address'] = tx['to'] - elif method_signature == __giveto_method_signature: - transfer_type = 'tokengift' - transfer_data = unpack_gift(tx.input) - for l in rcpt.logs: - if l.topics[0].hex() == '0x45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa': - transfer_data['amount'] = web3.Web3.toInt(hexstr=l.data) - token_address_bytes = l.topics[2][32-20:] - transfer_data['token_address'] = web3.Web3.toChecksumAddress(token_address_bytes.hex()) - transfer_data['from'] = rcpt.to - - if transfer_data != None: - for tc in transfer_callbacks: - token_symbol = None - try: - logg.debug('checking token {}'.format(transfer_data['token_address'])) - token_symbol = get_token_symbol(w3, transfer_data['token_address']) - token_decimals = get_token_decimals(w3, transfer_data['token_address']) - logg.debug('calling transfer callback {}:{} for tx {}'.format(tc[1], tc[0], tx['hash'])) - except UnknownContractError: - logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tc[1], tc[0], transfer_data['to'], tx.hash.hex())) - continue - result = { - 'hash': tx.hash.hex(), - 'sender': transfer_data['from'], - 'recipient': transfer_data['to'], - 'source_value': transfer_data['amount'], - 'destination_value': transfer_data['amount'], - 'source_token': transfer_data['token_address'], - 'destination_token': transfer_data['token_address'], - 'source_token_symbol': token_symbol, - 'destination_token_symbol': token_symbol, - 'source_token_decimals': token_decimals, - 'destination_token_decimals': token_decimals, - 'chain': str(chain_spec), - } - s = celery.signature( - tc[0], - [ - result, - transfer_type, - int(rcpt.status == 0), - ], - queue=tc[1], - ) - s.apply_async() - - -class GasFilter: - - def __init__(self, gas_provider): - self.gas_provider = gas_provider - - def filter(self, w3, tx, rcpt, chain_str): - tx_hash_hex = tx.hash.hex() - if tx['value'] > 0: - logg.debug('gas refill tx {}'.format(tx_hash_hex)) - session = SessionBase.create_session() - q = session.query(TxCache.recipient) - q = q.join(Otx) - q = q.filter(Otx.tx_hash==tx_hash_hex) - r = q.first() - - session.close() - - if r == None: - logg.warning('unsolicited gas refill tx {}'.format(tx_hash_hex)) - return - - chain_spec = ChainSpec.from_chain_str(chain_str) - txs = get_paused_txs(StatusEnum.WAITFORGAS, r[0], chain_spec.chain_id()) - - if len(txs) > 0: - logg.info('resuming gas-in-waiting txs for {}: {}'.format(r[0], txs.keys())) - s = create_check_gas_and_send_task( - list(txs.values()), - str(chain_str), - r[0], - 0, - tx_hashes_hex=list(txs.keys()), - queue=queue, - ) - s.apply_async() - - -re_websocket = re.compile('^wss?://') -re_http = re.compile('^https?://') -blockchain_provider = config.get('ETH_PROVIDER') -if re.match(re_websocket, blockchain_provider) != None: - blockchain_provider = WebsocketProvider(blockchain_provider) -elif re.match(re_http, blockchain_provider) != None: - blockchain_provider = HTTPProvider(blockchain_provider) -else: - raise ValueError('unknown provider url {}'.format(blockchain_provider)) - -def web3_constructor(): - w3 = web3.Web3(blockchain_provider) - return (blockchain_provider, w3) -RpcClient.set_constructor(web3_constructor) - - -def main(): - - chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) - c = RpcClient(chain_spec) - - CICRegistry.init(c.w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec) - CICRegistry.add_path(config.get('ETH_ABI_DIR')) - chain_registry = ChainRegistry(chain_spec) - CICRegistry.add_chain_registry(chain_registry) - - if config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER') != None: - CICRegistry.add_role(chain_spec, config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER'), 'AccountRegistry', True) - - syncers = [] - block_offset = c.w3.eth.blockNumber - chain = str(chain_spec) - - if SyncerBackend.first(chain): - from cic_eth.sync.history import HistorySyncer - backend = SyncerBackend.initial(chain, block_offset) - syncer = HistorySyncer(backend) - syncers.append(syncer) - - if args.mode == 'head': - from cic_eth.sync.head import HeadSyncer - block_sync = SyncerBackend.live(chain, block_offset+1) - syncers.append(HeadSyncer(block_sync)) - elif args.mode == 'history': - from cic_eth.sync.history import HistorySyncer - backends = SyncerBackend.resume(chain, block_offset+1) - for backend in backends: - syncers.append(HistorySyncer(backend)) - if len(syncers) == 0: - logg.info('found no unsynced history. terminating') - sys.exit(0) - else: - sys.stderr.write("unknown mode '{}'\n".format(args.mode)) - sys.exit(1) - -# bancor_registry_contract = CICRegistry.get_contract(chain_spec, 'BancorRegistry', interface='Registry') -# bancor_chain_registry = CICRegistry.get_chain_registry(chain_spec) -# bancor_registry = BancorRegistryClient(c.w3, bancor_chain_registry, config.get('ETH_ABI_DIR')) -# bancor_registry.load() - - i = 0 - for syncer in syncers: - logg.debug('running syncer index {}'.format(i)) - gas_filter = GasFilter(c.gas_provider()).filter - syncer.filter.append(gas_filter) - syncer.filter.append(registration_filter) - syncer.filter.append(callbacks_filter) - # TODO: the two following filter functions break the filter loop if return uuid. Pro: less code executed. Con: Possibly unintuitive flow break - syncer.filter.append(tx_filter) - syncer.filter.append(convert_filter) - - try: - syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL'))) - except LoopDone as e: - sys.stderr.write("sync '{}' done at block {}\n".format(args.mode, e)) - - i += 1 - - sys.exit(0) - - -if __name__ == '__main__': - main() diff --git a/apps/cic-eth/config/cic.ini b/apps/cic-eth/config/cic.ini index 2ad870e2..bb167d68 100644 --- a/apps/cic-eth/config/cic.ini +++ b/apps/cic-eth/config/cic.ini @@ -1,8 +1,5 @@ [cic] registry_address = -token_index_address = -accounts_index_address = -declarator_address = -approval_escrow_address = chain_spec = tx_retry_delay = +trust_address = diff --git a/apps/cic-eth/config/test/cic.ini b/apps/cic-eth/config/test/cic.ini index efd8fc4b..d985ae3e 100644 --- a/apps/cic-eth/config/test/cic.ini +++ b/apps/cic-eth/config/test/cic.ini @@ -1,8 +1,4 @@ [cic] registry_address = -token_index_address = -accounts_index_address = -declarator_address = -approval_escrow_address = chain_spec = -trusted_addresses = +trust_address = diff --git a/apps/cic-eth/docker/start_dispatcher.sh b/apps/cic-eth/docker/start_dispatcher.sh index 86ac06f7..150d757b 100644 --- a/apps/cic-eth/docker/start_dispatcher.sh +++ b/apps/cic-eth/docker/start_dispatcher.sh @@ -2,4 +2,4 @@ . ./db.sh -/usr/local/bin/cic-eth-dispatcher $@ +/usr/local/bin/cic-eth-dispatcherd $@ diff --git a/apps/cic-eth/docker/start_manager.sh b/apps/cic-eth/docker/start_manager.sh index 383e888d..42da71fa 100644 --- a/apps/cic-eth/docker/start_manager.sh +++ b/apps/cic-eth/docker/start_manager.sh @@ -2,4 +2,4 @@ . ./db.sh -/usr/local/bin/cic-eth-manager $@ +/usr/local/bin/cic-eth-managerd $@ diff --git a/apps/cic-eth/docker/start_retry.sh b/apps/cic-eth/docker/start_retry.sh index 721d5bcf..981bb8c9 100644 --- a/apps/cic-eth/docker/start_retry.sh +++ b/apps/cic-eth/docker/start_retry.sh @@ -2,4 +2,4 @@ . ./db.sh -/usr/local/bin/cic-eth-retrier $@ +/usr/local/bin/cic-eth-retrierd $@ diff --git a/apps/cic-eth/docker/start_tasker.sh b/apps/cic-eth/docker/start_tasker.sh index 61ea1e06..f33098d3 100644 --- a/apps/cic-eth/docker/start_tasker.sh +++ b/apps/cic-eth/docker/start_tasker.sh @@ -9,7 +9,7 @@ echo "!!! starting signer" python /usr/local/bin/crypto-dev-daemon -vv -c /usr/local/etc/crypto-dev-signer & echo "!!! starting tracker" -/usr/local/bin/cic-eth-tasker $@ +/usr/local/bin/cic-eth-taskerd $@ # thanks! https://docs.docker.com/config/containers/multi-service_container/ sleep 1; diff --git a/apps/cic-eth/requirements.txt b/apps/cic-eth/requirements.txt index cbcbd766..9c5938ea 100644 --- a/apps/cic-eth/requirements.txt +++ b/apps/cic-eth/requirements.txt @@ -2,7 +2,7 @@ web3==5.12.2 celery==4.4.7 crypto-dev-signer~=0.4.13rc2 confini~=0.3.6b1 -cic-registry~=0.5.3a12 +cic-registry~=0.5.3a18 cic-bancor~=0.0.6 redis==3.5.3 alembic==1.4.2 @@ -16,3 +16,5 @@ uWSGI==2.0.19.1 semver==2.13.0 eth-gas-proxy==0.0.1a4 websocket-client==0.57.0 +eth-address-index~=0.1.0a8 +chainlib~=0.0.1a11 diff --git a/apps/cic-eth/setup.cfg b/apps/cic-eth/setup.cfg index 770fb684..9f2b3656 100644 --- a/apps/cic-eth/setup.cfg +++ b/apps/cic-eth/setup.cfg @@ -33,7 +33,10 @@ packages = cic_eth.db.models cic_eth.queue cic_eth.sync + cic_eth.ext cic_eth.runnable + cic_eth.runnable.daemons + cic_eth.runnable.daemons.filters cic_eth.callbacks scripts = ./scripts/migrate.py @@ -41,10 +44,10 @@ scripts = [options.entry_points] console_scripts = # daemons - cic-eth-tasker = cic_eth.runnable.tasker:main - cic-eth-manager = cic_eth.runnable.manager:main - cic-eth-dispatcher = cic_eth.runnable.dispatcher:main - cic-eth-retrier = cic_eth.runnable.retry:main + cic-eth-taskerd = cic_eth.runnable.daemons.tasker:main + cic-eth-managerd = cic_eth.runnable.daemons.manager:main + cic-eth-dispatcherd = cic_eth.runnable.daemons.dispatcher:main + cic-eth-retrierd = cic_eth.runnable.daemons.retry:main # tools cic-eth-create = cic_eth.runnable.create:main cic-eth-inspect = cic_eth.runnable.view:main diff --git a/apps/cic-eth/tests/fixtures_registry.py b/apps/cic-eth/tests/fixtures_registry.py index 8f3067e4..e139e9fe 100644 --- a/apps/cic-eth/tests/fixtures_registry.py +++ b/apps/cic-eth/tests/fixtures_registry.py @@ -1,10 +1,50 @@ -# third-party imports -import pytest +# standard imports import os import json +import logging + +# third-party imports +import pytest +from eth_address_declarator import AddressDeclarator # local imports from cic_registry import CICRegistry +from cic_registry import to_identifier from cic_registry.contract import Contract from cic_registry.error import ChainExistsError +logg = logging.getLogger() + +script_dir = os.path.dirname(__file__) + + +@pytest.fixture(scope='session') +def local_cic_registry( + cic_registry, + ): + path = os.path.realpath(os.path.join(script_dir, 'testdata', 'abi')) + CICRegistry.add_path(path) + return cic_registry + + +@pytest.fixture(scope='function') +def address_declarator( + bloxberg_config, + default_chain_spec, + default_chain_registry, + local_cic_registry, + init_rpc, + init_w3, + ): + + c = init_rpc.w3.eth.contract(abi=AddressDeclarator.abi(), bytecode=AddressDeclarator.bytecode()) + default_description = '0x{:<064s}'.format(b'test'.hex()) + logg.debug('default_ {}'.format(default_description)) + tx_hash = c.constructor(default_description).transact() + rcpt = init_rpc.w3.eth.getTransactionReceipt(tx_hash) + + registry = init_rpc.w3.eth.contract(abi=CICRegistry.abi(), address=local_cic_registry) + chain_identifier = to_identifier(default_chain_registry.chain()) + registry.functions.set(to_identifier('AddressDeclarator'), rcpt.contractAddress, chain_identifier, bloxberg_config['digest']).transact() + + return rcpt.contractAddress diff --git a/apps/cic-eth/tests/testdata/abi/Declarator.json b/apps/cic-eth/tests/testdata/abi/Declarator.json new file mode 100644 index 00000000..06e46668 --- /dev/null +++ b/apps/cic-eth/tests/testdata/abi/Declarator.json @@ -0,0 +1 @@ +[{"inputs":[{"internalType":"bytes32","name":"_initialDescription","type":"bytes32"}],"stateMutability":"nonpayable","type":"constructor"},{"inputs":[{"internalType":"address","name":"_subject","type":"address"},{"internalType":"bytes32","name":"_proof","type":"bytes32"}],"name":"addDeclaration","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"uint256","name":"","type":"uint256"},{"internalType":"uint256","name":"","type":"uint256"}],"name":"contents","outputs":[{"internalType":"bytes32","name":"","type":"bytes32"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"_declarator","type":"address"},{"internalType":"address","name":"_subject","type":"address"}],"name":"declaration","outputs":[{"internalType":"bytes32[]","name":"","type":"bytes32[]"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"_declarator","type":"address"},{"internalType":"uint256","name":"_idx","type":"uint256"}],"name":"declarationAddressAt","outputs":[{"internalType":"address","name":"","type":"address"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"_declarator","type":"address"}],"name":"declarationCount","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"_subject","type":"address"},{"internalType":"uint256","name":"_idx","type":"uint256"}],"name":"declaratorAddressAt","outputs":[{"internalType":"address","name":"","type":"address"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"_subject","type":"address"}],"name":"declaratorCount","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"owner","outputs":[{"internalType":"address","name":"","type":"address"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"bytes4","name":"interfaceID","type":"bytes4"}],"name":"supportsInterface","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"transferOwnership","outputs":[],"stateMutability":"nonpayable","type":"function"}] diff --git a/apps/cic-eth/tests/unit/eth/test_extended_tx.py b/apps/cic-eth/tests/unit/eth/test_extended_tx.py new file mode 100644 index 00000000..f74a0394 --- /dev/null +++ b/apps/cic-eth/tests/unit/eth/test_extended_tx.py @@ -0,0 +1,58 @@ +# standard imports +import os +import logging + +# third-party imports +import web3 +from cic_registry import CICRegistry + +# local imports +from cic_eth.eth.token import ExtendedTx + +logg = logging.getLogger() + + +def test_extended_token( + default_chain_spec, + dummy_token, + local_cic_registry, + address_declarator, + init_w3, + ): + + address_foo = web3.Web3.toChecksumAddress('0x' + os.urandom(20).hex()) + label_foo = '0x{:<064s}'.format(b'foo'.hex()) + address_bar = web3.Web3.toChecksumAddress('0x' + os.urandom(20).hex()) + label_bar = '0x{:<064s}'.format(b'bar'.hex()) + label_token = '0x{:<064s}'.format(b'toktoktok'.hex()) + + # TODO: still need to test results with two different tokens + token_contract = init_w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=dummy_token) + token = CICRegistry.add_token(default_chain_spec, token_contract) + + declarator = CICRegistry.get_contract(default_chain_spec, 'AddressDeclarator', 'Declarator') + fn = declarator.function('addDeclaration') + fn(address_foo, label_foo).transact({'from': init_w3.eth.accounts[1]}) + fn(address_bar, label_bar).transact({'from': init_w3.eth.accounts[1]}) + fn(dummy_token, label_token).transact({'from': init_w3.eth.accounts[1]}) + tx_hash = '0x' + os.urandom(32).hex() + xtx = ExtendedTx(tx_hash, default_chain_spec) + xtx.set_actors(address_foo, address_bar, [init_w3.eth.accounts[1]]) + xtx.set_tokens(dummy_token, 1024) + tx = xtx.to_dict() + + logg.debug('tx {}'.format(tx)) + assert tx['hash'] == tx_hash + assert tx['source_token'] == dummy_token + assert tx['destination_token'] == dummy_token + assert tx['source_token_symbol'] == token.symbol() + assert tx['destination_token_symbol'] == token.symbol() + assert tx['source_token_value'] == 1024 + assert tx['destination_token_value'] == 1024 + assert tx['source_token_decimals'] == token.decimals() + assert tx['destination_token_decimals'] == token.decimals() + assert tx['sender'] == address_foo + assert tx['sender_label'] == 'foo' + assert tx['recipient'] == address_bar + assert tx['recipient_label'] == 'bar' + assert tx['chain'] == str(default_chain_spec) diff --git a/apps/cic-eth/tests/unit/ext/test_address.py b/apps/cic-eth/tests/unit/ext/test_address.py new file mode 100644 index 00000000..61562270 --- /dev/null +++ b/apps/cic-eth/tests/unit/ext/test_address.py @@ -0,0 +1,33 @@ +# third-party imports +from eth_address_declarator import AddressDeclarator +from cic_registry import CICRegistry + +# local imports +from cic_eth.ext.address import translate_tx_addresses + +def test_translate( + default_chain_spec, + address_declarator, + init_rpc, + init_w3, + ): + + chain_str = str(default_chain_spec) + + c = init_rpc.w3.eth.contract(abi=AddressDeclarator.abi(), address=address_declarator) + + description = '0x{:<064s}'.format(b'foo'.hex()) + c.functions.addDeclaration(init_w3.eth.accounts[2], description).transact({'from': init_w3.eth.accounts[1]}) + description = '0x{:<064s}'.format(b'bar'.hex()) + c.functions.addDeclaration(init_w3.eth.accounts[3], description).transact({'from': init_w3.eth.accounts[1]}) + + tx = { + 'sender': init_w3.eth.accounts[2], + 'sender_label': None, + 'recipient': init_w3.eth.accounts[3], + 'recipient_label': None, + + } + tx = translate_tx_addresses(tx, [init_w3.eth.accounts[1]], chain_str) + assert tx['sender_label'] == 'foo' + assert tx['recipient_label'] == 'bar' diff --git a/apps/contract-migration/docker/Dockerfile b/apps/contract-migration/docker/Dockerfile index 8ecb10cf..784aa5d8 100644 --- a/apps/contract-migration/docker/Dockerfile +++ b/apps/contract-migration/docker/Dockerfile @@ -136,7 +136,7 @@ ARG eth_address_index_version==0.1.0a8 RUN pip install --extra-index-url $pip_extra_index_url eth-address-index==$eth_address_index_version RUN echo Install cic specific python packages -ARG cic_registry_version=0.5.3a11 +ARG cic_registry_version=0.5.3a18 RUN pip install --extra-index-url $pip_extra_index_url cic-registry==$cic_registry_version RUN echo Install misc helpers diff --git a/apps/contract-migration/reset.sh b/apps/contract-migration/reset.sh index 53ac4f79..0c4d3512 100644 --- a/apps/contract-migration/reset.sh +++ b/apps/contract-migration/reset.sh @@ -44,6 +44,7 @@ if [[ -n "${ETH_PROVIDER}" ]]; then >&2 echo "deploy address declarator contract" declarator_description=0x546869732069732074686520434943206e6574776f726b000000000000000000 CIC_DECLARATOR_ADDRESS=`eth-address-declarator-deploy -y $keystore_file -i $CIC_CHAIN_SPEC -p $ETH_PROVIDER -w -v $declarator_description` + cic-registry-set -y $keystore_file -r $CIC_REGISTRY_ADDRESS -i $CIC_CHAIN_SPEC -k AddressDeclarator -p $ETH_PROVIDER $CIC_DECLARATOR_ADDRESS -vv else echo "\$ETH_PROVIDER not set!" @@ -59,6 +60,7 @@ export DEV_ETH_RESERVE_AMOUNT=$DEV_ETH_RESERVE_AMOUNT export DEV_ETH_ACCOUNTS_INDEX_ADDRESS=$CIC_ACCOUNTS_INDEX_ADDRESS export BANCOR_REGISTRY_ADDRESS=$BANCOR_REGISTRY_ADDRESS export CIC_REGISTRY_ADDRESS=$CIC_REGISTRY_ADDRESS +export CIC_TRUST_ADDRESS=$DEV_ETH_ACCOUNT_CONTRACT_DEPLOYER EOF diff --git a/docker-compose.yml b/docker-compose.yml index a51435ea..1aa307cf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -117,9 +117,6 @@ services: CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis:6379} DEV_PIP_EXTRA_INDEX_URL: ${DEV_PIP_EXTRA_INDEX_URL:-https://pip.grassrootseconomics.net:8433} command: ["./seed_cic_eth.sh"] - deploy: - restart_policy: - condition: on-failure depends_on: - eth - postgres