Syncer refactor

This commit is contained in:
Louis Holbrook
2021-03-01 20:15:17 +00:00
parent 451079d004
commit 5001113267
56 changed files with 833 additions and 938 deletions

View File

@@ -21,14 +21,21 @@ import cic_eth
from cic_eth.eth import RpcClient
from cic_eth.db import SessionBase
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import StatusBits
from cic_eth.db.enum import LockEnum
from cic_eth.db import dsn_from_config
from cic_eth.queue.tx import get_upcoming_tx
from cic_eth.queue.tx import (
get_upcoming_tx,
set_dequeue,
)
from cic_eth.admin.ctrl import lock_send
from cic_eth.sync.error import LoopDone
from cic_eth.eth.tx import send as task_tx_send
from cic_eth.error import PermanentTxError
from cic_eth.error import TemporaryTxError
from cic_eth.error import (
PermanentTxError,
TemporaryTxError,
NotLocalTxError,
)
from cic_eth.eth.util import unpack_signed_raw_tx_hex
logging.basicConfig(level=logging.WARNING)
@@ -109,6 +116,11 @@ class DispatchSyncer:
for k in txs.keys():
tx_raw = txs[k]
tx = unpack_signed_raw_tx_hex(tx_raw, self.chain_spec.chain_id())
try:
set_dequeue(tx['hash'])
except NotLocalTxError as e:
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
@@ -129,12 +141,13 @@ class DispatchSyncer:
)
s_check.link(s_send)
t = s_check.apply_async()
logg.info('processed {}'.format(k))
def loop(self, w3, interval):
while run:
txs = {}
typ = StatusEnum.READYSEND
typ = StatusBits.QUEUED
utxs = get_upcoming_tx(typ, chain_id=self.chain_id)
for k in utxs.keys():
txs[k] = utxs[k]

View File

