diff --git a/apps/cic-cache/cic_cache/runnable/tracker.py b/apps/cic-cache/cic_cache/runnable/tracker.py index 3531e02f..f7a8fb30 100644 --- a/apps/cic-cache/cic_cache/runnable/tracker.py +++ b/apps/cic-cache/cic_cache/runnable/tracker.py @@ -9,23 +9,34 @@ import re # third-party imports import confini -from cic_registry import CICRegistry -from cic_registry.chain import ( - ChainRegistry, - ChainSpec, +from cic_eth_registry import CICRegistry +from chainlib.chain import ChainSpec +from chainlib.eth.tx import ( + transaction, + transaction_by_block, ) -#from cic_registry.bancor import BancorRegistryClient -from cic_registry.token import Token -from cic_registry.error import ( +from chainlib.eth.block import ( + transaction_count, + block_by_number, + block_by_hash, + ) +from chainlib.eth.contract import ( + code, + ) +from chainlib.connection import RPCConnection +#from cic_registry.token import Token +from cic_eth_registry.error import ( UnknownContractError, - UnknownDeclarationError, +# UnknownDeclarationError, ) -from cic_registry.declaration import to_token_declaration -from web3.exceptions import BlockNotFound, TransactionNotFound -from websockets.exceptions import ConnectionClosedError -from requests.exceptions import ConnectionError -import web3 -from web3 import HTTPProvider, WebsocketProvider +from hexathon import ( + strip_0x, + add_0x, + ) +#from cic_registry.declaration import to_token_declaration +#from web3.exceptions import BlockNotFound, TransactionNotFound +#from websockets.exceptions import ConnectionClosedError +#from requests.exceptions import ConnectionError # local imports from cic_cache import db @@ -33,11 +44,6 @@ from cic_cache.db.models.base import SessionBase logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() -logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL) -logging.getLogger('urllib3').setLevel(logging.CRITICAL) -logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL) -logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL) -logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL) log_topics = { 'transfer': '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef', @@ -48,11 +54,12 @@ log_topics = { config_dir = os.path.join('/usr/local/etc/cic-cache') argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks') +argparser.add_argument('-p', '--provider', dest='p', default='http://localhost:8545', type=str, help='Web3 provider url (http only)') +argparser.add_argument('-r', '--registry-address', dest='r', type=str, help='CIC registry address') argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec') argparser.add_argument('--trust-address', default=[], type=str, dest='trust_address', action='append', help='Set address as trust') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') -argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi') argparser.add_argument('-v', help='be verbose', action='store_true') argparser.add_argument('-vv', help='be more verbose', action='store_true') args = argparser.parse_args(sys.argv[1:]) @@ -60,7 +67,6 @@ args = argparser.parse_args(sys.argv[1:]) config_dir = os.path.join(args.c) os.makedirs(config_dir, 0o777, True) - if args.v == True: logging.getLogger().setLevel(logging.INFO) elif args.vv == True: @@ -69,7 +75,9 @@ elif args.vv == True: config = confini.Config(config_dir, args.env_prefix) config.process() args_override = { - 'ETH_ABI_DIR': getattr(args, 'abi_dir'), + 'ETH_PROVIDER': getattr(args, 'p'), + 'CIC_CHAIN_SPEC': getattr(args, 'i'), + 'CIC_REGISTRY_ADDRESS': getattr(args, 'r'), 'CIC_TRUST_ADDRESS': ",".join(getattr(args, 'trust_address', [])), } config.dict_override(args_override, 'cli flag') @@ -79,37 +87,34 @@ logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) # connect to database dsn = db.dsn_from_config(config) -SessionBase.connect(dsn) +SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG')) + +chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) + +RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default') + +registry_address = config.get('CIC_REGISTRY_ADDRESS') -re_websocket = re.compile('^wss?://') -re_http = re.compile('^https?://') -blockchain_provider = config.get('ETH_PROVIDER') -if re.match(re_websocket, blockchain_provider) != None: - blockchain_provider = WebsocketProvider(blockchain_provider) -elif re.match(re_http, blockchain_provider) != None: - blockchain_provider = HTTPProvider(blockchain_provider) -else: - raise ValueError('unknown provider url {}'.format(blockchain_provider)) - -def web3_constructor(): - w3 = web3.Web3(blockchain_provider) - return (blockchain_provider, w3) +def rpc_connect(): + rpc = RPCConnection.connect(chain_spec, 'default') + return rpc +rpc = rpc_connect() +CICRegistry.init(registry_address, chain_spec, rpc) class RunStateEnum(enum.IntEnum): INIT = 0 RUN = 1 TERMINATE = 9 - def rubberstamp(src): return True class Tracker: - def __init__(self, chain_spec, trusts=[]): + def __init__(self, chain_spec, rpc, trusts=[]): self.block_height = 0 self.tx_height = 0 self.state = RunStateEnum.INIT @@ -117,7 +122,9 @@ class Tracker: self.convert_enabled = False self.trusts = trusts self.chain_spec = chain_spec - self.declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', 'Declarator') + + registry = CICRegistry(chain_spec, rpc) + self.declarator_address = registry.by_name('AddressDeclarator') def __process_tx(self, w3, session, t, r, l, b): @@ -198,67 +205,79 @@ class Tracker: # session.flush() - def check_token(self, address): - t = None - try: - t = CICRegistry.get_address(CICRegistry.default_chain_spec, address) - return t - except UnknownContractError: - logg.debug('contract {} not in registry'.format(address)) + def check_token(self, registry, address): + return registry.by_address(self.chain_spec, address) + #except UnknownContractError: + # logg.debug('contract {} not in registry'.format(address)) # If nothing was returned, we look up the token in the declarator - for trust in self.trusts: - logg.debug('look up declaration for contract {} with trust {}'.format(address, trust)) - fn = self.declarator.function('declaration') - # TODO: cache trust in LRUcache - declaration_array = fn(trust, address).call() - try: - declaration = to_token_declaration(trust, address, declaration_array, [rubberstamp]) - logg.debug('found declaration for token {} from trust address {}'.format(address, trust)) - except UnknownDeclarationError: - continue - - try: - c = w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=address) - t = CICRegistry.add_token(self.chain_spec, c) - break - except ValueError: - logg.error('declaration for {} validates as token, but location is not ERC20 compatible'.format(address)) - - return t +# for trust in self.trusts: +# logg.debug('look up declaration for contract {} with trust {}'.format(address, trust)) +# fn = self.declarator.function('declaration') +# # TODO: cache trust in LRUcache +# declaration_array = fn(trust, address).call() +# try: +# declaration = to_token_declaration(trust, address, declaration_array, [rubberstamp]) +# logg.debug('found declaration for token {} from trust address {}'.format(address, trust)) +# except UnknownDeclarationError: +# continue +# +# try: +# c = w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=address) +# t = CICRegistry.add_token(self.chain_spec, c) +# break +# except ValueError: +# logg.error('declaration for {} validates as token, but location is not ERC20 compatible'.format(address)) +# +# return t # TODO use input data instead of logs - def process(self, w3, session, block): + def process(self, rpc, session, block): #self.refresh_registry(w3) - tx_count = w3.eth.getBlockTransactionCount(block.hash) - b = w3.eth.getBlock(block.hash) + o = transaction_count(block['hash']) + r = rpc.do(o) + tx_count = int(r, 16) + + o = block_by_hash(block['hash']) + block = rpc.do(o) + for i in range(self.tx_height, tx_count): - tx = w3.eth.getTransactionByBlock(block.hash, i) - if tx.to == None: - logg.debug('block {} tx {} is contract creation tx, skipping'.format(block.number, i)) - continue - if len(w3.eth.getCode(tx.to)) == 0: - logg.debug('block {} tx {} not a contract tx, skipping'.format(block.number, i)) + o = transaction_by_block(block['hash'], i) + tx = rpc.do(o) + + if tx['to'] == None: + logg.debug('block {} tx {} is contract creation tx, skipping'.format(block['number'], i)) continue - t = self.check_token(tx.to) - if t != None and isinstance(t, Token): - r = w3.eth.getTransactionReceipt(tx.hash) - for l in r.logs: - logg.debug('block {} tx {} {} token log {} {}'.format(block.number, i, tx.hash.hex(), l.logIndex, l.topics[0].hex())) - if l.topics[0].hex() == log_topics['transfer']: - self.__process_tx(w3, session, t, r, l, b) + o = code(tx['to']) + r = rpc.do(o) - # TODO: cache contracts in LRUcache - elif self.convert_enabled and tx.to == CICRegistry.get_contract(CICRegistry.default_chain_spec, 'Converter').address: - r = w3.eth.getTransactionReceipt(tx.hash) - for l in r.logs: - logg.info('block {} tx {} {} bancornetwork log {} {}'.format(block.number, i, tx.hash.hex(), l.logIndex, l.topics[0].hex())) - if l.topics[0].hex() == log_topics['convert']: - self.__process_convert(w3, session, t, r, l, b) + #if len(w3.eth.getCode(tx.to)) == 0: + if len(strip_0x(r)) == 0: + logg.debug('block {} tx {} not a contract tx, skipping'.format(block['number'], i)) + continue + + registry = CICRegistry(self.chain_spec, rpc) + t = self.check_token(registry, tx['to']) + logg.debug('token? {}'.format(t)) +# t = self.check_token(tx.to) +# if t != None and isinstance(t, Token): +# r = w3.eth.getTransactionReceipt(tx.hash) +# for l in r.logs: +# logg.debug('block {} tx {} {} token log {} {}'.format(block.number, i, tx.hash.hex(), l.logIndex, l.topics[0].hex())) +# if l.topics[0].hex() == log_topics['transfer']: +# self.__process_tx(w3, session, t, r, l, b) +# +# # TODO: cache contracts in LRUcache +# elif self.convert_enabled and tx.to == CICRegistry.get_contract(CICRegistry.default_chain_spec, 'Converter').address: +# r = w3.eth.getTransactionReceipt(tx.hash) +# for l in r.logs: +# logg.info('block {} tx {} {} bancornetwork log {} {}'.format(block.number, i, tx.hash.hex(), l.logIndex, l.topics[0].hex())) +# if l.topics[0].hex() == log_topics['convert']: +# self.__process_convert(w3, session, t, r, l, b) - session.execute("UPDATE tx_sync SET tx = '{}'".format(tx.hash.hex())) + session.execute("UPDATE tx_sync SET tx = '{}'".format(tx['hash'])) session.commit() self.tx_height += 1 @@ -271,67 +290,59 @@ class Tracker: logg.info('starting at block {} tx index {}'.format(self.block_height, self.tx_height)) self.state = RunStateEnum.RUN while self.state == RunStateEnum.RUN: - (provider, w3) = web3_constructor() session = SessionBase.create_session() try: - block = w3.eth.getBlock(self.block_height) - self.process(w3, session, block) + rpc = rpc_connect() + o = block_by_number(self.block_height) + block = rpc.do(o) + #block = w3.eth.getBlock(self.block_height) + self.process(rpc, session, block) self.block_height += 1 self.tx_height = 0 - except BlockNotFound as e: - logg.debug('no block {} yet, zZzZ...'.format(self.block_height)) - time.sleep(self.__get_next_retry()) - except ConnectionClosedError as e: - logg.info('connection gone, retrying') - time.sleep(self.__get_next_retry(True)) - except OSError as e: - logg.error('cannot connect {}'.format(e)) - time.sleep(self.__get_next_retry(True)) +# except BlockNotFound as e: +# logg.debug('no block {} yet, zZzZ...'.format(self.block_height)) +# time.sleep(self.__get_next_retry()) +# except ConnectionClosedError as e: +# logg.info('connection gone, retrying') +# time.sleep(self.__get_next_retry(True)) +# except OSError as e: +# logg.error('cannot connect {}'.format(e)) +# time.sleep(self.__get_next_retry(True)) except Exception as e: session.close() raise(e) session.close() - def load(self, w3): + def load(self, conn): session = SessionBase.create_session() r = session.execute('SELECT tx FROM tx_sync').first() if r != None: if r[0] == '0x{0:0{1}X}'.format(0, 64): logg.debug('last tx was zero-address, starting from scratch') return - t = w3.eth.getTransaction(r[0]) - - self.block_height = t.blockNumber - self.tx_height = t.transactionIndex+1 - c = w3.eth.getBlockTransactionCount(t.blockHash.hex()) - logg.debug('last tx processed {} index {} (max index {})'.format(t.blockNumber, t.transactionIndex, c-1)) + o = transaction(r[0]) + tx = conn.do(o) + + + self.block_height = int(tx['blockNumber'], 16) + self.tx_height = int(tx['transactionIndex'], 16) + 1 + o = transaction_count(tx['blockHash']) + r = conn.do(o) + c = int(r, 16) + logg.debug('last tx processed {} index {} (max index {})'.format(tx['blockNumber'], tx['transactionIndex'], c-1)) if c == self.tx_height: self.block_height += 1 self.tx_height = 0 session.close() -(provider, w3) = web3_constructor() trust = config.get('CIC_TRUST_ADDRESS', "").split(",") -chain_spec = args.i - -try: - w3.eth.chainId -except Exception as e: - logg.exception(e) - sys.stderr.write('cannot connect to evm node\n') - sys.exit(1) def main(): - chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) - CICRegistry.init(w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec) - CICRegistry.add_path(config.get('ETH_ABI_DIR')) - chain_registry = ChainRegistry(chain_spec) - CICRegistry.add_chain_registry(chain_registry) + t = Tracker(chain_spec, rpc, trust) - t = Tracker(chain_spec, trust) - t.load(w3) + t.load(rpc) t.loop() diff --git a/apps/cic-cache/requirements.txt b/apps/cic-cache/requirements.txt index 26b4548a..6ac59b76 100644 --- a/apps/cic-cache/requirements.txt +++ b/apps/cic-cache/requirements.txt @@ -1,8 +1,8 @@ alembic==1.4.2 -confini~=0.3.6b2 +confini~=0.3.6rc3 uwsgi==2.0.19.1 moolb~=0.1.0 -cic-registry~=0.5.3a4 +cic-eth-registry~=0.5.4a8 SQLAlchemy==1.3.20 semver==2.13.0 psycopg2==2.8.6