Use chainlib directly for signing
This commit is contained in:
parent
94c8fd6cd6
commit
10835979bc
@ -126,5 +126,8 @@ def check_lock(chained_input, chain_str, lock_flags, address=None):
|
||||
r |= Lock.check(chain_str, lock_flags, address=address, session=session)
|
||||
if r > 0:
|
||||
logg.debug('lock check {} has match {} for {}'.format(lock_flags, r, address))
|
||||
session.close()
|
||||
raise LockedError(r)
|
||||
session.flush()
|
||||
session.close()
|
||||
return chained_input
|
||||
|
@ -24,6 +24,7 @@ def upgrade():
|
||||
sa.Column('blockchain', sa.String),
|
||||
sa.Column("flags", sa.BIGINT(), nullable=False, default=0),
|
||||
sa.Column("date_created", sa.DateTime, nullable=False),
|
||||
sa.Column("otx_id", sa.Integer, nullable=True),
|
||||
)
|
||||
op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True)
|
||||
|
||||
|
@ -116,6 +116,6 @@ class SessionBase(Model):
|
||||
def release_session(session=None):
|
||||
session_key = str(id(session))
|
||||
if SessionBase.localsessions.get(session_key) != None:
|
||||
logg.debug('destroying session {}'.format(session_key))
|
||||
logg.debug('commit and destroy session {}'.format(session_key))
|
||||
session.commit()
|
||||
session.close()
|
||||
|
@ -92,7 +92,7 @@ class Nonce(SessionBase):
|
||||
"""
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
session.begin_nested()
|
||||
#session.begin_nested()
|
||||
#conn = Nonce.engine.connect()
|
||||
#if Nonce.transactional:
|
||||
# conn.execute('BEGIN')
|
||||
@ -112,7 +112,7 @@ class Nonce(SessionBase):
|
||||
#conn.execute('COMMIT')
|
||||
# logg.debug('unlocking nonce table for address {}'.format(address))
|
||||
#conn.close()
|
||||
session.commit()
|
||||
#session.commit()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
return nonce
|
||||
|
@ -95,19 +95,16 @@ class Otx(SessionBase):
|
||||
:type block: number
|
||||
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
|
||||
"""
|
||||
localsession = session
|
||||
if localsession == None:
|
||||
localsession = SessionBase.create_session()
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.block != None:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('Attempted set block {} when block was already {}'.format(block, self.block))
|
||||
self.block = block
|
||||
localsession.add(self)
|
||||
localsession.flush()
|
||||
session.add(self)
|
||||
session.flush()
|
||||
|
||||
if session==None:
|
||||
localsession.commit()
|
||||
localsession.close()
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
def waitforgas(self, session=None):
|
||||
@ -123,8 +120,10 @@ class Otx(SessionBase):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if self.status & StatusBits.IN_NETWORK:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__set_status(StatusBits.GAS_ISSUES, session)
|
||||
@ -147,8 +146,10 @@ class Otx(SessionBase):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('FUBAR cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if is_error_status(self.status):
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('FUBAR cannot be set on an entry with an error state already set ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__set_status(StatusBits.UNKNOWN_ERROR | StatusBits.FINAL, session)
|
||||
@ -170,10 +171,13 @@ class Otx(SessionBase):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('REJECTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if self.status & StatusBits.IN_NETWORK:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('REJECTED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status)))
|
||||
if is_error_status(self.status):
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('REJECTED cannot be set on an entry with an error state already set ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__set_status(StatusBits.NODE_ERROR | StatusBits.FINAL, session)
|
||||
@ -193,10 +197,13 @@ class Otx(SessionBase):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if self.status & StatusBits.IN_NETWORK:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status)))
|
||||
if self.status & StatusBits.OBSOLETE:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already OBSOLETE ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__set_status(StatusBits.OBSOLETE, session)
|
||||
@ -216,6 +223,7 @@ class Otx(SessionBase):
|
||||
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__set_status(StatusBits.MANUAL, session)
|
||||
@ -238,8 +246,10 @@ class Otx(SessionBase):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('RETRY cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if not is_error_status(self.status) and not StatusBits.IN_NETWORK & self.status > 0:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('RETRY cannot be set on an entry that has no error ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__set_status(StatusBits.QUEUED, session)
|
||||
@ -264,8 +274,10 @@ class Otx(SessionBase):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('READYSEND cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if is_error_status(self.status):
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('READYSEND cannot be set on an errored state ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__set_status(StatusBits.QUEUED, session)
|
||||
@ -290,6 +302,7 @@ class Otx(SessionBase):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('SENT cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__set_status(StatusBits.IN_NETWORK, session)
|
||||
@ -314,8 +327,10 @@ class Otx(SessionBase):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if self.status & StatusBits.IN_NETWORK:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__set_status(StatusBits.LOCAL_ERROR | StatusBits.DEFERRED, session)
|
||||
@ -340,8 +355,10 @@ class Otx(SessionBase):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if self.status & StatusBits.IN_NETWORK:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
|
||||
|
||||
self.__reset_status(StatusBits.QUEUED, session)
|
||||
@ -368,8 +385,10 @@ class Otx(SessionBase):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('REVERTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if not self.status & StatusBits.IN_NETWORK:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('REVERTED cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status)))
|
||||
|
||||
if block != None:
|
||||
@ -397,10 +416,12 @@ class Otx(SessionBase):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('CANCEL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
|
||||
if confirmed:
|
||||
if self.status > 0 and not self.status & StatusBits.OBSOLETE:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('CANCEL can only be set on an entry marked OBSOLETE ({})'.format(status_str(self.status)))
|
||||
self.__set_status(StatusEnum.CANCELLED, session)
|
||||
else:
|
||||
@ -425,10 +446,13 @@ class Otx(SessionBase):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if self.status & StatusBits.FINAL:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('SUCCESS cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
if not self.status & StatusBits.IN_NETWORK:
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('SUCCESS cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status)))
|
||||
if is_error_status(self.status):
|
||||
SessionBase.release_session(session)
|
||||
raise TxStateChangeError('SUCCESS cannot be set on an entry with error state set ({})'.format(status_str(self.status)))
|
||||
|
||||
if block != None:
|
||||
@ -509,22 +533,23 @@ class Otx(SessionBase):
|
||||
session.add(l)
|
||||
|
||||
|
||||
# TODO: it is not safe to return otx here unless session has been passed in
|
||||
@staticmethod
|
||||
def add(nonce, address, tx_hash, signed_tx, session=None):
|
||||
localsession = session
|
||||
if localsession == None:
|
||||
localsession = SessionBase.create_session()
|
||||
external_session = session != None
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
otx = Otx(nonce, address, tx_hash, signed_tx)
|
||||
localsession.add(otx)
|
||||
localsession.flush()
|
||||
session.add(otx)
|
||||
session.flush()
|
||||
if otx.tracing:
|
||||
otx.__state_log(session=localsession)
|
||||
localsession.flush()
|
||||
otx.__state_log(session=session)
|
||||
session.flush()
|
||||
|
||||
if session==None:
|
||||
localsession.commit()
|
||||
localsession.close()
|
||||
SessionBase.release_session(session)
|
||||
|
||||
if not external_session:
|
||||
return None
|
||||
|
||||
return otx
|
||||
|
@ -8,6 +8,11 @@ from cic_registry.chain import ChainSpec
|
||||
from erc20_single_shot_faucet import Faucet
|
||||
from cic_registry import zero_address
|
||||
from hexathon import strip_0x
|
||||
from chainlib.eth.connection import RPCConnection
|
||||
from chainlib.eth.sign import (
|
||||
new_account,
|
||||
sign_message,
|
||||
)
|
||||
|
||||
# local import
|
||||
from cic_eth.registry import safe_registry
|
||||
@ -28,6 +33,7 @@ from cic_eth.error import (
|
||||
from cic_eth.task import (
|
||||
CriticalSQLAlchemyTask,
|
||||
CriticalSQLAlchemyAndSignerTask,
|
||||
BaseTask,
|
||||
)
|
||||
|
||||
#logg = logging.getLogger(__name__)
|
||||
@ -145,8 +151,8 @@ def unpack_gift(data):
|
||||
|
||||
|
||||
# TODO: Separate out nonce initialization task
|
||||
@celery_app.task(base=CriticalSQLAlchemyAndSignerTask)
|
||||
def create(password, chain_str):
|
||||
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
|
||||
def create(self, password, chain_str):
|
||||
"""Creates and stores a new ethereum account in the keystore.
|
||||
|
||||
The password is passed on to the wallet backend, no encryption is performed in the task worker.
|
||||
@ -159,18 +165,23 @@ def create(password, chain_str):
|
||||
:rtype: str, 0x-hex
|
||||
"""
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
c = RpcClient(chain_spec)
|
||||
#c = RpcClient(chain_spec)
|
||||
a = None
|
||||
try:
|
||||
a = c.w3.eth.personal.new_account(password)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
conn = RPCConnection.connect('signer')
|
||||
o = new_account()
|
||||
a = conn.do(o)
|
||||
|
||||
#try:
|
||||
# a = c.w3.eth.personal.new_account(password)
|
||||
#except FileNotFoundError:
|
||||
# pass
|
||||
if a == None:
|
||||
raise SignerError('create account')
|
||||
logg.debug('created account {}'.format(a))
|
||||
|
||||
# Initialize nonce provider record for account
|
||||
session = SessionBase.create_session()
|
||||
#session = SessionBase.create_session()
|
||||
session = self.create_session()
|
||||
Nonce.init(a, session=session)
|
||||
session.commit()
|
||||
session.close()
|
||||
@ -193,7 +204,8 @@ def register(self, account_address, chain_str, writer_address=None):
|
||||
"""
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
|
||||
session = SessionBase.create_session()
|
||||
session = self.create_session()
|
||||
#session = SessionBase.create_session()
|
||||
if writer_address == None:
|
||||
writer_address = AccountRole.get_address('ACCOUNTS_INDEX_WRITER', session=session)
|
||||
|
||||
@ -211,6 +223,7 @@ def register(self, account_address, chain_str, writer_address=None):
|
||||
tx_add = txf.add(account_address, chain_spec, self.request.root_id, session=session)
|
||||
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data', session=session)
|
||||
session.commit()
|
||||
session.close()
|
||||
|
||||
gas_budget = tx_add['gas'] * tx_add['gasPrice']
|
||||
@ -248,9 +261,11 @@ def gift(self, account_address, chain_str):
|
||||
registry = safe_registry(c.w3)
|
||||
txf = AccountTxFactory(account_address, c, registry=registry)
|
||||
|
||||
session = SessionBase.create_session()
|
||||
#session = SessionBase.create_session()
|
||||
session = self.create_session()
|
||||
tx_add = txf.gift(account_address, chain_spec, self.request.root_id, session=session)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data', session=session)
|
||||
session.commit()
|
||||
session.close()
|
||||
|
||||
gas_budget = tx_add['gas'] * tx_add['gasPrice']
|
||||
@ -279,16 +294,17 @@ def have(self, account, chain_str):
|
||||
:returns: Account, or None if not exists
|
||||
:rtype: Varies
|
||||
"""
|
||||
c = RpcClient(account)
|
||||
o = sign_message(account, '0x2a')
|
||||
try:
|
||||
c.w3.eth.sign(account, text='2a')
|
||||
conn = RPCConnection.connect('signer')
|
||||
conn.do(o)
|
||||
return account
|
||||
except Exception as e:
|
||||
logg.debug('cannot sign with {}: {}'.format(account, e))
|
||||
return None
|
||||
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
@celery_app.task(bind=True, base=BaseTask)
|
||||
def role(self, account, chain_str):
|
||||
"""Return account role for address
|
||||
|
||||
@ -299,11 +315,15 @@ def role(self, account, chain_str):
|
||||
:returns: Account, or None if not exists
|
||||
:rtype: Varies
|
||||
"""
|
||||
return AccountRole.role_for(account)
|
||||
session = self.create_session()
|
||||
role_tag = AccountRole.role_for(account, session=session)
|
||||
session.close()
|
||||
return role_tag
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
|
||||
def cache_gift_data(
|
||||
self,
|
||||
tx_hash_hex,
|
||||
tx_signed_raw_hex,
|
||||
chain_str,
|
||||
@ -326,7 +346,8 @@ def cache_gift_data(
|
||||
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
|
||||
tx_data = unpack_gift(tx['data'])
|
||||
|
||||
session = SessionBase.create_session()
|
||||
#session = SessionBase.create_session()
|
||||
session = self.create_session()
|
||||
|
||||
tx_cache = TxCache(
|
||||
tx_hash_hex,
|
||||
@ -346,8 +367,9 @@ def cache_gift_data(
|
||||
return (tx_hash_hex, cache_id)
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
|
||||
def cache_account_data(
|
||||
self,
|
||||
tx_hash_hex,
|
||||
tx_signed_raw_hex,
|
||||
chain_str,
|
||||
|
@ -1,9 +1,11 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
# external imports
|
||||
import celery
|
||||
from cic_registry.chain import ChainSpec
|
||||
from chainlib.eth.sign import sign_transaction
|
||||
from chainlib.eth.connection import RPCConnection
|
||||
|
||||
# local imports
|
||||
from cic_eth.eth import RpcClient
|
||||
@ -26,16 +28,21 @@ def sign_tx(tx, chain_str):
|
||||
:rtype: tuple
|
||||
"""
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
c = RpcClient(chain_spec)
|
||||
#c = RpcClient(chain_spec)
|
||||
tx_transfer_signed = None
|
||||
conn = RPCConnection.connect('signer')
|
||||
try:
|
||||
tx_transfer_signed = c.w3.eth.sign_transaction(tx)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
if tx_transfer_signed == None:
|
||||
raise SignerError('sign tx')
|
||||
o = sign_transaction(tx)
|
||||
tx_transfer_signed = conn.do(o)
|
||||
#try:
|
||||
# tx_transfer_signed = c.w3.eth.sign_transaction(tx)
|
||||
except Exception as e:
|
||||
raise SignerError('sign tx {}: {}'.format(tx, e))
|
||||
logg.debug('tx_transfer_signed {}'.format(tx_transfer_signed))
|
||||
tx_hash = c.w3.keccak(hexstr=tx_transfer_signed['raw'])
|
||||
h = sha3.keccak_256()
|
||||
h.update(tx_transfer_signed['raw'])
|
||||
g = h.digest()
|
||||
#tx_hash = c.w3.keccak(hexstr=tx_transfer_signed['raw'])
|
||||
tx_hash_hex = tx_hash.hex()
|
||||
return (tx_hash_hex, tx_transfer_signed['raw'],)
|
||||
|
||||
|
@ -624,6 +624,8 @@ def reserve_nonce(self, chained_input, signer=None):
|
||||
root_id = self.request.root_id
|
||||
nonce = NonceReservation.next(address, root_id)
|
||||
|
||||
session.commit()
|
||||
|
||||
session.close()
|
||||
|
||||
return chained_input
|
||||
|
@ -77,7 +77,17 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predec
|
||||
|
||||
for otx in q.all():
|
||||
logg.info('otx {} obsoleted by {}'.format(otx.tx_hash, tx_hash))
|
||||
try:
|
||||
otx.cancel(confirmed=False, session=session)
|
||||
except TxStateChangeError as e:
|
||||
logg.exception('obsolete fail: {}'.format(e))
|
||||
session.close()
|
||||
raise(e)
|
||||
except Exception as e:
|
||||
logg.exception('obsolete UNEXPECTED fail: {}'.format(e))
|
||||
session.close()
|
||||
raise(e)
|
||||
|
||||
|
||||
session.commit()
|
||||
SessionBase.release_session(session)
|
||||
@ -107,10 +117,20 @@ def set_sent_status(tx_hash, fail=False):
|
||||
session.close()
|
||||
return False
|
||||
|
||||
try:
|
||||
if fail:
|
||||
o.sendfail(session=session)
|
||||
else:
|
||||
o.sent(session=session)
|
||||
except TxStateChangeError as e:
|
||||
logg.exception('set sent fail: {}'.format(e))
|
||||
session.close()
|
||||
raise(e)
|
||||
except Exception as e:
|
||||
logg.exception('set sent UNEXPECED fail: {}'.format(e))
|
||||
session.close()
|
||||
raise(e)
|
||||
|
||||
|
||||
session.commit()
|
||||
session.close()
|
||||
@ -154,10 +174,20 @@ def set_final_status(tx_hash, block=None, fail=False):
|
||||
q = q.filter(Otx.tx_hash==tx_hash)
|
||||
o = q.first()
|
||||
|
||||
try:
|
||||
if fail:
|
||||
o.minefail(block, session=session)
|
||||
else:
|
||||
o.success(block, session=session)
|
||||
session.commit()
|
||||
except TxStateChangeError as e:
|
||||
logg.exception('set final fail: {}'.format(e))
|
||||
session.close()
|
||||
raise(e)
|
||||
except Exception as e:
|
||||
logg.exception('set final UNEXPECED fail: {}'.format(e))
|
||||
session.close()
|
||||
raise(e)
|
||||
|
||||
q = session.query(Otx)
|
||||
q = q.join(TxCache)
|
||||
@ -166,8 +196,16 @@ def set_final_status(tx_hash, block=None, fail=False):
|
||||
q = q.filter(Otx.tx_hash!=tx_hash)
|
||||
|
||||
for otwo in q.all():
|
||||
try:
|
||||
otwo.cancel(True, session=session)
|
||||
|
||||
except TxStateChangeError as e:
|
||||
logg.exception('cancel non-final fail: {}'.format(e))
|
||||
session.close()
|
||||
raise(e)
|
||||
except Exception as e:
|
||||
logg.exception('cancel non-final UNEXPECTED fail: {}'.format(e))
|
||||
session.close()
|
||||
raise(e)
|
||||
session.commit()
|
||||
session.close()
|
||||
|
||||
@ -195,12 +233,16 @@ def set_cancel(tx_hash, manual=False):
|
||||
|
||||
session.flush()
|
||||
|
||||
try:
|
||||
if manual:
|
||||
o.override(session=session)
|
||||
else:
|
||||
o.cancel(session=session)
|
||||
|
||||
session.commit()
|
||||
except TxStateChangeError as e:
|
||||
logg.exception('set cancel fail: {}'.format(e))
|
||||
except Exception as e:
|
||||
logg.exception('set cancel UNEXPECTED fail: {}'.format(e))
|
||||
session.close()
|
||||
|
||||
return tx_hash
|
||||
|
@ -31,6 +31,7 @@ class TxFilter(SyncFilter):
|
||||
logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
|
||||
return None
|
||||
logg.info('tx filter match on {}'.format(otx.tx_hash))
|
||||
db_session.flush()
|
||||
SessionBase.release_session(db_session)
|
||||
s = celery.signature(
|
||||
'cic_eth.queue.tx.set_final_status',
|
||||
|
@ -8,12 +8,14 @@ import re
|
||||
import urllib
|
||||
import websocket
|
||||
|
||||
# third-party imports
|
||||
# external imports
|
||||
import celery
|
||||
import confini
|
||||
from crypto_dev_signer.eth.web3ext import Web3 as Web3Ext
|
||||
#from crypto_dev_signer.eth.web3ext import Web3 as Web3Ext
|
||||
from web3 import HTTPProvider, WebsocketProvider
|
||||
from gas_proxy.web3 import GasMiddleware
|
||||
import web3
|
||||
#from gas_proxy.web3 import GasMiddleware
|
||||
from chainlib.eth.connection import RPCConnection
|
||||
|
||||
# local imports
|
||||
from cic_registry.registry import CICRegistry
|
||||
@ -122,65 +124,85 @@ else:
|
||||
'result_backend': result,
|
||||
})
|
||||
|
||||
# set up signer
|
||||
RPCConnection.register_location('signer', config.get('SIGNER_SOCKET_PATH'))
|
||||
|
||||
# set up web3
|
||||
# TODO: web3 socket wrapping is now a lot of code. factor out
|
||||
class JSONRPCHttpSocketAdapter:
|
||||
|
||||
def __init__(self, url):
|
||||
self.response = None
|
||||
self.url = url
|
||||
|
||||
def send(self, data):
|
||||
logg.debug('redirecting socket send to jsonrpc http socket adapter {} {}'.format(self.url, data))
|
||||
req = urllib.request.Request(self.url, method='POST')
|
||||
req.add_header('Content-type', 'application/json')
|
||||
req.add_header('Connection', 'close')
|
||||
res = urllib.request.urlopen(req, data=data.encode('utf-8'))
|
||||
self.response = res.read().decode('utf-8')
|
||||
logg.debug('setting jsonrpc http socket adapter response to {}'.format(self.response))
|
||||
|
||||
def recv(self, n=0):
|
||||
return self.response
|
||||
|
||||
|
||||
# set up web3py
|
||||
re_websocket = re.compile('^wss?://')
|
||||
re_http = re.compile('^https?://')
|
||||
blockchain_provider = config.get('ETH_PROVIDER')
|
||||
socket_constructor = None
|
||||
if re.match(re_websocket, blockchain_provider) != None:
|
||||
def socket_constructor_ws():
|
||||
return websocket.create_connection(config.get('ETH_PROVIDER'))
|
||||
socket_constructor = socket_constructor_ws
|
||||
blockchain_provider = WebsocketProvider(blockchain_provider)
|
||||
blockchain_provider = web3.Web3.WebsocketProvider(blockchain_provider)
|
||||
elif re.match(re_http, blockchain_provider) != None:
|
||||
def socket_constructor_http():
|
||||
return JSONRPCHttpSocketAdapter(config.get('ETH_PROVIDER'))
|
||||
socket_constructor = socket_constructor_http
|
||||
blockchain_provider = HTTPProvider(blockchain_provider)
|
||||
blockchain_provider = web3.Web3.HTTPProvider(blockchain_provider)
|
||||
else:
|
||||
raise ValueError('unknown provider url {}'.format(blockchain_provider))
|
||||
|
||||
|
||||
def web3ext_constructor():
|
||||
w3 = Web3Ext(blockchain_provider, config.get('SIGNER_SOCKET_PATH'))
|
||||
GasMiddleware.socket_constructor = socket_constructor
|
||||
w3.middleware_onion.add(GasMiddleware)
|
||||
|
||||
def sign_transaction(tx):
|
||||
r = w3.eth.signTransaction(tx)
|
||||
d = r.__dict__
|
||||
for k in d.keys():
|
||||
if k == 'tx':
|
||||
d[k] = d[k].__dict__
|
||||
else:
|
||||
d[k] = d[k].hex()
|
||||
return d
|
||||
|
||||
setattr(w3.eth, 'sign_transaction', sign_transaction)
|
||||
setattr(w3.eth, 'send_raw_transaction', w3.eth.sendRawTransaction)
|
||||
def web3_constructor():
|
||||
w3 = web3.Web3(blockchain_provider)
|
||||
return (blockchain_provider, w3)
|
||||
RpcClient.set_constructor(web3ext_constructor)
|
||||
RpcClient.set_constructor(web3_constructor)
|
||||
|
||||
|
||||
#
|
||||
## set up web3
|
||||
## TODO: web3 socket wrapping is now a lot of code. factor out
|
||||
#class JSONRPCHttpSocketAdapter:
|
||||
#
|
||||
# def __init__(self, url):
|
||||
# self.response = None
|
||||
# self.url = url
|
||||
#
|
||||
# def send(self, data):
|
||||
# logg.debug('redirecting socket send to jsonrpc http socket adapter {} {}'.format(self.url, data))
|
||||
# req = urllib.request.Request(self.url, method='POST')
|
||||
# req.add_header('Content-type', 'application/json')
|
||||
# req.add_header('Connection', 'close')
|
||||
# res = urllib.request.urlopen(req, data=data.encode('utf-8'))
|
||||
# self.response = res.read().decode('utf-8')
|
||||
# logg.debug('setting jsonrpc http socket adapter response to {}'.format(self.response))
|
||||
#
|
||||
# def recv(self, n=0):
|
||||
# return self.response
|
||||
#
|
||||
#
|
||||
#re_websocket = re.compile('^wss?://')
|
||||
#re_http = re.compile('^https?://')
|
||||
#blockchain_provider = config.get('ETH_PROVIDER')
|
||||
#socket_constructor = None
|
||||
#if re.match(re_websocket, blockchain_provider) != None:
|
||||
# def socket_constructor_ws():
|
||||
# return websocket.create_connection(config.get('ETH_PROVIDER'))
|
||||
# socket_constructor = socket_constructor_ws
|
||||
# blockchain_provider = WebsocketProvider(blockchain_provider)
|
||||
#elif re.match(re_http, blockchain_provider) != None:
|
||||
# def socket_constructor_http():
|
||||
# return JSONRPCHttpSocketAdapter(config.get('ETH_PROVIDER'))
|
||||
# socket_constructor = socket_constructor_http
|
||||
# blockchain_provider = HTTPProvider(blockchain_provider)
|
||||
#else:
|
||||
# raise ValueError('unknown provider url {}'.format(blockchain_provider))
|
||||
#
|
||||
#
|
||||
#def web3ext_constructor():
|
||||
# w3 = Web3Ext(blockchain_provider, config.get('SIGNER_SOCKET_PATH'))
|
||||
# #GasMiddleware.socket_constructor = socket_constructor
|
||||
# #w3.middleware_onion.add(GasMiddleware)
|
||||
#
|
||||
# def sign_transaction(tx):
|
||||
# r = w3.eth.signTransaction(tx)
|
||||
# d = r.__dict__
|
||||
# for k in d.keys():
|
||||
# if k == 'tx':
|
||||
# d[k] = d[k].__dict__
|
||||
# else:
|
||||
# d[k] = d[k].hex()
|
||||
# return d
|
||||
#
|
||||
# setattr(w3.eth, 'sign_transaction', sign_transaction)
|
||||
# setattr(w3.eth, 'send_raw_transaction', w3.eth.sendRawTransaction)
|
||||
# return (blockchain_provider, w3)
|
||||
#RpcClient.set_constructor(web3ext_constructor)
|
||||
|
||||
Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS')
|
||||
|
||||
|
@ -1,5 +1,8 @@
|
||||
# import
|
||||
import time
|
||||
import requests
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
# external imports
|
||||
import celery
|
||||
@ -10,9 +13,23 @@ from cic_eth.error import (
|
||||
SignerError,
|
||||
EthError,
|
||||
)
|
||||
from cic_eth.db.models.base import SessionBase
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
celery_app = celery.current_app
|
||||
|
||||
|
||||
class CriticalTask(celery.Task):
|
||||
class BaseTask(celery.Task):
|
||||
|
||||
session_func = SessionBase.create_session
|
||||
|
||||
def create_session(self):
|
||||
logg.warning('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> session from base {}'.format(id(self.session_func)))
|
||||
return BaseTask.session_func()
|
||||
|
||||
|
||||
class CriticalTask(BaseTask):
|
||||
retry_jitter = True
|
||||
retry_backoff = True
|
||||
retry_backoff_max = 8
|
||||
@ -54,3 +71,9 @@ class CriticalWeb3AndSignerTask(CriticalTask):
|
||||
requests.exceptions.ConnectionError,
|
||||
SignerError,
|
||||
)
|
||||
|
||||
|
||||
@celery_app.task(bind=True, base=BaseTask)
|
||||
def hello(self):
|
||||
time.sleep(0.1)
|
||||
return id(SessionBase.create_session)
|
||||
|
@ -1,5 +1,5 @@
|
||||
[cic]
|
||||
registry_address =
|
||||
chain_spec =
|
||||
chain_spec = evm:bloxberg:8996
|
||||
tx_retry_delay =
|
||||
trust_address =
|
||||
|
@ -19,6 +19,6 @@ eth-gas-proxy==0.0.1a4
|
||||
websocket-client==0.57.0
|
||||
moolb~=0.1.1b2
|
||||
eth-address-index~=0.1.0a8
|
||||
chainlib~=0.0.1a20
|
||||
chainlib~=0.0.1a21
|
||||
hexathon~=0.0.1a3
|
||||
chainsyncer~=0.0.1a19
|
||||
|
@ -19,6 +19,7 @@ from cic_eth.db.models.role import AccountRole
|
||||
from cic_eth.eth.account import AccountTxFactory
|
||||
|
||||
logg = logging.getLogger() #__name__)
|
||||
logging.getLogger('fuuck').setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
def test_create_account(
|
||||
@ -26,8 +27,9 @@ def test_create_account(
|
||||
init_w3,
|
||||
init_database,
|
||||
celery_session_worker,
|
||||
caplog,
|
||||
):
|
||||
|
||||
caplog.set_level(logging.DEBUG, 'cic_eth.task')
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.account.create',
|
||||
[
|
||||
@ -42,7 +44,6 @@ def test_create_account(
|
||||
session = SessionBase.create_session()
|
||||
q = session.query(Nonce).filter(Nonce.address_hex==r)
|
||||
o = q.first()
|
||||
logg.debug('oooo s {}'.format(o))
|
||||
session.close()
|
||||
assert o != None
|
||||
assert o.nonce == 0
|
||||
@ -56,6 +57,7 @@ def test_create_account(
|
||||
)
|
||||
t = s.apply_async()
|
||||
assert r == t.get()
|
||||
print('caplog records {}'.format(caplog.records))
|
||||
|
||||
|
||||
def test_register_account(
|
||||
|
@ -1,3 +1,3 @@
|
||||
cic-base[full_graph]==0.1.1a24
|
||||
cic-eth==0.10.0a41
|
||||
cic-base[full_graph]==0.1.1a25
|
||||
cic-eth==0.10.0a42
|
||||
cic-types==0.1.0a8
|
||||
|
@ -42,6 +42,6 @@ rlp==2.0.1
|
||||
cryptocurrency-cli-tools==0.0.4
|
||||
giftable-erc20-token==0.0.7b12
|
||||
hexathon==0.0.1a3
|
||||
chainlib==0.0.1a20
|
||||
chainlib==0.0.1a21
|
||||
chainsyncer==0.0.1a19
|
||||
cic-registry==0.5.3.a22
|
||||
|
@ -275,7 +275,7 @@ services:
|
||||
- -c
|
||||
- |
|
||||
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
|
||||
./start_tracker.sh -v -c /usr/local/etc/cic-eth
|
||||
./start_tracker.sh -vv -c /usr/local/etc/cic-eth
|
||||
# command: "/root/start_manager.sh head -vv"
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user