@@ -5,37 +5,46 @@ import logging
import web3
import celery
from cic_registry.error import UnknownContractError
from chainlib.status import Status as TxStatus
from chainlib.eth.address import to_checksum
from chainlib.eth.constant import ZERO_ADDRESS
from hexathon import strip_0x
# local imports
from .base import SyncFilter
from cic_eth.eth.token import unpack_transfer
from cic_eth.eth.token import unpack_transferfrom
from cic_eth.eth.token import (
unpack_transfer,
unpack_transferfrom,
)
from cic_eth.eth.account import unpack_gift
from cic_eth.eth.token import ExtendedTx
from .base import SyncFilter
logg = logging.getLogger()
logg = logging.getLogger(__name__)
transfer_method_signature = '0xa9059cbb' # keccak256(transfer(address,uint256))
transferfrom_method_signature = '0x23b872dd' # keccak256(transferFrom(address,address,uint256))
giveto_method_signature = '0x63e4bff4' # keccak256(giveTo(address))
transfer_method_signature = 'a9059cbb' # keccak256(transfer(address,uint256))
transferfrom_method_signature = '23b872dd' # keccak256(transferFrom(address,address,uint256))
giveto_method_signature = '63e4bff4' # keccak256(giveTo(address))
class CallbackFilter(SyncFilter):
trusted_addresses = []
def __init__(self, method, queue):
def __init__(self, chain_spec, method, queue):
self.queue = queue
self.method = method
self.chain_spec = chain_spec
def call_back(self, transfer_type, result):
logg.debug('result {}'.format(result))
s = celery.signature(
self.method,
[
result,
transfer_type,
int(rcpt.status == 0),
int(result['status_code'] == 0),
],
queue=self.queue,
)
@@ -50,58 +59,85 @@ class CallbackFilter(SyncFilter):
# )
# s_translate.link(s)
# s_translate.apply_async()
s.apply_async()
t = s.apply_async()
return s
def parse_data(self, tx, rcpt):
transfer_type = 'transfer'
def parse_data(self, tx):
transfer_type = None
transfer_data = None
method_signature = tx.input[:10]
logg.debug('have payload {}'.format(tx.payload))
method_signature = tx.payload[:8]
logg.debug('tx status {}'.format(tx.status))
if method_signature == transfer_method_signature:
transfer_data = unpack_transfer(tx.input)
transfer_data = unpack_transfer(tx.payload)
transfer_data['from'] = tx['from']
transfer_data['token_address'] = tx['to']
elif method_signature == transferfrom_method_signature:
transfer_type = 'transferfrom'
transfer_data = unpack_transferfrom(tx.input)
transfer_data = unpack_transferfrom(tx.payload)
transfer_data['token_address'] = tx['to']
# TODO: do not rely on logs here
elif method_signature == giveto_method_signature:
transfer_type = 'tokengift'
transfer_data = unpack_gift(tx.input)
for l in rcpt.logs:
if l.topics[0].hex() == '0x45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa':
transfer_data['value'] = web3.Web3.toInt(hexstr=l.data)
token_address_bytes = l.topics[2][32-20:]
transfer_data['token_address'] = web3.Web3.toChecksumAddress(token_address_bytes.hex())
transfer_data['from'] = rcpt.to
transfer_data = unpack_gift(tx.payload)
transfer_data['from'] = tx.inputs[0]
transfer_data['value'] = 0
transfer_data['token_address'] = ZERO_ADDRESS
for l in tx.logs:
topics = l['topics']
logg.debug('topixx {}'.format(topics))
if strip_0x(topics[0]) == '45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa':
transfer_data['value'] = web3.Web3.toInt(hexstr=strip_0x(l['data']))
#token_address_bytes = topics[2][32-20:]
token_address = strip_0x(topics[2])[64-40:]
transfer_data['token_address'] = to_checksum(token_address)
logg.debug('resolved method {}'.format(transfer_type))
if transfer_data != None:
transfer_data['status'] = tx.status
return (transfer_type, transfer_data)
def filter(self, w3, tx, rcpt, chain_spec, session=None):
logg.debug('applying callback filter "{}:{}"'.format(self.queue, self.method))
chain_str = str(chain_spec)
transfer_data = self.parse_data(tx, rcpt)
def filter(self, conn, block, tx, db_session=None):
chain_str = str(self.chain_spec)
transfer_data = None
if len(tx.input) < 10:
logg.debug('callbacks filter data length not sufficient for method signature in tx {}, skipping'.format(tx['hash']))
transfer_type = None
try:
(transfer_type, transfer_data) = self.parse_data(tx)
except TypeError:
logg.debug('invalid method data length for tx {}'.format(tx.hash))
return
logg.debug('checking callbacks filter input {}'.format(tx.input[:10]))
if len(tx.payload) < 8:
logg.debug('callbacks filter data length not sufficient for method signature in tx {}, skipping'.format(tx.hash))
return
logg.debug('checking callbacks filter input {}'.format(tx.payload[:8]))
if transfer_data != None:
logg.debug('wtfoo {}'.format(transfer_data))
token_symbol = None
result = None
try:
tokentx = ExtendedTx(self.chain_spec)
tokentx = ExtendedTx(tx.hash, self.chain_spec)
tokentx.set_actors(transfer_data['from'], transfer_data['to'], self.trusted_addresses)
tokentx.set_tokens(transfer_data['token_address'], transfer_data['value'])
self.call_back(tokentx.to_dict())
if transfer_data['status'] == 0:
tokentx.set_status(1)
else:
tokentx.set_status(0)
t = self.call_back(transfer_type, tokentx.to_dict())
logg.info('callback success task id {} tx {}'.format(t, tx.hash))
except UnknownContractError:
logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tc.queue, tc.method, transfer_data['to'], tx.hash.hex()))
logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tc.queue, tc.method, transfer_data['to'], tx.hash))
def __str__(self):
return 'cic-eth callbacks'

View File

