WIP Rehabilitate runnables

This commit is contained in:
nolash 2021-03-22 22:10:52 +01:00
parent 05f7b3c9c3
commit 305a1f760b
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
18 changed files with 192 additions and 796 deletions

View File

@ -7,7 +7,6 @@ import requests
import web3 import web3
from chainlib.eth.constant import ZERO_ADDRESS from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.status import Status as TxStatus
from chainlib.connection import RPCConnection from chainlib.connection import RPCConnection
from chainlib.eth.erc20 import ERC20 from chainlib.eth.erc20 import ERC20
from chainlib.eth.tx import ( from chainlib.eth.tx import (
@ -298,70 +297,3 @@ def cache_approve_data(
session.close() session.close()
return (tx_hash_hex, cache_id) return (tx_hash_hex, cache_id)
#class ExtendedTx:
#
# _default_decimals = 6
#
# def __init__(self, tx_hash, chain_spec):
# self._chain_spec = chain_spec
# self.chain = str(chain_spec)
# self.hash = tx_hash
# self.sender = None
# self.sender_label = None
# self.recipient = None
# self.recipient_label = None
# self.source_token_value = 0
# self.destination_token_value = 0
# self.source_token = ZERO_ADDRESS
# self.destination_token = ZERO_ADDRESS
# self.source_token_symbol = ''
# self.destination_token_symbol = ''
# self.source_token_decimals = ExtendedTx._default_decimals
# self.destination_token_decimals = ExtendedTx._default_decimals
# self.status = TxStatus.PENDING.name
# self.status_code = TxStatus.PENDING.value
#
#
# def set_actors(self, sender, recipient, trusted_declarator_addresses=None):
# self.sender = sender
# self.recipient = recipient
# if trusted_declarator_addresses != None:
# self.sender_label = translate_address(sender, trusted_declarator_addresses, self.chain)
# self.recipient_label = translate_address(recipient, trusted_declarator_addresses, self.chain)
#
#
# def set_tokens(self, source, source_value, destination=None, destination_value=None):
# c = RpcClient(self._chain_spec)
# registry = safe_registry(c.w3)
# if destination == None:
# destination = source
# if destination_value == None:
# destination_value = source_value
# st = registry.get_address(self._chain_spec, source)
# dt = registry.get_address(self._chain_spec, destination)
# self.source_token = source
# self.source_token_symbol = st.symbol()
# self.source_token_decimals = st.decimals()
# self.source_token_value = source_value
# self.destination_token = destination
# self.destination_token_symbol = dt.symbol()
# self.destination_token_decimals = dt.decimals()
# self.destination_token_value = destination_value
#
#
# def set_status(self, n):
# if n:
# self.status = TxStatus.ERROR.name
# else:
# self.status = TxStatus.SUCCESS.name
# self.status_code = n
#
#
# def to_dict(self):
# o = {}
# for attr in dir(self):
# if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'set_status', 'to_dict']:
# continue
# o[attr] = getattr(self, attr)
# return o

View File

@ -0,0 +1,71 @@
# extended imports
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.status import Status as TxStatus
class ExtendedTx:
_default_decimals = 6
def __init__(self, rpc, tx_hash, chain_spec):
self.rpc = rpc
self.chain_spec = chain_spec
self.hash = tx_hash
self.sender = None
self.sender_label = None
self.recipient = None
self.recipient_label = None
self.source_token_value = 0
self.destination_token_value = 0
self.source_token = ZERO_ADDRESS
self.destination_token = ZERO_ADDRESS
self.source_token_symbol = ''
self.destination_token_symbol = ''
self.source_token_decimals = ExtendedTx._default_decimals
self.destination_token_decimals = ExtendedTx._default_decimals
self.status = TxStatus.PENDING.name
self.status_code = TxStatus.PENDING.value
def set_actors(self, sender, recipient, trusted_declarator_addresses=None):
self.sender = sender
self.recipient = recipient
if trusted_declarator_addresses != None:
self.sender_label = translate_address(sender, trusted_declarator_addresses, self.chain_spec)
self.recipient_label = translate_address(recipient, trusted_declarator_addresses, self.chain_spec)
def set_tokens(self, source, source_value, destination=None, destination_value=None):
if destination == None:
destination = source
if destination_value == None:
destination_value = source_value
st = ERC20Token(self.rpc, source)
dt = ERC20Token(self.rpc, destination)
self.source_token = source
self.source_token_symbol = st.symbol
self.source_token_name = st.name
self.source_token_decimals = st.decimals
self.source_token_value = source_value
self.destination_token = destination
self.destination_token_symbol = dt.symbol
self.destination_token_name = dt.name
self.destination_token_decimals = dt.decimals
self.destination_token_value = destination_value
def set_status(self, n):
if n:
self.status = TxStatus.ERROR.name
else:
self.status = TxStatus.SUCCESS.name
self.status_code = n
def to_dict(self):
o = {}
for attr in dir(self):
if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'set_status', 'to_dict']:
continue
o[attr] = getattr(self, attr)
return o

View File

@ -11,11 +11,10 @@ import datetime
# third-party imports # third-party imports
import confini import confini
import celery import celery
import web3 from cic_eth_registry import CICRegistry
from web3 import HTTPProvider, WebsocketProvider from chainlib.chain import ChainSpec
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
from chainlib.eth.tx import unpack from chainlib.eth.tx import unpack
from chainsyncer.error import SyncDone
from hexathon import strip_0x from hexathon import strip_0x
# local imports # local imports
@ -31,7 +30,6 @@ from cic_eth.queue.tx import (
set_dequeue, set_dequeue,
) )
from cic_eth.admin.ctrl import lock_send from cic_eth.admin.ctrl import lock_send
from cic_eth.sync.error import LoopDone
from cic_eth.eth.tx import send as task_tx_send from cic_eth.eth.tx import send as task_tx_send
from cic_eth.error import ( from cic_eth.error import (
PermanentTxError, PermanentTxError,
@ -51,6 +49,7 @@ logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL)
config_dir = os.path.join('/usr/local/etc/cic-eth') config_dir = os.path.join('/usr/local/etc/cic-eth')
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks') argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
@ -79,21 +78,9 @@ queue = args.q
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG')) SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
re_websocket = re.compile('^wss?://') RPCConnection.registry_location(args.p, chain_spec, tag='default')
re_http = re.compile('^https?://')
blockchain_provider = config.get('ETH_PROVIDER')
if re.match(re_websocket, blockchain_provider) != None:
blockchain_provider = WebsocketProvider(blockchain_provider)
elif re.match(re_http, blockchain_provider) != None:
blockchain_provider = HTTPProvider(blockchain_provider)
else:
raise ValueError('unknown provider url {}'.format(blockchain_provider))
def web3_constructor():
w3 = web3.Web3(blockchain_provider)
return (blockchain_provider, w3)
RpcClient.set_constructor(web3_constructor)
run = True run = True
@ -165,17 +152,10 @@ class DispatchSyncer:
def main(): def main():
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
c = RpcClient(chain_spec)
CICRegistry.init(c.w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
CICRegistry.add_path(config.get('ETH_ABI_DIR'))
syncer = DispatchSyncer(chain_spec) syncer = DispatchSyncer(chain_spec)
try: try:
syncer.loop(c.w3, float(config.get('DISPATCHER_LOOP_INTERVAL'))) syncer.loop(c.w3, float(config.get('DISPATCHER_LOOP_INTERVAL')))
except LoopDone as e: except SyncDone as e:
sys.stderr.write("dispatcher done at block {}\n".format(e)) sys.stderr.write("dispatcher done at block {}\n".format(e))
sys.exit(0) sys.exit(0)

View File

@ -2,29 +2,19 @@
import logging import logging
# third-party imports # third-party imports
import web3
import celery import celery
from cic_registry.error import UnknownContractError from cic_eth_registry.error import UnknownContractError
from chainlib.status import Status as TxStatus from chainlib.status import Status as TxStatus
from chainlib.eth.address import to_checksum from chainlib.eth.address import to_checksum_address
from chainlib.eth.error import RequestMismatchException
from chainlib.eth.constant import ZERO_ADDRESS from chainlib.eth.constant import ZERO_ADDRESS
from hexathon import strip_0x from hexathon import strip_0x
# local imports # local imports
from .base import SyncFilter from .base import SyncFilter
from cic_eth.eth.token import ( from cic_eth.eth.meta import ExtendedTx
unpack_transfer,
unpack_transferfrom,
)
from cic_eth.eth.account import unpack_gift
from cic_eth.eth.token import ExtendedTx
from .base import SyncFilter
logg = logging.getLogger(__name__) logg = logging.getLogger().getLogger__name__)
transfer_method_signature = 'a9059cbb' # keccak256(transfer(address,uint256))
transferfrom_method_signature = '23b872dd' # keccak256(transferFrom(address,address,uint256))
giveto_method_signature = '63e4bff4' # keccak256(giveTo(address))
class CallbackFilter(SyncFilter): class CallbackFilter(SyncFilter):
@ -63,38 +53,65 @@ class CallbackFilter(SyncFilter):
return s return s
def parse_transfer(self, tx):
r = ERC20.parse_transfer_request(tx.payload)
transfer_data = {}
transfer_data['to'] = r[0]
transfer_data['value'] = r[1]
transfer_data['from'] = tx['from']
transfer_data['token_address'] = tx['to']
return ('transfer', transfer_data)
def parse_transferfrom(self, tx):
r = ERC20.parse_transfer_request(tx.payload)
transfer_data = unpack_transferfrom(tx.payload)
transfer_data['from'] = r[0]
transfer_data['to'] = r[1]
transfer_data['value'] = r[2]
transfer_data['token_address'] = tx['to']
return ('transferfrom', transfer_data)
def parse_giftto(self, tx):
# TODO: broken
transfer_data = unpack_gift(tx.payload)
transfer_data['from'] = tx.inputs[0]
transfer_data['value'] = 0
transfer_data['token_address'] = ZERO_ADDRESS
# TODO: would be better to query the gift amount from the block state
for l in tx.logs:
topics = l['topics']
logg.debug('topixx {}'.format(topics))
if strip_0x(topics[0]) == '45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa':
#transfer_data['value'] = web3.Web3.toInt(hexstr=strip_0x(l['data']))
transfer_data['value'] = int.from_bytes(bytes.fromhex(strip_0x(l_data)))
#token_address_bytes = topics[2][32-20:]
token_address = strip_0x(topics[2])[64-40:]
transfer_data['token_address'] = to_checksum_address(token_address)
return ('tokengift', transfer_data)
def parse_data(self, tx): def parse_data(self, tx):
transfer_type = None transfer_type = None
transfer_data = None transfer_data = None
# TODO: what's with the mix of attributes and dict keys
logg.debug('have payload {}'.format(tx.payload)) logg.debug('have payload {}'.format(tx.payload))
method_signature = tx.payload[:8] method_signature = tx.payload[:8]
logg.debug('tx status {}'.format(tx.status)) logg.debug('tx status {}'.format(tx.status))
if method_signature == transfer_method_signature:
transfer_data = unpack_transfer(tx.payload)
transfer_data['from'] = tx['from']
transfer_data['token_address'] = tx['to']
elif method_signature == transferfrom_method_signature: for parser in [
transfer_type = 'transferfrom' parse_transfer,
transfer_data = unpack_transferfrom(tx.payload) parse_transfeffrom,
transfer_data['token_address'] = tx['to'] parse_giftto,
]:
try:
(transfer_type, transfer_data) = parser(tx)
break
except RequestMismatchException:
continue
# TODO: do not rely on logs here
elif method_signature == giveto_method_signature:
transfer_type = 'tokengift'
transfer_data = unpack_gift(tx.payload)
transfer_data['from'] = tx.inputs[0]
transfer_data['value'] = 0
transfer_data['token_address'] = ZERO_ADDRESS
for l in tx.logs:
topics = l['topics']
logg.debug('topixx {}'.format(topics))
if strip_0x(topics[0]) == '45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa':
transfer_data['value'] = web3.Web3.toInt(hexstr=strip_0x(l['data']))
#token_address_bytes = topics[2][32-20:]
token_address = strip_0x(topics[2])[64-40:]
transfer_data['token_address'] = to_checksum(token_address)
logg.debug('resolved method {}'.format(transfer_type)) logg.debug('resolved method {}'.format(transfer_type))
@ -105,8 +122,6 @@ class CallbackFilter(SyncFilter):
def filter(self, conn, block, tx, db_session=None): def filter(self, conn, block, tx, db_session=None):
chain_str = str(self.chain_spec)
transfer_data = None transfer_data = None
transfer_type = None transfer_type = None
try: try:
@ -122,11 +137,10 @@ class CallbackFilter(SyncFilter):
logg.debug('checking callbacks filter input {}'.format(tx.payload[:8])) logg.debug('checking callbacks filter input {}'.format(tx.payload[:8]))
if transfer_data != None: if transfer_data != None:
logg.debug('wtfoo {}'.format(transfer_data))
token_symbol = None token_symbol = None
result = None result = None
try: try:
tokentx = ExtendedTx(tx.hash, self.chain_spec) tokentx = ExtendedTx(conn, tx.hash, self.chain_spec)
tokentx.set_actors(transfer_data['from'], transfer_data['to'], self.trusted_addresses) tokentx.set_actors(transfer_data['from'], transfer_data['to'], self.trusted_addresses)
tokentx.set_tokens(transfer_data['token_address'], transfer_data['value']) tokentx.set_tokens(transfer_data['token_address'], transfer_data['value'])
if transfer_data['status'] == 0: if transfer_data['status'] == 0:

View File

@ -2,7 +2,6 @@
import logging import logging
# external imports # external imports
from cic_registry.chain import ChainSpec
from hexathon import add_0x from hexathon import add_0x
# local imports # local imports
@ -14,7 +13,7 @@ from cic_eth.queue.tx import get_paused_txs
from cic_eth.eth.task import create_check_gas_and_send_task from cic_eth.eth.task import create_check_gas_and_send_task
from .base import SyncFilter from .base import SyncFilter
logg = logging.getLogger(__name__) logg = logging.getLogger().getLogger(__name__)
class GasFilter(SyncFilter): class GasFilter(SyncFilter):
@ -47,7 +46,7 @@ class GasFilter(SyncFilter):
if len(txs) > 0: if len(txs) > 0:
s = create_check_gas_and_send_task( s = create_check_gas_and_send_task(
list(txs.values()), list(txs.values()),
str(self.chain_spec), self.chain_spec.asdict(),
r[0], r[0],
0, 0,
tx_hashes_hex=list(txs.keys()), tx_hashes_hex=list(txs.keys()),

View File

@ -3,7 +3,7 @@ import logging
# third-party imports # third-party imports
import celery import celery
from chainlib.eth.address import to_checksum from chainlib.eth.address import to_checksum_address
from hexathon import ( from hexathon import (
add_0x, add_0x,
strip_0x, strip_0x,
@ -12,9 +12,9 @@ from hexathon import (
# local imports # local imports
from .base import SyncFilter from .base import SyncFilter
logg = logging.getLogger(__name__) logg = logging.getLogger().getChild(__name__)
account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153fe9ddb8e84b665061acd' # keccak256(AccountAdded(address,uint256)) account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153fe9ddb8e84b665061acd'
class RegistrationFilter(SyncFilter): class RegistrationFilter(SyncFilter):
@ -32,7 +32,7 @@ class RegistrationFilter(SyncFilter):
# TODO: use abi conversion method instead # TODO: use abi conversion method instead
address_hex = strip_0x(l['topics'][1])[64-40:] address_hex = strip_0x(l['topics'][1])[64-40:]
address = to_checksum(add_0x(address_hex)) address = to_checksum_address(add_0x(address_hex))
logg.info('request token gift to {}'.format(address)) logg.info('request token gift to {}'.format(address))
s_nonce = celery.signature( s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce', 'cic_eth.eth.tx.reserve_nonce',
@ -44,7 +44,7 @@ class RegistrationFilter(SyncFilter):
s_gift = celery.signature( s_gift = celery.signature(
'cic_eth.eth.account.gift', 'cic_eth.eth.account.gift',
[ [
str(self.chain_spec), self.chain_spec.asdict(),
], ],
queue=self.queue, queue=self.queue,
) )

View File

@ -13,7 +13,7 @@ from chainsyncer.db.models.base import SessionBase
from chainlib.status import Status from chainlib.status import Status
from .base import SyncFilter from .base import SyncFilter
logg = logging.getLogger(__name__) logg = logging.getLogger().getChild(__name__)
class TxFilter(SyncFilter): class TxFilter(SyncFilter):

View File

@ -8,9 +8,8 @@ import datetime
import web3 import web3
import confini import confini
import celery import celery
from web3 import HTTPProvider, WebsocketProvider from cic_eth_registry import CICRegistry
from cic_registry import CICRegistry from chainlib.chain import ChainSpec
from cic_registry.chain import ChainSpec
from cic_eth.db import dsn_from_config from cic_eth.db import dsn_from_config
from cic_eth.db import SessionBase from cic_eth.db import SessionBase
@ -25,19 +24,14 @@ from cic_eth.eth.util import unpack_signed_raw_tx_hex
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL)
logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL)
config_dir = os.path.join('/usr/local/etc/cic-eth') config_dir = os.path.join('/usr/local/etc/cic-eth')
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks') argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('--retry-delay', dest='retry_delay', type=str, help='seconds to wait for retrying a transaction that is marked as sent') argparser.add_argument('--retry-delay', dest='retry_delay', type=str, help='seconds to wait for retrying a transaction that is marked as sent')
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('-v', help='be verbose', action='store_true') argparser.add_argument('-v', help='be verbose', action='store_true')
@ -71,31 +65,15 @@ queue = args.q
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
RPCConnection.registry_location(args.p, chain_spec, tag='default')
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
SessionBase.connect(dsn) SessionBase.connect(dsn)
re_websocket = re.compile('^wss?://')
re_http = re.compile('^https?://')
blockchain_provider = config.get('ETH_PROVIDER')
if re.match(re_websocket, blockchain_provider) != None:
blockchain_provider = WebsocketProvider(blockchain_provider)
elif re.match(re_http, blockchain_provider) != None:
blockchain_provider = HTTPProvider(blockchain_provider)
else:
raise ValueError('unknown provider url {}'.format(blockchain_provider))
def web3_constructor():
w3 = web3.Web3(blockchain_provider)
return (blockchain_provider, w3)
RpcClient.set_constructor(web3_constructor)
straggler_delay = int(config.get('CIC_TX_RETRY_DELAY')) straggler_delay = int(config.get('CIC_TX_RETRY_DELAY'))
# TODO: we already have the signed raw tx in get, so its a waste of cycles to get_tx here # TODO: we already have the signed raw tx in get, so its a waste of cycles to get_tx here
def sendfail_filter(w3, tx_hash, rcpt, chain_str): def sendfail_filter(w3, tx_hash, rcpt, chain_spec):
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_dict = get_tx(tx_hash) tx_dict = get_tx(tx_hash)
tx = unpack_signed_raw_tx_hex(tx_dict['signed_tx'], chain_spec.chain_id()) tx = unpack_signed_raw_tx_hex(tx_dict['signed_tx'], chain_spec.chain_id())
logg.debug('submitting tx {} for retry'.format(tx_hash)) logg.debug('submitting tx {} for retry'.format(tx_hash))
@ -137,7 +115,7 @@ def sendfail_filter(w3, tx_hash, rcpt, chain_str):
# TODO: can we merely use the dispatcher instead? # TODO: can we merely use the dispatcher instead?
def dispatch(chain_str): def dispatch(conn, chain_spec):
txs = get_status_tx(StatusEnum.RETRY, before=datetime.datetime.utcnow()) txs = get_status_tx(StatusEnum.RETRY, before=datetime.datetime.utcnow())
if len(txs) == 0: if len(txs) == 0:
logg.debug('no retry state txs found') logg.debug('no retry state txs found')
@ -199,11 +177,49 @@ def dispatch(chain_str):
# s_send.apply_async() # s_send.apply_async()
def main(): class RetrySyncer(Syncer):
c = RpcClient(chain_spec) def __init__(self, chain_spec, stalled_grace_seconds, failed_grace_seconds=None, final_func=None):
CICRegistry.init(c.w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec) self.chain_spec = chain_spec
CICRegistry.add_path(config.get('ETH_ABI_DIR')) if failed_grace_seconds == None:
failed_grace_seconds = stalled_grace_seconds
self.stalled_grace_seconds = stalled_grace_seconds
self.failed_grace_seconds = failed_grace_seconds
self.final_func = final_func
def get(self):
# before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.failed_grace_seconds)
# failed_txs = get_status_tx(
# StatusEnum.SENDFAIL.value,
# before=before,
# )
before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds)
stalled_txs = get_status_tx(
StatusBits.IN_NETWORK.value,
not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE,
before=before,
)
# return list(failed_txs.keys()) + list(stalled_txs.keys())
return stalled_txs
def process(self, conn, ref):
logg.debug('tx {}'.format(ref))
for f in self.filter:
f(conn, ref, None, str(self.chain_spec))
def loop(self, interval):
while self.running and Syncer.running_global:
rpc = RPCConnection.connect(self.chain_spec, 'default')
for tx in self.get():
self.process(rpc, tx)
if self.final_func != None:
self.final_func(rpc, self.chain_spec)
time.sleep(interval)
def main():
syncer = RetrySyncer(chain_spec, straggler_delay, final_func=dispatch) syncer = RetrySyncer(chain_spec, straggler_delay, final_func=dispatch)
syncer.filter.append(sendfail_filter) syncer.filter.append(sendfail_filter)

View File

@ -49,7 +49,7 @@ logg = logging.getLogger()
config_dir = os.path.join('/usr/local/etc/cic-eth') config_dir = os.path.join('/usr/local/etc/cic-eth')
argparser = argparse.ArgumentParser() argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', type=str, help='web3 provider') argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
argparser.add_argument('-c', type=str, default=config_dir, help='config file') argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('-q', type=str, default='cic-eth', help='queue name for worker tasks') argparser.add_argument('-q', type=str, default='cic-eth', help='queue name for worker tasks')
argparser.add_argument('-r', type=str, help='CIC registry address') argparser.add_argument('-r', type=str, help='CIC registry address')

View File

@ -1 +0,0 @@
from .base import Syncer

View File

@ -1,201 +0,0 @@
# standard imports
import logging
# local imports
from cic_eth.db.models.sync import BlockchainSync
from cic_eth.db.models.base import SessionBase
logg = logging.getLogger()
class SyncerBackend:
"""Interface to block and transaction sync state.
:param chain_spec: Chain spec for the chain that syncer is running for.
:type chain_spec: cic_registry.chain.ChainSpec
:param object_id: Unique id for the syncer session.
:type object_id: number
"""
def __init__(self, chain_spec, object_id):
self.db_session = None
self.db_object = None
self.chain_spec = chain_spec
self.object_id = object_id
self.connect()
self.disconnect()
def connect(self):
"""Loads the state of the syncer session with the given id.
"""
if self.db_session == None:
self.db_session = SessionBase.create_session()
q = self.db_session.query(BlockchainSync)
q = q.filter(BlockchainSync.id==self.object_id)
self.db_object = q.first()
if self.db_object == None:
self.disconnect()
raise ValueError('sync entry with id {} not found'.format(self.object_id))
return self.db_session
def disconnect(self):
"""Commits state of sync to backend.
"""
if self.db_session != None:
self.db_session.add(self.db_object)
self.db_session.commit()
self.db_session.close()
self.db_session = None
def chain(self):
"""Returns chain spec for syncer
:returns: Chain spec
:rtype chain_spec: cic_registry.chain.ChainSpec
"""
return self.chain_spec
def get(self):
"""Get the current state of the syncer cursor.
:returns: Block and block transaction height, respectively
:rtype: tuple
"""
self.connect()
pair = self.db_object.cursor()
self.disconnect()
return pair
def set(self, block_height, tx_height):
"""Update the state of the syncer cursor
:param block_height: Block height of cursor
:type block_height: number
:param tx_height: Block transaction height of cursor
:type tx_height: number
:returns: Block and block transaction height, respectively
:rtype: tuple
"""
self.connect()
pair = self.db_object.set(block_height, tx_height)
self.disconnect()
return pair
def start(self):
"""Get the initial state of the syncer cursor.
:returns: Initial block and block transaction height, respectively
:rtype: tuple
"""
self.connect()
pair = self.db_object.start()
self.disconnect()
return pair
def target(self):
"""Get the target state (upper bound of sync) of the syncer cursor.
:returns: Target block height
:rtype: number
"""
self.connect()
target = self.db_object.target()
self.disconnect()
return target
@staticmethod
def first(chain):
"""Returns the model object of the most recent syncer in backend.
:param chain: Chain spec of chain that syncer is running for.
:type chain: cic_registry.chain.ChainSpec
:returns: Last syncer object
:rtype: cic_eth.db.models.BlockchainSync
"""
return BlockchainSync.first(chain)
@staticmethod
def initial(chain, block_height):
"""Creates a new syncer session and commit its initial state to backend.
:param chain: Chain spec of chain that syncer is running for.
:type chain: cic_registry.chain.ChainSpec
:param block_height: Target block height
:type block_height: number
:returns: New syncer object
:rtype: cic_eth.db.models.BlockchainSync
"""
object_id = None
session = SessionBase.create_session()
o = BlockchainSync(chain, 0, 0, block_height)
session.add(o)
session.commit()
object_id = o.id
session.close()
return SyncerBackend(chain, object_id)
@staticmethod
def resume(chain, block_height):
"""Retrieves and returns all previously unfinished syncer sessions.
:param chain: Chain spec of chain that syncer is running for.
:type chain: cic_registry.chain.ChainSpec
:param block_height: Target block height
:type block_height: number
:returns: Syncer objects of unfinished syncs
:rtype: list of cic_eth.db.models.BlockchainSync
"""
syncers = []
session = SessionBase.create_session()
object_id = None
for object_id in BlockchainSync.get_unsynced(session=session):
logg.debug('block syncer resume added previously unsynced sync entry id {}'.format(object_id))
syncers.append(SyncerBackend(chain, object_id))
(block_resume, tx_resume) = BlockchainSync.get_last_live_height(block_height, session=session)
if block_height != block_resume:
o = BlockchainSync(chain, block_resume, tx_resume, block_height)
session.add(o)
session.commit()
object_id = o.id
syncers.append(SyncerBackend(chain, object_id))
logg.debug('block syncer resume added new sync entry from previous run id {}, start{}:{} target {}'.format(object_id, block_resume, tx_resume, block_height))
session.close()
return syncers
@staticmethod
def live(chain, block_height):
"""Creates a new open-ended syncer session starting at the given block height.
:param chain: Chain spec of chain that syncer is running for.
:type chain: cic_registry.chain.ChainSpec
:param block_height: Target block height
:type block_height: number
:returns: "Live" syncer object
:rtype: cic_eth.db.models.BlockchainSync
"""
object_id = None
session = SessionBase.create_session()
o = BlockchainSync(chain, block_height, 0, None)
session.add(o)
session.commit()
object_id = o.id
session.close()
return SyncerBackend(chain, object_id)

View File

@ -1,51 +0,0 @@
# TODO: extend blocksync model
class Syncer:
"""Base class and interface for implementing a block sync poller routine.
:param bc_cache: Retrieves block cache cursors for chain head and latest processed block.
:type bc_cache: cic_eth.sync.SyncerBackend
"""
w3 = None
running_global = True
def __init__(self, bc_cache):
self.cursor = None
self.bc_cache = bc_cache
self.filter = []
self.running = True
def chain(self):
"""Returns the string representation of the chain spec for the chain the syncer is running on.
:returns: Chain spec string
:rtype: str
"""
return self.bc_cache.chain()
def get(self):
"""Get latest unprocessed blocks.
:returns: list of block hash strings
:rtype: list
"""
raise NotImplementedError()
def process(self, w3, ref):
"""Process transactions in a single block.
:param ref: Reference of object to process
:type ref: str, 0x-hex
"""
raise NotImplementedError()
def loop(self, interval):
"""Entry point for syncer loop
:param interval: Delay in seconds until next attempt if no new blocks are found.
:type interval: int
"""
raise NotImplementedError()

View File

@ -1,4 +0,0 @@
class LoopDone(Exception):
"""Exception raised when a syncing is complete.
"""
pass

View File

@ -1,51 +0,0 @@
# standard imports
import logging
# third-party imports
import web3
# local imports
from .mined import MinedSyncer
from .base import Syncer
logg = logging.getLogger()
class HeadSyncer(MinedSyncer):
"""Implements the get method in Syncer for retrieving every new mined block.
:param bc_cache: Retrieves block cache cursors for chain head and latest processed block.
:type bc_cache: Object implementing methods from cic_eth.sync.SyncerBackend
"""
def __init__(self, bc_cache):
super(HeadSyncer, self).__init__(bc_cache)
# TODO: filter not returning all blocks, at least with ganache. kind of defeats the point, then
#self.w3_filter = rpc.w3.eth.filter({
# 'fromBlock': block_offset,
# }) #'latest')
#self.bc_cache.set(block_offset, 0)
logg.debug('initialized head syncer with offset {}'.format(bc_cache.start()))
"""Implements Syncer.get
:param w3: Web3 object
:type w3: web3.Web3
:returns: Block hash of newly mined blocks. if any
:rtype: list of str, 0x-hex
"""
def get(self, w3):
# Of course, the filter doesn't return the same block dict format as getBlock() so we'll just waste some cycles getting the hashes instead.
#hashes = []
#for block in self.w3_filter.get_new_entries():
# hashes.append(block['blockHash'])
#logg.debug('blocks {}'.format(hashes))
#return hashes
(block_number, tx_number) = self.bc_cache.get()
block_hash = []
try:
block = w3.eth.getBlock(block_number)
block_hash.append(block.hash)
except web3.exceptions.BlockNotFound:
pass
return block_hash

View File

@ -1,74 +0,0 @@
# standard imports
import logging
# third-party imports
from web3.exceptions import BlockNotFound
from .error import LoopDone
# local imports
from .mined import MinedSyncer
from .base import Syncer
from cic_eth.db.models.base import SessionBase
logg = logging.getLogger()
class HistorySyncer(MinedSyncer):
"""Implements the get method in Syncer for retrieving all blocks between last processed block before previous shutdown and block height at time of syncer start.
:param bc_cache: Retrieves block cache cursors for chain head and latest processed block.
:type bc_cache: Object implementing methods from cic_eth.sync.SyncerBackend
:param mx: Maximum number of blocks to return in one call
:type mx: int
"""
def __init__(self, bc_cache, mx=500):
super(HistorySyncer, self).__init__(bc_cache)
self.max = mx
self.target = bc_cache.target()
logg.info('History syncer target block number {}'.format(self.target))
session_offset = self.bc_cache.get()
self.block_offset = session_offset[0]
self.tx_offset = session_offset[1]
logg.info('History syncer starting at {}:{}'.format(session_offset[0], session_offset[1]))
self.filter = []
"""Implements Syncer.get
BUG: Should also raise LoopDone when block array is empty after loop.
:param w3: Web3 object
:type w3: web3.Web3
:raises LoopDone: If a block is not found.
:return: Return a batch of blocks to process
:rtype: list of str, 0x-hex
"""
def get(self, w3):
sync_db = self.bc_cache
height = self.bc_cache.get()
logg.debug('height {}'.format(height))
block_last = height[0]
tx_last = height[1]
if not self.running:
raise LoopDone((block_last, tx_last))
b = []
block_target = block_last + self.max
if block_target > self.target:
block_target = self.target
logg.debug('target {} last {} max {}'.format(block_target, block_last, self.max))
for i in range(block_last, block_target):
if i == self.target:
logg.info('reached target {}, exiting'.format(i))
self.running = False
break
bhash = w3.eth.getBlock(i).hash
b.append(bhash)
logg.debug('appending block {} {}'.format(i, bhash.hex()))
if block_last == block_target:
logg.info('aleady reached target {}, exiting'.format(self.target))
self.running = False
return b

View File

@ -1,50 +0,0 @@
class MemPoolSyncer(Syncer):
def __init__(self, bc_cache):
raise NotImplementedError('incomplete, needs web3 tx to raw transaction conversion')
super(MemPoolSyncer, self).__init__(bc_cache)
# self.w3_filter = Syncer.w3.eth.filter('pending')
# for tx in tx_cache.txs:
# self.txs.append(tx)
# logg.debug('add tx {} to mempoolsyncer'.format(tx))
#
#
# def get(self):
# return self.w3_filter.get_new_entries()
#
#
# def process(self, tx_hash):
# tx_hash_hex = tx_hash.hex()
# if tx_hash_hex in self.txs:
# logg.debug('syncer already watching {}, skipping'.format(tx_hash_hex))
# tx = self.w3.eth.getTransaction(tx_hash_hex)
# serialized_tx = rlp.encode({
# 'nonce': tx.nonce,
# 'from': getattr(tx, 'from'),
# })
# logg.info('add {} to syncer: {}'.format(tx, serialized_tx))
# otx = Otx(
# nonce=tx.nonce,
# address=getattr(tx, 'from'),
# tx_hash=tx_hash_hex,
# signed_tx=serialized_tx,
# )
# Otx.session.add(otx)
# Otx.session.commit()
#
#
# def loop(self, interval):
# while Syncer.running:
# logg.debug('loop execute')
# txs = self.get()
# logg.debug('got txs {}'.format(txs))
# for tx in txs:
# #block_number = self.process(block.hex())
# self.process(tx)
# #if block_number > self.bc_cache.head():
# # self.bc_cache.head(block_number)
# time.sleep(interval)
# logg.info("Syncer no longer set to run, gracefully exiting")

View File

@ -1,109 +0,0 @@
# standard imports
import logging
import time
# third-party imports
import celery
# local impotes
from .base import Syncer
from cic_eth.queue.tx import set_final_status
from cic_eth.eth import RpcClient
app = celery.current_app
logg = logging.getLogger()
class MinedSyncer(Syncer):
"""Base implementation of block processor for mined blocks.
Loops through all transactions,
:param bc_cache: Retrieves block cache cursors for chain head and latest processed block.
:type bc_cache: Object implementing methods from cic_eth.sync.SyncerBackend
"""
yield_delay = 0.005
def __init__(self, bc_cache):
super(MinedSyncer, self).__init__(bc_cache)
self.block_offset = 0
self.tx_offset = 0
def process(self, w3, ref):
"""Processes transactions in a single block, advancing transaction (and block) cursor accordingly.
:param w3: Web3 object
:type w3: web3.Web3
:param ref: Block reference (hash) to process
:type ref: str, 0x-hex
:returns: Block number of next unprocessed block
:rtype: number
"""
b = w3.eth.getBlock(ref)
c = w3.eth.getBlockTransactionCount(ref)
s = 0
if self.block_offset == b.number:
s = self.tx_offset
logg.debug('processing {} (blocknumber {}, count {}, offset {})'.format(ref, b.number, c, s))
for i in range(s, c):
tx = w3.eth.getTransactionByBlock(ref, i)
tx_hash_hex = tx['hash'].hex()
rcpt = w3.eth.getTransactionReceipt(tx_hash_hex)
logg.debug('{}/{} processing tx {} from block {} {}'.format(i+1, c, tx_hash_hex, b.number, ref))
ours = False
# TODO: ensure filter loop can complete on graceful shutdown
for f in self.filter:
#try:
session = self.bc_cache.connect()
task_uuid = f(w3, tx, rcpt, self.chain(), session)
#except Exception as e:
# logg.error('error in filter {} tx {}: {}'.format(f, tx_hash_hex, e))
# continue
if task_uuid != None:
logg.debug('tx {} passed to celery task {}'.format(tx_hash_hex, task_uuid))
s = celery.signature(
'set_final_status',
[tx_hash_hex, rcpt['blockNumber'], not rcpt['status']],
)
s.apply_async()
break
next_tx = i + 1
if next_tx == c:
self.bc_cache.set(b.number+1, 0)
else:
self.bc_cache.set(b.number, next_tx)
if c == 0:
logg.info('synced block {} has no transactions'.format(b.number))
#self.bc_cache.session(b.number+1, 0)
self.bc_cache.set(b.number+1, 0)
return b['number']
def loop(self, interval):
"""Loop running until the "running" property of Syncer is set to False.
Retrieves latest unprocessed blocks and processes them.
:param interval: Delay in seconds until next attempt if no new blocks are found.
:type interval: int
"""
while self.running and Syncer.running_global:
self.bc_cache.connect()
c = RpcClient(self.chain())
logg.debug('loop execute')
e = self.get(c.w3)
logg.debug('got blocks {}'.format(e))
for block in e:
block_number = self.process(c.w3, block.hex())
logg.debug('processed block {} {}'.format(block_number, block.hex()))
self.bc_cache.disconnect()
if len(e) > 0:
time.sleep(self.yield_delay)
else:
time.sleep(interval)
logg.info("Syncer no longer set to run, gracefully exiting")

View File

@ -1,75 +0,0 @@
# standard imports
import logging
import datetime
import time
# third-party imports
import celery
# local imports
from .base import Syncer
from cic_eth.eth.rpc import RpcClient
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
)
from cic_eth.queue.tx import get_status_tx
logg = logging.getLogger()
celery_app = celery.current_app
class noop_cache:
def __init__(self, chain_spec):
self.chain_spec = chain_spec
def chain(self):
return self.chain_spec
class RetrySyncer(Syncer):
def __init__(self, chain_spec, stalled_grace_seconds, failed_grace_seconds=None, final_func=None):
cache = noop_cache(chain_spec)
super(RetrySyncer, self).__init__(cache)
if failed_grace_seconds == None:
failed_grace_seconds = stalled_grace_seconds
self.stalled_grace_seconds = stalled_grace_seconds
self.failed_grace_seconds = failed_grace_seconds
self.final_func = final_func
def get(self, w3):
# before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.failed_grace_seconds)
# failed_txs = get_status_tx(
# StatusEnum.SENDFAIL.value,
# before=before,
# )
before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds)
stalled_txs = get_status_tx(
StatusBits.IN_NETWORK.value,
not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE,
before=before,
)
# return list(failed_txs.keys()) + list(stalled_txs.keys())
return stalled_txs
def process(self, w3, ref):
logg.debug('tx {}'.format(ref))
for f in self.filter:
f(w3, ref, None, str(self.chain()))
def loop(self, interval):
chain_str = str(self.chain())
while self.running and Syncer.running_global:
c = RpcClient(self.chain())
for tx in self.get(c.w3):
self.process(c.w3, tx)
if self.final_func != None:
self.final_func(chain_str)
time.sleep(interval)