WIP rehabilitate tracker daemon

This commit is contained in:
nolash 2021-03-22 23:36:34 +01:00
parent 0d67f6efba
commit 1bc08295ef
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
4 changed files with 60 additions and 96 deletions

View File

@ -8,13 +8,55 @@ from chainlib.status import Status as TxStatus
from chainlib.eth.address import to_checksum_address
from chainlib.eth.error import RequestMismatchException
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.erc20 import ERC20
from hexathon import strip_0x
# local imports
from .base import SyncFilter
from cic_eth.eth.meta import ExtendedTx
logg = logging.getLogger().getLogger__name__)
logg = logging.getLogger().getChild(__name__)
def parse_transfer(tx):
r = ERC20.parse_transfer_request(tx.payload)
transfer_data = {}
transfer_data['to'] = r[0]
transfer_data['value'] = r[1]
transfer_data['from'] = tx['from']
transfer_data['token_address'] = tx['to']
return ('transfer', transfer_data)
def parse_transferfrom(tx):
r = ERC20.parse_transfer_request(tx.payload)
transfer_data = unpack_transferfrom(tx.payload)
transfer_data['from'] = r[0]
transfer_data['to'] = r[1]
transfer_data['value'] = r[2]
transfer_data['token_address'] = tx['to']
return ('transferfrom', transfer_data)
def parse_giftto(tx):
# TODO: broken
logg.error('broken')
return
transfer_data = unpack_gift(tx.payload)
transfer_data['from'] = tx.inputs[0]
transfer_data['value'] = 0
transfer_data['token_address'] = ZERO_ADDRESS
# TODO: would be better to query the gift amount from the block state
for l in tx.logs:
topics = l['topics']
logg.debug('topixx {}'.format(topics))
if strip_0x(topics[0]) == '45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa':
#transfer_data['value'] = web3.Web3.toInt(hexstr=strip_0x(l['data']))
transfer_data['value'] = int.from_bytes(bytes.fromhex(strip_0x(l_data)))
#token_address_bytes = topics[2][32-20:]
token_address = strip_0x(topics[2])[64-40:]
transfer_data['token_address'] = to_checksum_address(token_address)
return ('tokengift', transfer_data)
class CallbackFilter(SyncFilter):
@ -53,45 +95,6 @@ class CallbackFilter(SyncFilter):
return s
def parse_transfer(self, tx):
r = ERC20.parse_transfer_request(tx.payload)
transfer_data = {}
transfer_data['to'] = r[0]
transfer_data['value'] = r[1]
transfer_data['from'] = tx['from']
transfer_data['token_address'] = tx['to']
return ('transfer', transfer_data)
def parse_transferfrom(self, tx):
r = ERC20.parse_transfer_request(tx.payload)
transfer_data = unpack_transferfrom(tx.payload)
transfer_data['from'] = r[0]
transfer_data['to'] = r[1]
transfer_data['value'] = r[2]
transfer_data['token_address'] = tx['to']
return ('transferfrom', transfer_data)
def parse_giftto(self, tx):
# TODO: broken
transfer_data = unpack_gift(tx.payload)
transfer_data['from'] = tx.inputs[0]
transfer_data['value'] = 0
transfer_data['token_address'] = ZERO_ADDRESS
# TODO: would be better to query the gift amount from the block state
for l in tx.logs:
topics = l['topics']
logg.debug('topixx {}'.format(topics))
if strip_0x(topics[0]) == '45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa':
#transfer_data['value'] = web3.Web3.toInt(hexstr=strip_0x(l['data']))
transfer_data['value'] = int.from_bytes(bytes.fromhex(strip_0x(l_data)))
#token_address_bytes = topics[2][32-20:]
token_address = strip_0x(topics[2])[64-40:]
transfer_data['token_address'] = to_checksum_address(token_address)
return ('tokengift', transfer_data)
def parse_data(self, tx):
transfer_type = None
transfer_data = None
@ -103,7 +106,7 @@ class CallbackFilter(SyncFilter):
for parser in [
parse_transfer,
parse_transfeffrom,
parse_transferfrom,
parse_giftto,
]:
try:

View File