@@ -3,6 +3,7 @@ import logging
# third-party imports
from cic_registry.chain import ChainSpec
from hexathon import add_0x
# local imports
from cic_eth.db.enum import StatusBits
@@ -13,20 +14,19 @@ from cic_eth.queue.tx import get_paused_txs
from cic_eth.eth.task import create_check_gas_and_send_task
from .base import SyncFilter
logg = logging.getLogger()
logg = logging.getLogger(__name__)
class GasFilter(SyncFilter):
def __init__(self, gas_provider, queue=None):
def __init__(self, chain_spec, queue=None):
self.queue = queue
self.gas_provider = gas_provider
self.chain_spec = chain_spec
def filter(self, w3, tx, rcpt, chain_str, session=None):
logg.debug('applying gas filter')
tx_hash_hex = tx.hash.hex()
if tx['value'] > 0:
def filter(self, conn, block, tx, session): #rcpt, chain_str, session=None):
tx_hash_hex = add_0x(tx.hash)
if tx.value > 0:
logg.debug('gas refill tx {}'.format(tx_hash_hex))
session = SessionBase.bind_session(session)
q = session.query(TxCache.recipient)
@@ -35,23 +35,26 @@ class GasFilter(SyncFilter):
r = q.first()
if r == None:
logg.warning('unsolicited gas refill tx {}'.format(tx_hash_hex))
logg.debug('unsolicited gas refill tx {}'.format(tx_hash_hex))
SessionBase.release_session(session)
return
chain_spec = ChainSpec.from_chain_str(chain_str)
txs = get_paused_txs(StatusBits.GAS_ISSUES, r[0], chain_spec.chain_id(), session=session)
txs = get_paused_txs(StatusBits.GAS_ISSUES, r[0], self.chain_spec.chain_id(), session=session)
SessionBase.release_session(session)
logg.info('resuming gas-in-waiting txs for {}'.format(r[0]))
if len(txs) > 0:
logg.info('resuming gas-in-waiting txs for {}: {}'.format(r[0], txs.keys()))
s = create_check_gas_and_send_task(
list(txs.values()),
str(chain_str),
str(self.chain_spec),
r[0],
0,
tx_hashes_hex=list(txs.keys()),
queue=self.queue,
)
s.apply_async()
def __str__(self):
return 'eic-eth gasfilter'

View File

@@ -4,36 +4,48 @@ import logging
# third-party imports
import celery
from chainlib.eth.address import to_checksum
from hexathon import (
add_0x,
strip_0x,
)
# local imports
from .base import SyncFilter
logg = logging.getLogger()
logg = logging.getLogger(__name__)
account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153fe9ddb8e84b665061acd' # keccak256(AccountAdded(address,uint256))
class RegistrationFilter(SyncFilter):
def __init__(self, queue):
def __init__(self, chain_spec, queue):
self.chain_spec = chain_spec
self.queue = queue
def filter(self, w3, tx, rcpt, chain_spec, session=None):
logg.debug('applying registration filter')
def filter(self, conn, block, tx, db_session=None):
registered_address = None
for l in rcpt['logs']:
event_topic_hex = l['topics'][0].hex()
logg.debug('register filter checking log {}'.format(tx.logs))
for l in tx.logs:
event_topic_hex = l['topics'][0]
if event_topic_hex == account_registry_add_log_hash:
address_bytes = l.topics[1][32-20:]
address = to_checksum(address_bytes.hex())
# TODO: use abi conversion method instead
address_hex = strip_0x(l['topics'][1])[64-40:]
address = to_checksum(add_0x(address_hex))
logg.debug('request token gift to {}'.format(address))
s = celery.signature(
'cic_eth.eth.account.gift',
[
address,
str(chain_spec),
str(self.chain_spec),
],
queue=self.queue,
)
s.apply_async()
def __str__(self):
return 'cic-eth account registration'

View File

@@ -3,39 +3,47 @@ import logging
# third-party imports
import celery
from hexathon import (
add_0x,
)
# local imports
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.base import SessionBase
from chainsyncer.db.models.base import SessionBase
from chainlib.status import Status
from .base import SyncFilter
logg = logging.getLogger()
logg = logging.getLogger(__name__)
class TxFilter(SyncFilter):
def __init__(self, queue):
def __init__(self, chain_spec, queue):
self.queue = queue
self.chain_spec = chain_spec
def filter(self, w3, tx, rcpt, chain_spec, session=None):
session = SessionBase.bind_session(session)
logg.debug('applying tx filter')
tx_hash_hex = tx.hash.hex()
otx = Otx.load(tx_hash_hex, session=session)
SessionBase.release_session(session)
def filter(self, conn, block, tx, db_session=None):
db_session = SessionBase.bind_session(db_session)
tx_hash_hex = tx.hash
otx = Otx.load(add_0x(tx_hash_hex), session=db_session)
if otx == None:
logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
return None
logg.info('otx found {}'.format(otx.tx_hash))
logg.info('local tx match {}'.format(otx.tx_hash))
SessionBase.release_session(db_session)
s = celery.signature(
'cic_eth.queue.tx.set_final_status',
[
tx_hash_hex,
rcpt.blockNumber,
rcpt.status == 0,
add_0x(tx_hash_hex),
tx.block.number,
tx.status == Status.ERROR,
],
queue=self.queue,
)
t = s.apply_async()
return t
def __str__(self):
return 'cic-eth erc20 transfer filter'

