WIP rehabilitate admin api runnables

This commit is contained in:
nolash 2021-03-28 18:32:37 +02:00
parent f35bbc84e4
commit 1322ebc8b6
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
13 changed files with 80 additions and 54 deletions

View File

@ -16,7 +16,11 @@ from chainlib.eth.tx import (
receipt,
unpack,
)
from hexathon import strip_0x
from chainlib.hash import keccak256_hex_to_hex
from hexathon import (
strip_0x,
add_0x,
)
from chainlib.eth.gas import balance
# local imports
@ -207,12 +211,12 @@ class AdminApi:
blocking_nonce = tx['nonce']
nonce_otx = tx['nonce']
#nonce_cache = Nonce.get(address)
nonce_w3 = self.w3.eth.getTransactionCount(address, 'pending')
nonce_cache = Nonce.get(address)
#nonce_w3 = self.w3.eth.getTransactionCount(address, 'pending')
return {
'nonce': {
'network': nonce_w3,
'network': nonce_cache,
'queue': nonce_otx,
#'cache': nonce_cache,
'blocking': blocking_nonce,
@ -334,7 +338,8 @@ class AdminApi:
ValueError('Specify only one of hash or raw tx')
if tx_raw != None:
tx_hash = self.w3.keccak(hexstr=tx_raw).hex()
tx_hash = add_0x(keccak256_hex_to_hex(tx_raw))
#tx_hash = self.w3.keccak(hexstr=tx_raw).hex()
s = celery.signature(
'cic_eth.queue.tx.get_tx_cache',
@ -347,19 +352,21 @@ class AdminApi:
source_token = None
if tx['source_token'] != ZERO_ADDRESS:
try:
source_token = CICRegistry.get_address(chain_spec, tx['source_token']).contract
source_token = registry.by_address(tx['source_token'])
#source_token = CICRegistry.get_address(chain_spec, tx['source_token']).contract
except UnknownContractError:
source_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token'])
source_token = CICRegistry.add_token(chain_spec, source_token_contract)
#source_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token'])
#source_token = CICRegistry.add_token(chain_spec, source_token_contract)
logg.warning('unknown source token contract {}'.format(tx['source_token']))
destination_token = None
if tx['source_token'] != ZERO_ADDRESS:
try:
destination_token = CICRegistry.get_address(chain_spec, tx['destination_token'])
#destination_token = CICRegistry.get_address(chain_spec, tx['destination_token'])
destination_token = registry.by_address(tx['destination_token'])
except UnknownContractError:
destination_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token'])
destination_token = CICRegistry.add_token(chain_spec, destination_token_contract)
#destination_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token'])
#destination_token = CICRegistry.add_token(chain_spec, destination_token_contract)
logg.warning('unknown destination token contract {}'.format(tx['destination_token']))
tx['sender_description'] = 'Custodial account'
@ -371,7 +378,7 @@ class AdminApi:
try:
#sender_contract = CICRegistry.get_address(chain_spec, tx['sender'])
sender_contract = registry.by_address(tx['sender'], sender_address=self.call_address)
tx['sender_description'] = 'Contract {}'.format(sender_contract.identifier())
tx['sender_description'] = 'Contract at {}'.format(tx['sender']) #sender_contract)
except UnknownContractError:
tx['sender_description'] = 'Unknown contract'
except KeyError as e:
@ -407,8 +414,9 @@ class AdminApi:
r = self.rpc.do(o)
if len(strip_0x(r, allow_empty=True)) > 0:
try:
recipient_contract = CICRegistry.get_address(chain_spec, tx['recipient'])
tx['recipient_description'] = 'Contract {}'.format(recipient_contract.identifier())
#recipient_contract = CICRegistry.by_address(tx['recipient'])
recipient_contract = registry.by_address(tx['recipient'])
tx['recipient_description'] = 'Contract at {}'.format(tx['recipient']) #recipient_contract)
except UnknownContractError as e:
tx['recipient_description'] = 'Unknown contract'
except KeyError as e:

View File

@ -137,15 +137,15 @@ class NonceReservation(SessionBase):
q = q.filter(NonceReservation.address_hex==address)
o = q.first()
nonce = None
r = None
if o != None:
nonce = o.nonce
r = (o.key, o.nonce)
session.flush()
SessionBase.release_session(session)
return nonce
return r
@staticmethod
@ -153,31 +153,33 @@ class NonceReservation(SessionBase):
session = SessionBase.bind_session(session)
nonce = NonceReservation.peek(address, key, session=session)
q = session.query(NonceReservation)
q = q.filter(NonceReservation.address_hex==address)
q = q.filter(NonceReservation.key==key)
o = q.first()
o = NonceReservation.peek(address, key, session=session)
if o == None:
raise IntegrityError('nonce {} for key {} address {}'.format(nonce, key, address))
SessionBase.release_session(session)
raise IntegrityError('"release" called on key {} address {} which does not exists'.format(key, address))
q = session.query(NonceReservation)
q = q.filter(NonceReservation.key==key)
q = q.filter(NonceReservation.address_hex==address)
o = q.first()
r = (o.key, o.nonce)
session.delete(o)
session.flush()
SessionBase.release_session(session)
return nonce
return r
@staticmethod
def next(address, key, session=None):
session = SessionBase.bind_session(session)
if NonceReservation.peek(address, key, session) != None:
raise IntegrityError('nonce for key {} address {}'.format(key, address))
o = NonceReservation.peek(address, key, session)
if o != None:
raise IntegrityError('"next" called on nonce for key {} address {} during active key {}'.format(key, address, o[0]))
nonce = Nonce.next(address, session=session)
@ -186,7 +188,8 @@ class NonceReservation(SessionBase):
o.key = key
o.address_hex = address
session.add(o)
r = (key, nonce)
SessionBase.release_session(session)
return nonce
return r

View File

@ -107,7 +107,7 @@ def register(self, account_address, chain_spec_dict, writer_address=None):
if writer_address == ZERO_ADDRESS:
session.close()
raise RoleMissingError(account_address)
raise RoleMissingError('writer address for regsistering {}'.format(account_address))
logg.debug('adding account address {} to index; writer {}'.format(account_address, writer_address))
queue = self.request.delivery_info.get('routing_key')
@ -116,6 +116,9 @@ def register(self, account_address, chain_spec_dict, writer_address=None):
rpc = RPCConnection.connect(chain_spec, 'default')
registry = CICRegistry(chain_spec, rpc)
call_address = AccountRole.get_address('DEFAULT', session=session)
if writer_address == ZERO_ADDRESS:
session.close()
raise RoleMissingError('call address for resgistering {}'.format(account_address))
account_registry_address = registry.by_name('AccountRegistry', sender_address=call_address)
# Generate and sign transaction

View File

@ -28,4 +28,5 @@ class CustodialTaskNonceOracle():
:returns: Nonce
:rtype: number
"""
return NonceReservation.release(self.address, self.uuid, session=self.session)
r = NonceReservation.release(self.address, self.uuid, session=self.session)
return r[1]

View File

@ -445,6 +445,8 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def reserve_nonce(self, chained_input, signer_address=None):
self.log_banner()
session = SessionBase.create_session()
address = None
@ -464,7 +466,8 @@ def reserve_nonce(self, chained_input, signer_address=None):
raise ValueError('invalid result when resolving address for nonce {}'.format(address))
root_id = self.request.root_id
nonce = NonceReservation.next(address, root_id)
r = NonceReservation.next(address, root_id)
logg.debug('nonce {} reserved for address {} task {}'.format(r[1], address, r[0]))
session.commit()

View File

@ -596,7 +596,7 @@ def get_nonce_tx(nonce, sender, chain_id):
txs = {}
for r in q.all():
tx_signed_bytes = bytes.fromhex(r.signed_tx[2:])
tx = unpack_signed_raw_tx(tx_signed_bytes, chain_id)
tx = unpack(tx_signed_bytes, chain_id)
if sender == None or tx['from'] == sender:
txs[r.tx_hash] = r.signed_tx
@ -641,7 +641,7 @@ def get_paused_txs(status=None, sender=None, chain_id=0, session=None):
for r in q.all():
tx_signed_bytes = bytes.fromhex(r.signed_tx[2:])
tx = unpack_signed_raw_tx(tx_signed_bytes, chain_id)
tx = unpack(tx_signed_bytes, chain_id)
if sender == None or tx['from'] == sender:
#gas += tx['gas'] * tx['gasPrice']
txs[r.tx_hash] = r.signed_tx
@ -747,7 +747,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
continue
tx_signed_bytes = bytes.fromhex(o.signed_tx[2:])
tx = unpack_signed_raw_tx(tx_signed_bytes, chain_id)
tx = unpack(tx_signed_bytes, chain_id)
txs[o.tx_hash] = o.signed_tx
q = session.query(TxCache)

View File

@ -14,12 +14,12 @@ import celery
from cic_eth_registry import CICRegistry
from chainlib.chain import ChainSpec
from chainlib.eth.tx import unpack
from chainlib.connection import RPCConnection
from chainsyncer.error import SyncDone
from hexathon import strip_0x
# local imports
import cic_eth
from cic_eth.eth import RpcClient
from cic_eth.db import SessionBase
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import StatusBits
@ -40,17 +40,14 @@ from cic_eth.error import (
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL)
logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL)
config_dir = os.path.join('/usr/local/etc/cic-eth')
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
argparser.add_argument('-p', '--provider', default='http://localhost:8545', dest='p', type=str, help='rpc provider')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('-v', help='be verbose', action='store_true')
@ -67,6 +64,11 @@ os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process()
# override args
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'ETH_PROVIDER': getattr(args, 'p'),
}
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
@ -80,11 +82,10 @@ SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
RPCConnection.registry_location(args.p, chain_spec, tag='default')
RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default')
run = True
class DispatchSyncer:
yield_delay = 0.005
@ -104,7 +105,6 @@ class DispatchSyncer:
chain_str = str(self.chain_spec)
for k in txs.keys():
tx_raw = txs[k]
#tx = unpack_signed_raw_tx_hex(tx_raw, self.chain_spec.chain_id())
tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw))
tx = unpack(tx_raw_bytes, self.chain_spec.chain_id())
@ -118,7 +118,7 @@ class DispatchSyncer:
'cic_eth.admin.ctrl.check_lock',
[
[tx_raw],
chain_str,
self.chain_spec.asdict(),
LockEnum.QUEUE,
tx['from'],
],
@ -127,7 +127,7 @@ class DispatchSyncer:
s_send = celery.signature(
'cic_eth.eth.tx.send',
[
chain_str,
self.chain_spec.asdict(),
],
queue=queue,
)
@ -153,8 +153,9 @@ class DispatchSyncer:
def main():
syncer = DispatchSyncer(chain_spec)
conn = RPCConnection.connect(chain_spec, 'default')
try:
syncer.loop(c.w3, float(config.get('DISPATCHER_LOOP_INTERVAL')))
syncer.loop(conn, float(config.get('DISPATCHER_LOOP_INTERVAL')))
except SyncDone as e:
sys.stderr.write("dispatcher done at block {}\n".format(e))

View File

@ -61,12 +61,12 @@ config = confini.Config(args.c, args.env_prefix)
config.process()
# override args
args_override = {
'ETH_ABI_DIR': getattr(args, 'abi_dir'),
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'CIC_REGISTRY_ADDRESS': getattr(args, 'r'),
'ETH_PROVIDER': getattr(args, 'p'),
'TASKS_TRACE_QUEUE_STATUS': getattr(args, 'trace_queue_status'),
}
config.add(args.q, '_CELERY_QUEUE', True)
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')

View File

@ -82,7 +82,7 @@ chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
rpc = EthHTTPConnection(args.p)
registry_address = args.r
registry_address = config.get('CIC_REGISTRY_ADDRESS')
admin_api = AdminApi(rpc)
@ -158,10 +158,10 @@ def main():
renderer = render_tx
if len(config.get('_QUERY')) > 66:
registry = connect_registry(registry_address, chain_spec, rpc)
txs = [admin_api.tx(chain_spec, tx_raw=config.get('_QUERY'))]
txs = [admin_api.tx(chain_spec, tx_raw=config.get('_QUERY'), registry=registry)]
elif len(config.get('_QUERY')) > 42:
registry = connect_registry(registry_address, chain_spec, rpc)
txs = [admin_api.tx(chain_spec, tx_hash=config.get('_QUERY'))]
txs = [admin_api.tx(chain_spec, tx_hash=config.get('_QUERY'), registry=registry)]
elif len(config.get('_QUERY')) == 42:
registry = connect_registry(registry_address, chain_spec, rpc)
txs = admin_api.account(chain_spec, config.get('_QUERY'), include_recipient=False)

View File

@ -33,6 +33,11 @@ class BaseTask(celery.Task):
def create_session(self):
return BaseTask.session_func()
def log_banner(self):
logg.debug('task {} root uuid {}'.format(self.__class__.__name__, self.request.root_id))
return
class CriticalTask(BaseTask):
retry_jitter = True

View File

@ -1,6 +1,6 @@
cic-base~=0.1.2a41
cic-base~=0.1.2a42
celery==4.4.7
crypto-dev-signer~=0.4.14a14
crypto-dev-signer~=0.4.14a16
confini~=0.3.6rc3
cic-eth-registry~=0.5.4a7
#cic-bancor~=0.0.6

View File

@ -57,9 +57,9 @@ WORKDIR /home/grassroots
USER grassroots
ARG pip_extra_index_url=https://pip.grassrootseconomics.net:8433
ARG cic_base_version=0.1.2a41
ARG cic_base_version=0.1.2a43
ARG cic_eth_version=0.10.1a5+build.47c9f168
ARG sarafu_faucet_version=0.1.2a11
ARG sarafu_faucet_version=0.0.2a11
ARG cic_contracts_version=0.0.2a2
RUN pip install --user --extra-index-url $pip_extra_index_url cic-base[full_graph]==$cic_base_version \
cic-eth==$cic_eth_version \

View File

@ -32,6 +32,7 @@ set -a
# get required addresses from registries
DEV_TOKEN_INDEX_ADDRESS=`eth-contract-registry-list -i $CIC_CHAIN_SPEC -p $ETH_PROVIDER -r $CIC_REGISTRY_ADDRESS -f brief TokenRegistry`
DEV_ACCOUNTS_INDEX_ADDRESS=`eth-contract-registry-list -i $CIC_CHAIN_SPEC -p $ETH_PROVIDER -r $CIC_REGISTRY_ADDRESS -f brief AccountRegistry`
DEV_RESERVE_ADDRESS=`eth-token-index-list -i $CIC_CHAIN_SPEC -p $ETH_PROVIDER -a $DEV_TOKEN_INDEX_ADDRESS -f brief SRF`
>&2 echo "create account for gas gifter"
@ -59,6 +60,7 @@ cic-eth-tag -i $CIC_CHAIN_SPEC TRANSFER_AUTHORIZATION_OWNER $DEV_ETH_ACCOUNT_TRA
DEV_ETH_ACCOUNT_ACCOUNT_REGISTRY_WRITER=`cic-eth-create $debug --redis-host-callback=$REDIS_HOST --redis-port-callback=$REDIS_PORT --no-register`
echo DEV_ETH_ACCOUNT_ACCOUNT_REGISTRY_WRITER=$DEV_ETH_ACCOUNT_ACCOUNT_REGISTRY_WRITER >> $env_out_file
cic-eth-tag -i $CIC_CHAIN_SPEC ACCOUNT_REGISTRY_WRITER $DEV_ETH_ACCOUNT_ACCOUNT_REGISTRY_WRITER
eth-accounts-index-writer -y $keystore_file -i $CIC_CHAIN_SPEC -p $ETH_PROVIDER -a $DEV_ACCOUNTS_INDEX_ADDRESS $debug $DEV_ETH_ACCOUNT_ACCOUNT_REGISTRY_WRITER
# Transfer gas to custodial gas provider adddress
>&2 echo gift gas to gas gifter