@ -10,10 +10,10 @@ from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.otx import Otx
from cic_eth.queue.tx import get_paused_txs
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.eth.gas import create_check_gas_task
from .base import SyncFilter
logg = logging.getLogger().getLogger(__name__)
logg = logging.getLogger().getChild(__name__)
class GasFilter(SyncFilter):
@ -44,7 +44,7 @@ class GasFilter(SyncFilter):
logg.info('resuming gas-in-waiting txs for {}'.format(r[0]))
if len(txs) > 0:
s = create_check_gas_and_send_task(
s = create_check_gas_task(
list(txs.values()),
self.chain_spec.asdict(),
r[0],

View File

@ -143,10 +143,6 @@ def main():
# if config.get('SSL_CA_FILE') != '':
# Callback.ssl_ca_file = config.get('SSL_CA_FILE')
#if config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER') != None:
# CICRegistry.add_role(chain_spec, config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER'), 'AccountRegistry', True)
rpc = RPCConnection.connect(chain_spec, 'default')
registry = CICRegistry(chain_spec, rpc)
registry_address = registry.by_name('CICRegistry')
@ -158,15 +154,7 @@ def main():
trusted_addresses = trusted_addresses_src.split(',')
for address in trusted_addresses:
logg.info('using trusted address {}'.format(address))
#oracle = DeclaratorOracleAdapter(declarator.contract, trusted_addresses)
#chain_registry.add_oracle(oracle, 'naive_erc20_oracle')
#chain_spec = CICRegistry.default_chain_spec
#bancor_registry_contract = CICRegistry.get_contract(chain_spec, 'BancorRegistry', interface='Registry')
#bancor_chain_registry = CICRegistry.get_chain_registry(chain_spec)
#bancor_registry = BancorRegistryClient(c.w3, bancor_chain_registry, config.get('ETH_ABI_DIR'))
#bancor_registry.load(True)
current_app.worker_main(argv)

View File

@ -11,18 +11,15 @@ import re
import confini
import celery
import rlp
import web3
from web3 import HTTPProvider, WebsocketProvider
import cic_base.config
import cic_base.log
import cic_base.argparse
import cic_base.rpc
from cic_registry import CICRegistry
from cic_eth_registry import CICRegistry
from cic_eth_registry.error import UnknownContractError
from chainlib.chain import ChainSpec
from cic_registry import zero_address
from cic_registry.chain import ChainRegistry
from cic_registry.error import UnknownContractError
from chainlib.eth.connection import HTTPConnection
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.connection import RPCConnection
from chainlib.eth.block import (
block_latest,
)
@ -37,11 +34,7 @@ from chainsyncer.driver import (
from chainsyncer.db.models.base import SessionBase
# local imports
from cic_eth.registry import init_registry
from cic_eth.eth import RpcClient
from cic_eth.db import dsn_from_config
#from cic_eth.sync import Syncer
#from cic_eth.sync.error import LoopDone
from cic_eth.runnable.daemons.filters import (
CallbackFilter,
GasFilter,
@ -64,42 +57,24 @@ config.add(args.q, '_CELERY_QUEUE', True)
cic_base.config.log(config)
dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=16, debug=config.true('DATABASE_DEBUG'))
re_websocket = re.compile('^wss?://')
re_http = re.compile('^https?://')
blockchain_provider = config.get('ETH_PROVIDER')
if re.match(re_websocket, blockchain_provider) != None:
blockchain_provider = web3.Web3.WebsocketProvider(blockchain_provider)
elif re.match(re_http, blockchain_provider) != None:
blockchain_provider = web3.Web3.HTTPProvider(blockchain_provider)
else:
raise ValueError('unknown provider url {}'.format(blockchain_provider))
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
def web3_constructor():
w3 = web3.Web3(blockchain_provider)
return (blockchain_provider, w3)
RpcClient.set_constructor(web3_constructor)
RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default')
def main():
# parse chain spec object
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
# connect to celery
celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
# set up registry
w3 = cic_base.rpc.create(config.get('ETH_PROVIDER')) # replace with HTTPConnection when registry has been so refactored
registry = init_registry(config, w3)
# Connect to blockchain with chainlib
conn = HTTPConnection(config.get('ETH_PROVIDER'))
rpc = RPCConnection.connect(chain_spec, 'default')
o = block_latest()
r = conn.do(o)
r = rpc.do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.debug('starting at block {}'.format(block_offset))
@ -151,7 +126,7 @@ def main():
gas_filter = GasFilter(chain_spec, config.get('_CELERY_QUEUE'))
transfer_auth_filter = TransferAuthFilter(registry, chain_spec, config.get('_CELERY_QUEUE'))
#transfer_auth_filter = TransferAuthFilter(registry, chain_spec, config.get('_CELERY_QUEUE'))
i = 0
for syncer in syncers:
@ -160,17 +135,15 @@ def main():
syncer.add_filter(registration_filter)
# TODO: the two following filter functions break the filter loop if return uuid. Pro: less code executed. Con: Possibly unintuitive flow break
syncer.add_filter(tx_filter)
syncer.add_filter(transfer_auth_filter)
#syncer.add_filter(transfer_auth_filter)
for cf in callback_filters:
syncer.add_filter(cf)
r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), conn)
r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
i += 1
sys.exit(0)
if __name__ == '__main__':
main()