View File

@@ -1,207 +0,0 @@
# standard imports
import os
import sys
import logging
import time
import argparse
import sys
import re
# third-party imports
import confini
import celery
import rlp
import web3
from web3 import HTTPProvider, WebsocketProvider
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
from cic_registry import zero_address
from cic_registry.chain import ChainRegistry
from cic_registry.error import UnknownContractError
from cic_bancor.bancor import BancorRegistryClient
# local imports
import cic_eth
from cic_eth.eth import RpcClient
from cic_eth.db import SessionBase
from cic_eth.db import Otx
from cic_eth.db import TxConvertTransfer
from cic_eth.db.models.tx import TxCache
from cic_eth.db.enum import StatusEnum
from cic_eth.db import dsn_from_config
from cic_eth.queue.tx import get_paused_txs
from cic_eth.sync import Syncer
from cic_eth.sync.error import LoopDone
from cic_eth.db.error import UnknownConvertError
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.sync.backend import SyncerBackend
from cic_eth.eth.token import unpack_transfer
from cic_eth.eth.token import unpack_transferfrom
from cic_eth.eth.account import unpack_gift
from cic_eth.runnable.daemons.filters import (
CallbackFilter,
GasFilter,
TxFilter,
RegistrationFilter,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL)
logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL)
config_dir = os.path.join('/usr/local/etc/cic-eth')
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
argparser.add_argument('mode', type=str, help='sync mode: (head|history)', default='head')
args = argparser.parse_args(sys.argv[1:])
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process()
# override args
args_override = {
'ETH_ABI_DIR': getattr(args, 'abi_dir'),
'CIC_CHAIN_SPEC': getattr(args, 'i'),
}
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
queue = args.q
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
re_websocket = re.compile('^wss?://')
re_http = re.compile('^https?://')
blockchain_provider = config.get('ETH_PROVIDER')
if re.match(re_websocket, blockchain_provider) != None:
blockchain_provider = WebsocketProvider(blockchain_provider)
elif re.match(re_http, blockchain_provider) != None:
blockchain_provider = HTTPProvider(blockchain_provider)
else:
raise ValueError('unknown provider url {}'.format(blockchain_provider))
def web3_constructor():
w3 = web3.Web3(blockchain_provider)
return (blockchain_provider, w3)
RpcClient.set_constructor(web3_constructor)
c = RpcClient(chain_spec)
CICRegistry.init(c.w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
CICRegistry.add_path(config.get('ETH_ABI_DIR'))
chain_registry = ChainRegistry(chain_spec)
CICRegistry.add_chain_registry(chain_registry, True)
declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', interface='Declarator')
dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=1, debug=config.true('DATABASE_DEBUG'))
def main():
global chain_spec, c, queue
if config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER') != None:
CICRegistry.add_role(chain_spec, config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER'), 'AccountRegistry', True)
syncers = []
block_offset = c.w3.eth.blockNumber
chain = str(chain_spec)
if SyncerBackend.first(chain):
from cic_eth.sync.history import HistorySyncer
backend = SyncerBackend.initial(chain, block_offset)
syncer = HistorySyncer(backend)
syncers.append(syncer)
if args.mode == 'head':
from cic_eth.sync.head import HeadSyncer
block_sync = SyncerBackend.live(chain, block_offset+1)
syncers.append(HeadSyncer(block_sync))
elif args.mode == 'history':
from cic_eth.sync.history import HistorySyncer
backends = SyncerBackend.resume(chain, block_offset+1)
for backend in backends:
syncers.append(HistorySyncer(backend))
if len(syncers) == 0:
logg.info('found no unsynced history. terminating')
sys.exit(0)
else:
sys.stderr.write("unknown mode '{}'\n".format(args.mode))
sys.exit(1)
# bancor_registry_contract = CICRegistry.get_contract(chain_spec, 'BancorRegistry', interface='Registry')
# bancor_chain_registry = CICRegistry.get_chain_registry(chain_spec)
# bancor_registry = BancorRegistryClient(c.w3, bancor_chain_registry, config.get('ETH_ABI_DIR'))
# bancor_registry.load()
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None:
logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
sys.exit(1)
trusted_addresses = trusted_addresses_src.split(',')
for address in trusted_addresses:
logg.info('using trusted address {}'.format(address))
CallbackFilter.trusted_addresses = trusted_addresses
callback_filters = []
for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','):
task_split = cb.split(':')
task_queue = queue
if len(task_split) > 1:
task_queue = task_split[0]
callback_filter = CallbackFilter(task_split[1], task_queue)
callback_filters.append(callback_filter)
tx_filter = TxFilter(queue)
registration_filter = RegistrationFilter(queue)
gas_filter = GasFilter(c.gas_provider(), queue)
i = 0
for syncer in syncers:
logg.debug('running syncer index {}'.format(i))
syncer.filter.append(gas_filter.filter)
syncer.filter.append(registration_filter.filter)
# TODO: the two following filter functions break the filter loop if return uuid. Pro: less code executed. Con: Possibly unintuitive flow break
syncer.filter.append(tx_filter.filter)
#syncer.filter.append(convert_filter)
for cf in callback_filters:
syncer.filter.append(cf.filter)
try:
syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')))
except LoopDone as e:
sys.stderr.write("sync '{}' done at block {}\n".format(args.mode, e))
i += 1
sys.exit(0)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,168 @@
# standard imports
import os
import sys
import logging
import time
import argparse
import sys
import re
# third-party imports
import confini
import celery
import rlp
import web3
from web3 import HTTPProvider, WebsocketProvider
import cic_base.config
import cic_base.log
import cic_base.argparse
import cic_base.rpc
from cic_registry import CICRegistry
from chainlib.chain import ChainSpec
from cic_registry import zero_address
from cic_registry.chain import ChainRegistry
from cic_registry.error import UnknownContractError
from chainlib.eth.connection import HTTPConnection
from chainlib.eth.block import (
block_latest,
)
from hexathon import (
strip_0x,
)
from chainsyncer.backend import SyncerBackend
from chainsyncer.driver import (
HeadSyncer,
HistorySyncer,
)
from chainsyncer.db.models.base import SessionBase
# local imports
from cic_eth.registry import init_registry
from cic_eth.eth import RpcClient
from cic_eth.db import Otx
from cic_eth.db import TxConvertTransfer
from cic_eth.db.models.tx import TxCache
from cic_eth.db.enum import StatusEnum
from cic_eth.db import dsn_from_config
from cic_eth.queue.tx import get_paused_txs
#from cic_eth.sync import Syncer
#from cic_eth.sync.error import LoopDone
from cic_eth.db.error import UnknownConvertError
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.eth.token import unpack_transfer
from cic_eth.eth.token import unpack_transferfrom
from cic_eth.eth.account import unpack_gift
from cic_eth.runnable.daemons.filters import (
CallbackFilter,
GasFilter,
TxFilter,
RegistrationFilter,
)
script_dir = os.path.realpath(os.path.dirname(__file__))
logg = cic_base.log.create()
argparser = cic_base.argparse.create(script_dir, cic_base.argparse.full_template)
#argparser = cic_base.argparse.add(argparser, add_traffic_args, 'traffic')
args = cic_base.argparse.parse(argparser, logg)
config = cic_base.config.create(args.c, args, args.env_prefix)
config.add(args.y, '_KEYSTORE_FILE', True)
config.add(args.q, '_CELERY_QUEUE', True)
cic_base.config.log(config)
dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=1, debug=config.true('DATABASE_DEBUG'))
def main():
# parse chain spec object
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
# connect to celery
celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
# set up registry
w3 = cic_base.rpc.create(config.get('ETH_PROVIDER')) # replace with HTTPConnection when registry has been so refactored
registry = init_registry(config, w3)
# Connect to blockchain with chainlib
conn = HTTPConnection(config.get('ETH_PROVIDER'))
o = block_latest()
r = conn.do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.debug('starting at block {}'.format(block_offset))
syncers = []
#if SyncerBackend.first(chain_spec):
# backend = SyncerBackend.initial(chain_spec, block_offset)
syncer_backends = SyncerBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
logg.info('found no backends to resume')
syncer_backends.append(SyncerBackend.initial(chain_spec, block_offset))
else:
for syncer_backend in syncer_backends:
logg.info('resuming sync session {}'.format(syncer_backend))
syncer_backends.append(SyncerBackend.live(chain_spec, block_offset+1))
for syncer_backend in syncer_backends:
try:
syncers.append(HistorySyncer(syncer_backend))
logg.info('Initializing HISTORY syncer on backend {}'.format(syncer_backend))
except AttributeError:
logg.info('Initializing HEAD syncer on backend {}'.format(syncer_backend))
syncers.append(HeadSyncer(syncer_backend))
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None:
logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
sys.exit(1)
trusted_addresses = trusted_addresses_src.split(',')
for address in trusted_addresses:
logg.info('using trusted address {}'.format(address))
CallbackFilter.trusted_addresses = trusted_addresses
callback_filters = []
for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','):
task_split = cb.split(':')
task_queue = config.get('_CELERY_QUEUE')
if len(task_split) > 1:
task_queue = task_split[0]
callback_filter = CallbackFilter(chain_spec, task_split[1], task_queue)
callback_filters.append(callback_filter)
tx_filter = TxFilter(chain_spec, config.get('_CELERY_QUEUE'))
registration_filter = RegistrationFilter(chain_spec, config.get('_CELERY_QUEUE'))
gas_filter = GasFilter(chain_spec, config.get('_CELERY_QUEUE'))
i = 0
for syncer in syncers:
logg.debug('running syncer index {}'.format(i))
syncer.add_filter(gas_filter)
syncer.add_filter(registration_filter)
# TODO: the two following filter functions break the filter loop if return uuid. Pro: less code executed. Con: Possibly unintuitive flow break
syncer.add_filter(tx_filter)
for cf in callback_filters:
syncer.add_filter(cf)
r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), conn)
sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
i += 1
sys.exit(0)
if __name__ == '__main__':
main()

