diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py index 570f7ee9..4155a007 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py @@ -8,13 +8,55 @@ from chainlib.status import Status as TxStatus from chainlib.eth.address import to_checksum_address from chainlib.eth.error import RequestMismatchException from chainlib.eth.constant import ZERO_ADDRESS +from chainlib.eth.erc20 import ERC20 from hexathon import strip_0x # local imports from .base import SyncFilter from cic_eth.eth.meta import ExtendedTx -logg = logging.getLogger().getLogger__name__) +logg = logging.getLogger().getChild(__name__) + + +def parse_transfer(tx): + r = ERC20.parse_transfer_request(tx.payload) + transfer_data = {} + transfer_data['to'] = r[0] + transfer_data['value'] = r[1] + transfer_data['from'] = tx['from'] + transfer_data['token_address'] = tx['to'] + return ('transfer', transfer_data) + + +def parse_transferfrom(tx): + r = ERC20.parse_transfer_request(tx.payload) + transfer_data = unpack_transferfrom(tx.payload) + transfer_data['from'] = r[0] + transfer_data['to'] = r[1] + transfer_data['value'] = r[2] + transfer_data['token_address'] = tx['to'] + return ('transferfrom', transfer_data) + + +def parse_giftto(tx): + # TODO: broken + logg.error('broken') + return + transfer_data = unpack_gift(tx.payload) + transfer_data['from'] = tx.inputs[0] + transfer_data['value'] = 0 + transfer_data['token_address'] = ZERO_ADDRESS + # TODO: would be better to query the gift amount from the block state + for l in tx.logs: + topics = l['topics'] + logg.debug('topixx {}'.format(topics)) + if strip_0x(topics[0]) == '45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa': + #transfer_data['value'] = web3.Web3.toInt(hexstr=strip_0x(l['data'])) + transfer_data['value'] = int.from_bytes(bytes.fromhex(strip_0x(l_data))) + #token_address_bytes = topics[2][32-20:] + token_address = strip_0x(topics[2])[64-40:] + transfer_data['token_address'] = to_checksum_address(token_address) + return ('tokengift', transfer_data) class CallbackFilter(SyncFilter): @@ -53,45 +95,6 @@ class CallbackFilter(SyncFilter): return s - def parse_transfer(self, tx): - r = ERC20.parse_transfer_request(tx.payload) - transfer_data = {} - transfer_data['to'] = r[0] - transfer_data['value'] = r[1] - transfer_data['from'] = tx['from'] - transfer_data['token_address'] = tx['to'] - return ('transfer', transfer_data) - - - def parse_transferfrom(self, tx): - r = ERC20.parse_transfer_request(tx.payload) - transfer_data = unpack_transferfrom(tx.payload) - transfer_data['from'] = r[0] - transfer_data['to'] = r[1] - transfer_data['value'] = r[2] - transfer_data['token_address'] = tx['to'] - return ('transferfrom', transfer_data) - - - def parse_giftto(self, tx): - # TODO: broken - transfer_data = unpack_gift(tx.payload) - transfer_data['from'] = tx.inputs[0] - transfer_data['value'] = 0 - transfer_data['token_address'] = ZERO_ADDRESS - # TODO: would be better to query the gift amount from the block state - for l in tx.logs: - topics = l['topics'] - logg.debug('topixx {}'.format(topics)) - if strip_0x(topics[0]) == '45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa': - #transfer_data['value'] = web3.Web3.toInt(hexstr=strip_0x(l['data'])) - transfer_data['value'] = int.from_bytes(bytes.fromhex(strip_0x(l_data))) - #token_address_bytes = topics[2][32-20:] - token_address = strip_0x(topics[2])[64-40:] - transfer_data['token_address'] = to_checksum_address(token_address) - return ('tokengift', transfer_data) - - def parse_data(self, tx): transfer_type = None transfer_data = None @@ -103,7 +106,7 @@ class CallbackFilter(SyncFilter): for parser in [ parse_transfer, - parse_transfeffrom, + parse_transferfrom, parse_giftto, ]: try: diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py index 0d5970d1..1e30b16e 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py @@ -10,10 +10,10 @@ from cic_eth.db.models.base import SessionBase from cic_eth.db.models.tx import TxCache from cic_eth.db.models.otx import Otx from cic_eth.queue.tx import get_paused_txs -from cic_eth.eth.task import create_check_gas_and_send_task +from cic_eth.eth.gas import create_check_gas_task from .base import SyncFilter -logg = logging.getLogger().getLogger(__name__) +logg = logging.getLogger().getChild(__name__) class GasFilter(SyncFilter): @@ -44,7 +44,7 @@ class GasFilter(SyncFilter): logg.info('resuming gas-in-waiting txs for {}'.format(r[0])) if len(txs) > 0: - s = create_check_gas_and_send_task( + s = create_check_gas_task( list(txs.values()), self.chain_spec.asdict(), r[0], diff --git a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py index 7ae3c386..655b4c40 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py @@ -143,10 +143,6 @@ def main(): # if config.get('SSL_CA_FILE') != '': # Callback.ssl_ca_file = config.get('SSL_CA_FILE') - - #if config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER') != None: - # CICRegistry.add_role(chain_spec, config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER'), 'AccountRegistry', True) - rpc = RPCConnection.connect(chain_spec, 'default') registry = CICRegistry(chain_spec, rpc) registry_address = registry.by_name('CICRegistry') @@ -158,15 +154,7 @@ def main(): trusted_addresses = trusted_addresses_src.split(',') for address in trusted_addresses: logg.info('using trusted address {}'.format(address)) - #oracle = DeclaratorOracleAdapter(declarator.contract, trusted_addresses) - #chain_registry.add_oracle(oracle, 'naive_erc20_oracle') - - - #chain_spec = CICRegistry.default_chain_spec - #bancor_registry_contract = CICRegistry.get_contract(chain_spec, 'BancorRegistry', interface='Registry') - #bancor_chain_registry = CICRegistry.get_chain_registry(chain_spec) - #bancor_registry = BancorRegistryClient(c.w3, bancor_chain_registry, config.get('ETH_ABI_DIR')) - #bancor_registry.load(True) + current_app.worker_main(argv) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/tracker.py b/apps/cic-eth/cic_eth/runnable/daemons/tracker.py index ee049bc1..4fe598f1 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/tracker.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/tracker.py @@ -11,18 +11,15 @@ import re import confini import celery import rlp -import web3 -from web3 import HTTPProvider, WebsocketProvider import cic_base.config import cic_base.log import cic_base.argparse import cic_base.rpc -from cic_registry import CICRegistry +from cic_eth_registry import CICRegistry +from cic_eth_registry.error import UnknownContractError from chainlib.chain import ChainSpec -from cic_registry import zero_address -from cic_registry.chain import ChainRegistry -from cic_registry.error import UnknownContractError -from chainlib.eth.connection import HTTPConnection +from chainlib.eth.constant import ZERO_ADDRESS +from chainlib.connection import RPCConnection from chainlib.eth.block import ( block_latest, ) @@ -37,11 +34,7 @@ from chainsyncer.driver import ( from chainsyncer.db.models.base import SessionBase # local imports -from cic_eth.registry import init_registry -from cic_eth.eth import RpcClient from cic_eth.db import dsn_from_config -#from cic_eth.sync import Syncer -#from cic_eth.sync.error import LoopDone from cic_eth.runnable.daemons.filters import ( CallbackFilter, GasFilter, @@ -64,42 +57,24 @@ config.add(args.q, '_CELERY_QUEUE', True) cic_base.config.log(config) - dsn = dsn_from_config(config) + SessionBase.connect(dsn, pool_size=16, debug=config.true('DATABASE_DEBUG')) -re_websocket = re.compile('^wss?://') -re_http = re.compile('^https?://') -blockchain_provider = config.get('ETH_PROVIDER') -if re.match(re_websocket, blockchain_provider) != None: - blockchain_provider = web3.Web3.WebsocketProvider(blockchain_provider) -elif re.match(re_http, blockchain_provider) != None: - blockchain_provider = web3.Web3.HTTPProvider(blockchain_provider) -else: - raise ValueError('unknown provider url {}'.format(blockchain_provider)) +chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) -def web3_constructor(): - w3 = web3.Web3(blockchain_provider) - return (blockchain_provider, w3) -RpcClient.set_constructor(web3_constructor) +RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default') def main(): - # parse chain spec object - chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) - # connect to celery celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) - # set up registry - w3 = cic_base.rpc.create(config.get('ETH_PROVIDER')) # replace with HTTPConnection when registry has been so refactored - registry = init_registry(config, w3) - # Connect to blockchain with chainlib - conn = HTTPConnection(config.get('ETH_PROVIDER')) + rpc = RPCConnection.connect(chain_spec, 'default') o = block_latest() - r = conn.do(o) + r = rpc.do(o) block_offset = int(strip_0x(r), 16) + 1 logg.debug('starting at block {}'.format(block_offset)) @@ -151,7 +126,7 @@ def main(): gas_filter = GasFilter(chain_spec, config.get('_CELERY_QUEUE')) - transfer_auth_filter = TransferAuthFilter(registry, chain_spec, config.get('_CELERY_QUEUE')) + #transfer_auth_filter = TransferAuthFilter(registry, chain_spec, config.get('_CELERY_QUEUE')) i = 0 for syncer in syncers: @@ -160,17 +135,15 @@ def main(): syncer.add_filter(registration_filter) # TODO: the two following filter functions break the filter loop if return uuid. Pro: less code executed. Con: Possibly unintuitive flow break syncer.add_filter(tx_filter) - syncer.add_filter(transfer_auth_filter) + #syncer.add_filter(transfer_auth_filter) for cf in callback_filters: syncer.add_filter(cf) - r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), conn) + r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc) sys.stderr.write("sync {} done at block {}\n".format(syncer, r)) i += 1 - sys.exit(0) - if __name__ == '__main__': main()