From 10835979bc679258994cc2a7b26a66489224ed4c Mon Sep 17 00:00:00 2001 From: nolash Date: Thu, 11 Mar 2021 11:40:02 +0100 Subject: [PATCH] Use chainlib directly for signing --- apps/cic-eth/cic_eth/admin/ctrl.py | 3 + .../versions/89e1e9baa53c_add_account_lock.py | 1 + apps/cic-eth/cic_eth/db/models/base.py | 2 +- apps/cic-eth/cic_eth/db/models/nonce.py | 4 +- apps/cic-eth/cic_eth/db/models/otx.py | 61 ++++++--- apps/cic-eth/cic_eth/eth/account.py | 58 +++++--- apps/cic-eth/cic_eth/eth/task.py | 23 ++-- apps/cic-eth/cic_eth/eth/tx.py | 2 + apps/cic-eth/cic_eth/queue/tx.py | 76 ++++++++--- .../cic_eth/runnable/daemons/filters/tx.py | 1 + .../cic_eth/runnable/daemons/tasker.py | 126 ++++++++++-------- apps/cic-eth/cic_eth/task.py | 25 +++- apps/cic-eth/config/cic.ini | 2 +- apps/cic-eth/requirements.txt | 2 +- apps/cic-eth/tests/tasks/test_account.py | 6 +- .../scripts/requirements.txt | 4 +- apps/requirements.txt | 2 +- docker-compose.yml | 2 +- 18 files changed, 275 insertions(+), 125 deletions(-) diff --git a/apps/cic-eth/cic_eth/admin/ctrl.py b/apps/cic-eth/cic_eth/admin/ctrl.py index 1868238e..d7008592 100644 --- a/apps/cic-eth/cic_eth/admin/ctrl.py +++ b/apps/cic-eth/cic_eth/admin/ctrl.py @@ -126,5 +126,8 @@ def check_lock(chained_input, chain_str, lock_flags, address=None): r |= Lock.check(chain_str, lock_flags, address=address, session=session) if r > 0: logg.debug('lock check {} has match {} for {}'.format(lock_flags, r, address)) + session.close() raise LockedError(r) + session.flush() + session.close() return chained_input diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/89e1e9baa53c_add_account_lock.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/89e1e9baa53c_add_account_lock.py index 05517fd2..4b1c4401 100644 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/89e1e9baa53c_add_account_lock.py +++ b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/89e1e9baa53c_add_account_lock.py @@ -24,6 +24,7 @@ def upgrade(): sa.Column('blockchain', sa.String), sa.Column("flags", sa.BIGINT(), nullable=False, default=0), sa.Column("date_created", sa.DateTime, nullable=False), + sa.Column("otx_id", sa.Integer, nullable=True), ) op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True) diff --git a/apps/cic-eth/cic_eth/db/models/base.py b/apps/cic-eth/cic_eth/db/models/base.py index da5d019a..fc3541c5 100644 --- a/apps/cic-eth/cic_eth/db/models/base.py +++ b/apps/cic-eth/cic_eth/db/models/base.py @@ -116,6 +116,6 @@ class SessionBase(Model): def release_session(session=None): session_key = str(id(session)) if SessionBase.localsessions.get(session_key) != None: - logg.debug('destroying session {}'.format(session_key)) + logg.debug('commit and destroy session {}'.format(session_key)) session.commit() session.close() diff --git a/apps/cic-eth/cic_eth/db/models/nonce.py b/apps/cic-eth/cic_eth/db/models/nonce.py index c6aee894..1df15792 100644 --- a/apps/cic-eth/cic_eth/db/models/nonce.py +++ b/apps/cic-eth/cic_eth/db/models/nonce.py @@ -92,7 +92,7 @@ class Nonce(SessionBase): """ session = SessionBase.bind_session(session) - session.begin_nested() + #session.begin_nested() #conn = Nonce.engine.connect() #if Nonce.transactional: # conn.execute('BEGIN') @@ -112,7 +112,7 @@ class Nonce(SessionBase): #conn.execute('COMMIT') # logg.debug('unlocking nonce table for address {}'.format(address)) #conn.close() - session.commit() + #session.commit() SessionBase.release_session(session) return nonce diff --git a/apps/cic-eth/cic_eth/db/models/otx.py b/apps/cic-eth/cic_eth/db/models/otx.py index 06f2d655..4a52b632 100644 --- a/apps/cic-eth/cic_eth/db/models/otx.py +++ b/apps/cic-eth/cic_eth/db/models/otx.py @@ -95,19 +95,16 @@ class Otx(SessionBase): :type block: number :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. """ - localsession = session - if localsession == None: - localsession = SessionBase.create_session() + session = SessionBase.bind_session(session) if self.block != None: + SessionBase.release_session(session) raise TxStateChangeError('Attempted set block {} when block was already {}'.format(block, self.block)) self.block = block - localsession.add(self) - localsession.flush() + session.add(self) + session.flush() - if session==None: - localsession.commit() - localsession.close() + SessionBase.release_session(session) def waitforgas(self, session=None): @@ -123,8 +120,10 @@ class Otx(SessionBase): session = SessionBase.bind_session(session) if self.status & StatusBits.FINAL: + SessionBase.release_session(session) raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) if self.status & StatusBits.IN_NETWORK: + SessionBase.release_session(session) raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status))) self.__set_status(StatusBits.GAS_ISSUES, session) @@ -147,8 +146,10 @@ class Otx(SessionBase): session = SessionBase.bind_session(session) if self.status & StatusBits.FINAL: + SessionBase.release_session(session) raise TxStateChangeError('FUBAR cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) if is_error_status(self.status): + SessionBase.release_session(session) raise TxStateChangeError('FUBAR cannot be set on an entry with an error state already set ({})'.format(status_str(self.status))) self.__set_status(StatusBits.UNKNOWN_ERROR | StatusBits.FINAL, session) @@ -170,10 +171,13 @@ class Otx(SessionBase): session = SessionBase.bind_session(session) if self.status & StatusBits.FINAL: + SessionBase.release_session(session) raise TxStateChangeError('REJECTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) if self.status & StatusBits.IN_NETWORK: + SessionBase.release_session(session) raise TxStateChangeError('REJECTED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status))) if is_error_status(self.status): + SessionBase.release_session(session) raise TxStateChangeError('REJECTED cannot be set on an entry with an error state already set ({})'.format(status_str(self.status))) self.__set_status(StatusBits.NODE_ERROR | StatusBits.FINAL, session) @@ -193,10 +197,13 @@ class Otx(SessionBase): session = SessionBase.bind_session(session) if self.status & StatusBits.FINAL: + SessionBase.release_session(session) raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) if self.status & StatusBits.IN_NETWORK: + SessionBase.release_session(session) raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status))) if self.status & StatusBits.OBSOLETE: + SessionBase.release_session(session) raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already OBSOLETE ({})'.format(status_str(self.status))) self.__set_status(StatusBits.OBSOLETE, session) @@ -216,6 +223,7 @@ class Otx(SessionBase): if self.status & StatusBits.FINAL: + SessionBase.release_session(session) raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) self.__set_status(StatusBits.MANUAL, session) @@ -238,8 +246,10 @@ class Otx(SessionBase): session = SessionBase.bind_session(session) if self.status & StatusBits.FINAL: + SessionBase.release_session(session) raise TxStateChangeError('RETRY cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) if not is_error_status(self.status) and not StatusBits.IN_NETWORK & self.status > 0: + SessionBase.release_session(session) raise TxStateChangeError('RETRY cannot be set on an entry that has no error ({})'.format(status_str(self.status))) self.__set_status(StatusBits.QUEUED, session) @@ -264,8 +274,10 @@ class Otx(SessionBase): session = SessionBase.bind_session(session) if self.status & StatusBits.FINAL: + SessionBase.release_session(session) raise TxStateChangeError('READYSEND cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) if is_error_status(self.status): + SessionBase.release_session(session) raise TxStateChangeError('READYSEND cannot be set on an errored state ({})'.format(status_str(self.status))) self.__set_status(StatusBits.QUEUED, session) @@ -290,6 +302,7 @@ class Otx(SessionBase): session = SessionBase.bind_session(session) if self.status & StatusBits.FINAL: + SessionBase.release_session(session) raise TxStateChangeError('SENT cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) self.__set_status(StatusBits.IN_NETWORK, session) @@ -314,8 +327,10 @@ class Otx(SessionBase): session = SessionBase.bind_session(session) if self.status & StatusBits.FINAL: + SessionBase.release_session(session) raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) if self.status & StatusBits.IN_NETWORK: + SessionBase.release_session(session) raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status))) self.__set_status(StatusBits.LOCAL_ERROR | StatusBits.DEFERRED, session) @@ -340,8 +355,10 @@ class Otx(SessionBase): session = SessionBase.bind_session(session) if self.status & StatusBits.FINAL: + SessionBase.release_session(session) raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) if self.status & StatusBits.IN_NETWORK: + SessionBase.release_session(session) raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status))) self.__reset_status(StatusBits.QUEUED, session) @@ -368,8 +385,10 @@ class Otx(SessionBase): session = SessionBase.bind_session(session) if self.status & StatusBits.FINAL: + SessionBase.release_session(session) raise TxStateChangeError('REVERTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) if not self.status & StatusBits.IN_NETWORK: + SessionBase.release_session(session) raise TxStateChangeError('REVERTED cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status))) if block != None: @@ -397,10 +416,12 @@ class Otx(SessionBase): session = SessionBase.bind_session(session) if self.status & StatusBits.FINAL: + SessionBase.release_session(session) raise TxStateChangeError('CANCEL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) if confirmed: if self.status > 0 and not self.status & StatusBits.OBSOLETE: + SessionBase.release_session(session) raise TxStateChangeError('CANCEL can only be set on an entry marked OBSOLETE ({})'.format(status_str(self.status))) self.__set_status(StatusEnum.CANCELLED, session) else: @@ -425,10 +446,13 @@ class Otx(SessionBase): session = SessionBase.bind_session(session) if self.status & StatusBits.FINAL: + SessionBase.release_session(session) raise TxStateChangeError('SUCCESS cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) if not self.status & StatusBits.IN_NETWORK: + SessionBase.release_session(session) raise TxStateChangeError('SUCCESS cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status))) if is_error_status(self.status): + SessionBase.release_session(session) raise TxStateChangeError('SUCCESS cannot be set on an entry with error state set ({})'.format(status_str(self.status))) if block != None: @@ -509,22 +533,23 @@ class Otx(SessionBase): session.add(l) + # TODO: it is not safe to return otx here unless session has been passed in @staticmethod def add(nonce, address, tx_hash, signed_tx, session=None): - localsession = session - if localsession == None: - localsession = SessionBase.create_session() + external_session = session != None + + session = SessionBase.bind_session(session) otx = Otx(nonce, address, tx_hash, signed_tx) - localsession.add(otx) - localsession.flush() + session.add(otx) + session.flush() if otx.tracing: - otx.__state_log(session=localsession) - localsession.flush() + otx.__state_log(session=session) + session.flush() - if session==None: - localsession.commit() - localsession.close() + SessionBase.release_session(session) + + if not external_session: return None return otx diff --git a/apps/cic-eth/cic_eth/eth/account.py b/apps/cic-eth/cic_eth/eth/account.py index 90ab9c23..5d6c1bd8 100644 --- a/apps/cic-eth/cic_eth/eth/account.py +++ b/apps/cic-eth/cic_eth/eth/account.py @@ -8,6 +8,11 @@ from cic_registry.chain import ChainSpec from erc20_single_shot_faucet import Faucet from cic_registry import zero_address from hexathon import strip_0x +from chainlib.eth.connection import RPCConnection +from chainlib.eth.sign import ( + new_account, + sign_message, + ) # local import from cic_eth.registry import safe_registry @@ -28,6 +33,7 @@ from cic_eth.error import ( from cic_eth.task import ( CriticalSQLAlchemyTask, CriticalSQLAlchemyAndSignerTask, + BaseTask, ) #logg = logging.getLogger(__name__) @@ -145,8 +151,8 @@ def unpack_gift(data): # TODO: Separate out nonce initialization task -@celery_app.task(base=CriticalSQLAlchemyAndSignerTask) -def create(password, chain_str): +@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask) +def create(self, password, chain_str): """Creates and stores a new ethereum account in the keystore. The password is passed on to the wallet backend, no encryption is performed in the task worker. @@ -159,18 +165,23 @@ def create(password, chain_str): :rtype: str, 0x-hex """ chain_spec = ChainSpec.from_chain_str(chain_str) - c = RpcClient(chain_spec) + #c = RpcClient(chain_spec) a = None - try: - a = c.w3.eth.personal.new_account(password) - except FileNotFoundError: - pass + conn = RPCConnection.connect('signer') + o = new_account() + a = conn.do(o) + + #try: + # a = c.w3.eth.personal.new_account(password) + #except FileNotFoundError: + # pass if a == None: raise SignerError('create account') logg.debug('created account {}'.format(a)) # Initialize nonce provider record for account - session = SessionBase.create_session() + #session = SessionBase.create_session() + session = self.create_session() Nonce.init(a, session=session) session.commit() session.close() @@ -193,7 +204,8 @@ def register(self, account_address, chain_str, writer_address=None): """ chain_spec = ChainSpec.from_chain_str(chain_str) - session = SessionBase.create_session() + session = self.create_session() + #session = SessionBase.create_session() if writer_address == None: writer_address = AccountRole.get_address('ACCOUNTS_INDEX_WRITER', session=session) @@ -211,6 +223,7 @@ def register(self, account_address, chain_str, writer_address=None): tx_add = txf.add(account_address, chain_spec, self.request.root_id, session=session) (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data', session=session) + session.commit() session.close() gas_budget = tx_add['gas'] * tx_add['gasPrice'] @@ -248,9 +261,11 @@ def gift(self, account_address, chain_str): registry = safe_registry(c.w3) txf = AccountTxFactory(account_address, c, registry=registry) - session = SessionBase.create_session() + #session = SessionBase.create_session() + session = self.create_session() tx_add = txf.gift(account_address, chain_spec, self.request.root_id, session=session) (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data', session=session) + session.commit() session.close() gas_budget = tx_add['gas'] * tx_add['gasPrice'] @@ -279,16 +294,17 @@ def have(self, account, chain_str): :returns: Account, or None if not exists :rtype: Varies """ - c = RpcClient(account) + o = sign_message(account, '0x2a') try: - c.w3.eth.sign(account, text='2a') + conn = RPCConnection.connect('signer') + conn.do(o) return account except Exception as e: logg.debug('cannot sign with {}: {}'.format(account, e)) return None + - -@celery_app.task(bind=True) +@celery_app.task(bind=True, base=BaseTask) def role(self, account, chain_str): """Return account role for address @@ -299,11 +315,15 @@ def role(self, account, chain_str): :returns: Account, or None if not exists :rtype: Varies """ - return AccountRole.role_for(account) + session = self.create_session() + role_tag = AccountRole.role_for(account, session=session) + session.close() + return role_tag -@celery_app.task(base=CriticalSQLAlchemyTask) +@celery_app.task(bind=True, base=CriticalSQLAlchemyTask) def cache_gift_data( + self, tx_hash_hex, tx_signed_raw_hex, chain_str, @@ -326,7 +346,8 @@ def cache_gift_data( tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id()) tx_data = unpack_gift(tx['data']) - session = SessionBase.create_session() + #session = SessionBase.create_session() + session = self.create_session() tx_cache = TxCache( tx_hash_hex, @@ -346,8 +367,9 @@ def cache_gift_data( return (tx_hash_hex, cache_id) -@celery_app.task(base=CriticalSQLAlchemyTask) +@celery_app.task(bind=True, base=CriticalSQLAlchemyTask) def cache_account_data( + self, tx_hash_hex, tx_signed_raw_hex, chain_str, diff --git a/apps/cic-eth/cic_eth/eth/task.py b/apps/cic-eth/cic_eth/eth/task.py index 60bbb590..42bce354 100644 --- a/apps/cic-eth/cic_eth/eth/task.py +++ b/apps/cic-eth/cic_eth/eth/task.py @@ -1,9 +1,11 @@ # standard imports import logging -# third-party imports +# external imports import celery from cic_registry.chain import ChainSpec +from chainlib.eth.sign import sign_transaction +from chainlib.eth.connection import RPCConnection # local imports from cic_eth.eth import RpcClient @@ -26,16 +28,21 @@ def sign_tx(tx, chain_str): :rtype: tuple """ chain_spec = ChainSpec.from_chain_str(chain_str) - c = RpcClient(chain_spec) + #c = RpcClient(chain_spec) tx_transfer_signed = None + conn = RPCConnection.connect('signer') try: - tx_transfer_signed = c.w3.eth.sign_transaction(tx) - except FileNotFoundError: - pass - if tx_transfer_signed == None: - raise SignerError('sign tx') + o = sign_transaction(tx) + tx_transfer_signed = conn.do(o) + #try: + # tx_transfer_signed = c.w3.eth.sign_transaction(tx) + except Exception as e: + raise SignerError('sign tx {}: {}'.format(tx, e)) logg.debug('tx_transfer_signed {}'.format(tx_transfer_signed)) - tx_hash = c.w3.keccak(hexstr=tx_transfer_signed['raw']) + h = sha3.keccak_256() + h.update(tx_transfer_signed['raw']) + g = h.digest() + #tx_hash = c.w3.keccak(hexstr=tx_transfer_signed['raw']) tx_hash_hex = tx_hash.hex() return (tx_hash_hex, tx_transfer_signed['raw'],) diff --git a/apps/cic-eth/cic_eth/eth/tx.py b/apps/cic-eth/cic_eth/eth/tx.py index 2a424397..c5f21676 100644 --- a/apps/cic-eth/cic_eth/eth/tx.py +++ b/apps/cic-eth/cic_eth/eth/tx.py @@ -624,6 +624,8 @@ def reserve_nonce(self, chained_input, signer=None): root_id = self.request.root_id nonce = NonceReservation.next(address, root_id) + session.commit() + session.close() return chained_input diff --git a/apps/cic-eth/cic_eth/queue/tx.py b/apps/cic-eth/cic_eth/queue/tx.py index 4f95d10b..520a4529 100644 --- a/apps/cic-eth/cic_eth/queue/tx.py +++ b/apps/cic-eth/cic_eth/queue/tx.py @@ -77,7 +77,17 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predec for otx in q.all(): logg.info('otx {} obsoleted by {}'.format(otx.tx_hash, tx_hash)) - otx.cancel(confirmed=False, session=session) + try: + otx.cancel(confirmed=False, session=session) + except TxStateChangeError as e: + logg.exception('obsolete fail: {}'.format(e)) + session.close() + raise(e) + except Exception as e: + logg.exception('obsolete UNEXPECTED fail: {}'.format(e)) + session.close() + raise(e) + session.commit() SessionBase.release_session(session) @@ -107,10 +117,20 @@ def set_sent_status(tx_hash, fail=False): session.close() return False - if fail: - o.sendfail(session=session) - else: - o.sent(session=session) + try: + if fail: + o.sendfail(session=session) + else: + o.sent(session=session) + except TxStateChangeError as e: + logg.exception('set sent fail: {}'.format(e)) + session.close() + raise(e) + except Exception as e: + logg.exception('set sent UNEXPECED fail: {}'.format(e)) + session.close() + raise(e) + session.commit() session.close() @@ -154,10 +174,20 @@ def set_final_status(tx_hash, block=None, fail=False): q = q.filter(Otx.tx_hash==tx_hash) o = q.first() - if fail: - o.minefail(block, session=session) - else: - o.success(block, session=session) + try: + if fail: + o.minefail(block, session=session) + else: + o.success(block, session=session) + session.commit() + except TxStateChangeError as e: + logg.exception('set final fail: {}'.format(e)) + session.close() + raise(e) + except Exception as e: + logg.exception('set final UNEXPECED fail: {}'.format(e)) + session.close() + raise(e) q = session.query(Otx) q = q.join(TxCache) @@ -166,8 +196,16 @@ def set_final_status(tx_hash, block=None, fail=False): q = q.filter(Otx.tx_hash!=tx_hash) for otwo in q.all(): - otwo.cancel(True, session=session) - + try: + otwo.cancel(True, session=session) + except TxStateChangeError as e: + logg.exception('cancel non-final fail: {}'.format(e)) + session.close() + raise(e) + except Exception as e: + logg.exception('cancel non-final UNEXPECTED fail: {}'.format(e)) + session.close() + raise(e) session.commit() session.close() @@ -195,12 +233,16 @@ def set_cancel(tx_hash, manual=False): session.flush() - if manual: - o.override(session=session) - else: - o.cancel(session=session) - - session.commit() + try: + if manual: + o.override(session=session) + else: + o.cancel(session=session) + session.commit() + except TxStateChangeError as e: + logg.exception('set cancel fail: {}'.format(e)) + except Exception as e: + logg.exception('set cancel UNEXPECTED fail: {}'.format(e)) session.close() return tx_hash diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py index dfa97839..c796d33c 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py @@ -31,6 +31,7 @@ class TxFilter(SyncFilter): logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex)) return None logg.info('tx filter match on {}'.format(otx.tx_hash)) + db_session.flush() SessionBase.release_session(db_session) s = celery.signature( 'cic_eth.queue.tx.set_final_status', diff --git a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py index ea16fe1b..a111a61d 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py @@ -8,12 +8,14 @@ import re import urllib import websocket -# third-party imports +# external imports import celery import confini -from crypto_dev_signer.eth.web3ext import Web3 as Web3Ext +#from crypto_dev_signer.eth.web3ext import Web3 as Web3Ext from web3 import HTTPProvider, WebsocketProvider -from gas_proxy.web3 import GasMiddleware +import web3 +#from gas_proxy.web3 import GasMiddleware +from chainlib.eth.connection import RPCConnection # local imports from cic_registry.registry import CICRegistry @@ -122,65 +124,85 @@ else: 'result_backend': result, }) +# set up signer +RPCConnection.register_location('signer', config.get('SIGNER_SOCKET_PATH')) -# set up web3 -# TODO: web3 socket wrapping is now a lot of code. factor out -class JSONRPCHttpSocketAdapter: - - def __init__(self, url): - self.response = None - self.url = url - - def send(self, data): - logg.debug('redirecting socket send to jsonrpc http socket adapter {} {}'.format(self.url, data)) - req = urllib.request.Request(self.url, method='POST') - req.add_header('Content-type', 'application/json') - req.add_header('Connection', 'close') - res = urllib.request.urlopen(req, data=data.encode('utf-8')) - self.response = res.read().decode('utf-8') - logg.debug('setting jsonrpc http socket adapter response to {}'.format(self.response)) - - def recv(self, n=0): - return self.response - - +# set up web3py re_websocket = re.compile('^wss?://') re_http = re.compile('^https?://') blockchain_provider = config.get('ETH_PROVIDER') -socket_constructor = None if re.match(re_websocket, blockchain_provider) != None: - def socket_constructor_ws(): - return websocket.create_connection(config.get('ETH_PROVIDER')) - socket_constructor = socket_constructor_ws - blockchain_provider = WebsocketProvider(blockchain_provider) + blockchain_provider = web3.Web3.WebsocketProvider(blockchain_provider) elif re.match(re_http, blockchain_provider) != None: - def socket_constructor_http(): - return JSONRPCHttpSocketAdapter(config.get('ETH_PROVIDER')) - socket_constructor = socket_constructor_http - blockchain_provider = HTTPProvider(blockchain_provider) + blockchain_provider = web3.Web3.HTTPProvider(blockchain_provider) else: raise ValueError('unknown provider url {}'.format(blockchain_provider)) - -def web3ext_constructor(): - w3 = Web3Ext(blockchain_provider, config.get('SIGNER_SOCKET_PATH')) - GasMiddleware.socket_constructor = socket_constructor - w3.middleware_onion.add(GasMiddleware) - - def sign_transaction(tx): - r = w3.eth.signTransaction(tx) - d = r.__dict__ - for k in d.keys(): - if k == 'tx': - d[k] = d[k].__dict__ - else: - d[k] = d[k].hex() - return d - - setattr(w3.eth, 'sign_transaction', sign_transaction) - setattr(w3.eth, 'send_raw_transaction', w3.eth.sendRawTransaction) +def web3_constructor(): + w3 = web3.Web3(blockchain_provider) return (blockchain_provider, w3) -RpcClient.set_constructor(web3ext_constructor) +RpcClient.set_constructor(web3_constructor) + + +# +## set up web3 +## TODO: web3 socket wrapping is now a lot of code. factor out +#class JSONRPCHttpSocketAdapter: +# +# def __init__(self, url): +# self.response = None +# self.url = url +# +# def send(self, data): +# logg.debug('redirecting socket send to jsonrpc http socket adapter {} {}'.format(self.url, data)) +# req = urllib.request.Request(self.url, method='POST') +# req.add_header('Content-type', 'application/json') +# req.add_header('Connection', 'close') +# res = urllib.request.urlopen(req, data=data.encode('utf-8')) +# self.response = res.read().decode('utf-8') +# logg.debug('setting jsonrpc http socket adapter response to {}'.format(self.response)) +# +# def recv(self, n=0): +# return self.response +# +# +#re_websocket = re.compile('^wss?://') +#re_http = re.compile('^https?://') +#blockchain_provider = config.get('ETH_PROVIDER') +#socket_constructor = None +#if re.match(re_websocket, blockchain_provider) != None: +# def socket_constructor_ws(): +# return websocket.create_connection(config.get('ETH_PROVIDER')) +# socket_constructor = socket_constructor_ws +# blockchain_provider = WebsocketProvider(blockchain_provider) +#elif re.match(re_http, blockchain_provider) != None: +# def socket_constructor_http(): +# return JSONRPCHttpSocketAdapter(config.get('ETH_PROVIDER')) +# socket_constructor = socket_constructor_http +# blockchain_provider = HTTPProvider(blockchain_provider) +#else: +# raise ValueError('unknown provider url {}'.format(blockchain_provider)) +# +# +#def web3ext_constructor(): +# w3 = Web3Ext(blockchain_provider, config.get('SIGNER_SOCKET_PATH')) +# #GasMiddleware.socket_constructor = socket_constructor +# #w3.middleware_onion.add(GasMiddleware) +# +# def sign_transaction(tx): +# r = w3.eth.signTransaction(tx) +# d = r.__dict__ +# for k in d.keys(): +# if k == 'tx': +# d[k] = d[k].__dict__ +# else: +# d[k] = d[k].hex() +# return d +# +# setattr(w3.eth, 'sign_transaction', sign_transaction) +# setattr(w3.eth, 'send_raw_transaction', w3.eth.sendRawTransaction) +# return (blockchain_provider, w3) +#RpcClient.set_constructor(web3ext_constructor) Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS') diff --git a/apps/cic-eth/cic_eth/task.py b/apps/cic-eth/cic_eth/task.py index b17cb6fe..c9043a1d 100644 --- a/apps/cic-eth/cic_eth/task.py +++ b/apps/cic-eth/cic_eth/task.py @@ -1,5 +1,8 @@ # import +import time import requests +import logging +import uuid # external imports import celery @@ -10,9 +13,23 @@ from cic_eth.error import ( SignerError, EthError, ) +from cic_eth.db.models.base import SessionBase + +logg = logging.getLogger(__name__) + +celery_app = celery.current_app -class CriticalTask(celery.Task): +class BaseTask(celery.Task): + + session_func = SessionBase.create_session + + def create_session(self): + logg.warning('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> session from base {}'.format(id(self.session_func))) + return BaseTask.session_func() + + +class CriticalTask(BaseTask): retry_jitter = True retry_backoff = True retry_backoff_max = 8 @@ -54,3 +71,9 @@ class CriticalWeb3AndSignerTask(CriticalTask): requests.exceptions.ConnectionError, SignerError, ) + + +@celery_app.task(bind=True, base=BaseTask) +def hello(self): + time.sleep(0.1) + return id(SessionBase.create_session) diff --git a/apps/cic-eth/config/cic.ini b/apps/cic-eth/config/cic.ini index 73cc99c6..7c6d825c 100644 --- a/apps/cic-eth/config/cic.ini +++ b/apps/cic-eth/config/cic.ini @@ -1,5 +1,5 @@ [cic] registry_address = -chain_spec = +chain_spec = evm:bloxberg:8996 tx_retry_delay = trust_address = diff --git a/apps/cic-eth/requirements.txt b/apps/cic-eth/requirements.txt index 54515f8f..39592245 100644 --- a/apps/cic-eth/requirements.txt +++ b/apps/cic-eth/requirements.txt @@ -19,6 +19,6 @@ eth-gas-proxy==0.0.1a4 websocket-client==0.57.0 moolb~=0.1.1b2 eth-address-index~=0.1.0a8 -chainlib~=0.0.1a20 +chainlib~=0.0.1a21 hexathon~=0.0.1a3 chainsyncer~=0.0.1a19 diff --git a/apps/cic-eth/tests/tasks/test_account.py b/apps/cic-eth/tests/tasks/test_account.py index a14b5add..c449d12a 100644 --- a/apps/cic-eth/tests/tasks/test_account.py +++ b/apps/cic-eth/tests/tasks/test_account.py @@ -19,6 +19,7 @@ from cic_eth.db.models.role import AccountRole from cic_eth.eth.account import AccountTxFactory logg = logging.getLogger() #__name__) +logging.getLogger('fuuck').setLevel(logging.DEBUG) def test_create_account( @@ -26,8 +27,9 @@ def test_create_account( init_w3, init_database, celery_session_worker, + caplog, ): - + caplog.set_level(logging.DEBUG, 'cic_eth.task') s = celery.signature( 'cic_eth.eth.account.create', [ @@ -42,7 +44,6 @@ def test_create_account( session = SessionBase.create_session() q = session.query(Nonce).filter(Nonce.address_hex==r) o = q.first() - logg.debug('oooo s {}'.format(o)) session.close() assert o != None assert o.nonce == 0 @@ -56,6 +57,7 @@ def test_create_account( ) t = s.apply_async() assert r == t.get() + print('caplog records {}'.format(caplog.records)) def test_register_account( diff --git a/apps/contract-migration/scripts/requirements.txt b/apps/contract-migration/scripts/requirements.txt index 36659b04..657948b1 100644 --- a/apps/contract-migration/scripts/requirements.txt +++ b/apps/contract-migration/scripts/requirements.txt @@ -1,3 +1,3 @@ -cic-base[full_graph]==0.1.1a24 -cic-eth==0.10.0a41 +cic-base[full_graph]==0.1.1a25 +cic-eth==0.10.0a42 cic-types==0.1.0a8 diff --git a/apps/requirements.txt b/apps/requirements.txt index 51a413eb..619ba109 100644 --- a/apps/requirements.txt +++ b/apps/requirements.txt @@ -42,6 +42,6 @@ rlp==2.0.1 cryptocurrency-cli-tools==0.0.4 giftable-erc20-token==0.0.7b12 hexathon==0.0.1a3 -chainlib==0.0.1a20 +chainlib==0.0.1a21 chainsyncer==0.0.1a19 cic-registry==0.5.3.a22 diff --git a/docker-compose.yml b/docker-compose.yml index 7bd962bf..86e72ca7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -275,7 +275,7 @@ services: - -c - | if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi - ./start_tracker.sh -v -c /usr/local/etc/cic-eth + ./start_tracker.sh -vv -c /usr/local/etc/cic-eth # command: "/root/start_manager.sh head -vv"