View File

@@ -18,6 +18,7 @@ import web3
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
from cic_registry.chain import ChainRegistry
from hexathon import add_0x
# local imports
from cic_eth.api import AdminApi
@@ -36,8 +37,8 @@ default_abi_dir = '/usr/share/local/cic/solidity/abi'
default_config_dir = os.path.join('/usr/local/etc/cic-eth')
argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', default='http://localhost:8545', type=str, help='Web3 provider url (http only)')
argparser.add_argument('-r', '--registry-address', type=str, help='CIC registry address')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='Web3 provider url (http only)')
argparser.add_argument('-r', '--registry-address', dest='r', type=str, help='CIC registry address')
argparser.add_argument('-f', '--format', dest='f', default='terminal', type=str, help='Output format')
argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
@@ -61,12 +62,16 @@ config.process()
args_override = {
'ETH_PROVIDER': getattr(args, 'p'),
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'CIC_REGISTRY_ADDRESS': getattr(args, 'r'),
}
# override args
config.dict_override(args_override, 'cli args')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
config.add(add_0x(args.query), '_QUERY', True)
re_websocket = re.compile('^wss?://')
re_http = re.compile('^https?://')
blockchain_provider = config.get('ETH_PROVIDER')
@@ -148,21 +153,20 @@ def render_lock(o, **kwargs):
# TODO: move each command to submodule
def main():
logg.debug('len {}'.format(len(args.query)))
txs = []
renderer = render_tx
if len(args.query) > 66:
txs = [admin_api.tx(chain_spec, tx_raw=args.query)]
elif len(args.query) > 42:
txs = [admin_api.tx(chain_spec, tx_hash=args.query)]
elif len(args.query) == 42:
txs = admin_api.account(chain_spec, args.query, include_recipient=False)
if len(config.get('_QUERY')) > 66:
txs = [admin_api.tx(chain_spec, tx_raw=config.get('_QUERY'))]
elif len(config.get('_QUERY')) > 42:
txs = [admin_api.tx(chain_spec, tx_hash=config.get('_QUERY'))]
elif len(config.get('_QUERY')) == 42:
txs = admin_api.account(chain_spec, config.get('_QUERY'), include_recipient=False)
renderer = render_account
elif len(args.query) >= 4 and args.query[:4] == 'lock':
elif len(config.get('_QUERY')) >= 4 and config.get('_QUERY')[:4] == 'lock':
txs = admin_api.get_lock()
renderer = render_lock
else:
raise ValueError('cannot parse argument {}'.format(args.query))
raise ValueError('cannot parse argument {}'.format(config.get('_QUERY')))
if len(txs) == 0:
logg.info('no matches found')