From 3ceafa90f423c5aa66bdb109231f543a8e730b92 Mon Sep 17 00:00:00 2001 From: nolash Date: Mon, 1 Mar 2021 20:35:05 +0100 Subject: [PATCH] Fix more postgres session leaks in role --- apps/cic-eth/cic_eth/admin/ctrl.py | 6 ++- apps/cic-eth/cic_eth/db/models/base.py | 1 - apps/cic-eth/cic_eth/db/models/lock.py | 43 +++++++-------- apps/cic-eth/cic_eth/db/models/nonce.py | 52 +++++++++++-------- apps/cic-eth/cic_eth/db/models/role.py | 11 ++-- apps/cic-eth/cic_eth/db/models/tx.py | 30 ++++++----- apps/cic-eth/cic_eth/eth/account.py | 19 ++++--- apps/cic-eth/cic_eth/eth/factory.py | 4 +- apps/cic-eth/cic_eth/eth/nonce.py | 4 +- apps/cic-eth/cic_eth/eth/task.py | 3 +- apps/cic-eth/cic_eth/queue/tx.py | 4 +- apps/cic-eth/cic_eth/version.py | 2 +- apps/cic-eth/docker/Dockerfile | 5 +- apps/cic-eth/requirements.txt | 4 +- apps/contract-migration/docker/Dockerfile | 2 +- .../scripts/requirements.txt | 4 +- apps/contract-migration/seed_cic_eth.sh | 2 +- apps/requirements.txt | 2 +- docker-compose.yml | 4 +- 19 files changed, 111 insertions(+), 91 deletions(-) diff --git a/apps/cic-eth/cic_eth/admin/ctrl.py b/apps/cic-eth/cic_eth/admin/ctrl.py index 6b28d86d..5c9d769c 100644 --- a/apps/cic-eth/cic_eth/admin/ctrl.py +++ b/apps/cic-eth/cic_eth/admin/ctrl.py @@ -8,6 +8,7 @@ from cic_registry import zero_address # local imports from cic_eth.db.enum import LockEnum +from cic_eth.db.models.base import SessionBase from cic_eth.db.models.lock import Lock from cic_eth.error import LockedError @@ -116,9 +117,10 @@ def unlock_queue(chained_input, chain_str, address=zero_address): @celery_app.task() def check_lock(chained_input, chain_str, lock_flags, address=None): - r = Lock.check(chain_str, lock_flags, address=zero_address) + session = SessionBase.create_session() + r = Lock.check(chain_str, lock_flags, address=zero_address, session=session) if address != None: - r |= Lock.check(chain_str, lock_flags, address=address) + r |= Lock.check(chain_str, lock_flags, address=address, session=session) if r > 0: logg.debug('lock check {} has match {} for {}'.format(lock_flags, r, address)) raise LockedError(r) diff --git a/apps/cic-eth/cic_eth/db/models/base.py b/apps/cic-eth/cic_eth/db/models/base.py index db570dfc..353c67cc 100644 --- a/apps/cic-eth/cic_eth/db/models/base.py +++ b/apps/cic-eth/cic_eth/db/models/base.py @@ -114,7 +114,6 @@ class SessionBase(Model): @staticmethod def release_session(session=None): - session.flush() session_key = str(id(session)) if SessionBase.localsessions.get(session_key) != None: logg.debug('destroying session {}'.format(session_key)) diff --git a/apps/cic-eth/cic_eth/db/models/lock.py b/apps/cic-eth/cic_eth/db/models/lock.py index dbd0fd64..a8724fb5 100644 --- a/apps/cic-eth/cic_eth/db/models/lock.py +++ b/apps/cic-eth/cic_eth/db/models/lock.py @@ -55,11 +55,9 @@ class Lock(SessionBase): :returns: New flag state of entry :rtype: number """ - localsession = session - if localsession == None: - localsession = SessionBase.create_session() + session = SessionBase.bind_session(session) - q = localsession.query(Lock) + q = session.query(Lock) #q = q.join(TxCache, isouter=True) q = q.filter(Lock.address==address) q = q.filter(Lock.blockchain==chain_str) @@ -71,7 +69,8 @@ class Lock(SessionBase): lock.address = address lock.blockchain = chain_str if tx_hash != None: - q = localsession.query(Otx) + session.flush() + q = session.query(Otx) q = q.filter(Otx.tx_hash==tx_hash) otx = q.first() if otx != None: @@ -80,12 +79,11 @@ class Lock(SessionBase): lock.flags |= flags r = lock.flags - localsession.add(lock) - localsession.commit() + session.add(lock) + session.commit() + + SessionBase.release_session(session) - if session == None: - localsession.close() - return r @@ -110,11 +108,9 @@ class Lock(SessionBase): :returns: New flag state of entry :rtype: number """ - localsession = session - if localsession == None: - localsession = SessionBase.create_session() + session = SessionBase.bind_session(session) - q = localsession.query(Lock) + q = session.query(Lock) #q = q.join(TxCache, isouter=True) q = q.filter(Lock.address==address) q = q.filter(Lock.blockchain==chain_str) @@ -124,14 +120,13 @@ class Lock(SessionBase): if lock != None: lock.flags &= ~flags if lock.flags == 0: - localsession.delete(lock) + session.delete(lock) else: - localsession.add(lock) + session.add(lock) r = lock.flags - localsession.commit() + session.commit() - if session == None: - localsession.close() + SessionBase.release_session(session) return r @@ -156,22 +151,20 @@ class Lock(SessionBase): :rtype: number """ - localsession = session - if localsession == None: - localsession = SessionBase.create_session() + session = SessionBase.bind_session(session) - q = localsession.query(Lock) + q = session.query(Lock) #q = q.join(TxCache, isouter=True) q = q.filter(Lock.address==address) q = q.filter(Lock.blockchain==chain_str) q = q.filter(Lock.flags.op('&')(flags)==flags) lock = q.first() - if session == None: - localsession.close() r = 0 if lock != None: r = lock.flags & flags + + SessionBase.release_session(session) return r diff --git a/apps/cic-eth/cic_eth/db/models/nonce.py b/apps/cic-eth/cic_eth/db/models/nonce.py index 2a7ad004..a029d81e 100644 --- a/apps/cic-eth/cic_eth/db/models/nonce.py +++ b/apps/cic-eth/cic_eth/db/models/nonce.py @@ -21,12 +21,9 @@ class Nonce(SessionBase): @staticmethod def get(address, session=None): - localsession = session - if localsession == None: - localsession = SessionBase.create_session() + session = SessionBase.bind_session(session) - - q = localsession.query(Nonce) + q = session.query(Nonce) q = q.filter(Nonce.address_hex==address) nonce = q.first() @@ -34,28 +31,29 @@ class Nonce(SessionBase): if nonce != None: nonce_value = nonce.nonce; - if session == None: - localsession.close() + SessionBase.release_session(session) return nonce_value @staticmethod - def __get(conn, address): - r = conn.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address)) + def __get(session, address): + r = session.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address)) nonce = r.fetchone() + session.flush() if nonce == None: return None return nonce[0] @staticmethod - def __set(conn, address, nonce): - conn.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address)) + def __set(session, address, nonce): + session.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address)) + session.flush() @staticmethod - def next(address, initial_if_not_exists=0): + def next(address, initial_if_not_exists=0, session=None): """Generate next nonce for the given address. If there is no previous nonce record for the address, the nonce may be initialized to a specified value, or 0 if no value has been given. @@ -67,20 +65,32 @@ class Nonce(SessionBase): :returns: Nonce :rtype: number """ - conn = Nonce.engine.connect() + session = SessionBase.bind_session(session) + + SessionBase.release_session(session) + + session.begin_nested() + #conn = Nonce.engine.connect() if Nonce.transactional: - conn.execute('BEGIN') - conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE') - nonce = Nonce.__get(conn, address) + #session.execute('BEGIN') + session.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE') + session.flush() + nonce = Nonce.__get(session, address) logg.debug('get nonce {} for address {}'.format(nonce, address)) if nonce == None: nonce = initial_if_not_exists - conn.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address)) + session.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address)) + session.flush() logg.debug('setting default nonce to {} for address {}'.format(nonce, address)) - Nonce.__set(conn, address, nonce+1) - if Nonce.transactional: - conn.execute('COMMIT') - conn.close() + Nonce.__set(session, address, nonce+1) + #if Nonce.transactional: + #session.execute('COMMIT') + #session.execute('UNLOCK TABLE nonce') + #conn.close() + session.commit() + session.commit() + + SessionBase.release_session(session) return nonce diff --git a/apps/cic-eth/cic_eth/db/models/role.py b/apps/cic-eth/cic_eth/db/models/role.py index d632b0a9..7096a5b3 100644 --- a/apps/cic-eth/cic_eth/db/models/role.py +++ b/apps/cic-eth/cic_eth/db/models/role.py @@ -40,12 +40,14 @@ class AccountRole(SessionBase): session = SessionBase.bind_session(session) - role = AccountRole.get_role(tag, session) + role = AccountRole.__get_role(tag, session) r = zero_address if role != None: r = role.address_hex + session.flush() + SessionBase.release_session(session) return r @@ -63,6 +65,8 @@ class AccountRole(SessionBase): session = SessionBase.bind_session(session) role = AccountRole.__get_role(tag, session) + + session.flush() SessionBase.release_session(session) @@ -74,7 +78,6 @@ class AccountRole(SessionBase): q = session.query(AccountRole) q = q.filter(AccountRole.tag==tag) r = q.first() - session.flush() return r @@ -93,10 +96,12 @@ class AccountRole(SessionBase): """ session = SessionBase.bind_session(session) - role = AccountRole.get_role(tag, session) + role = AccountRole.__get_role(tag, session) if role == None: role = AccountRole(tag) role.address_hex = address_hex + + session.flush() SessionBase.release_session(session) diff --git a/apps/cic-eth/cic_eth/db/models/tx.py b/apps/cic-eth/cic_eth/db/models/tx.py index 5eb46a7c..34abbd69 100644 --- a/apps/cic-eth/cic_eth/db/models/tx.py +++ b/apps/cic-eth/cic_eth/db/models/tx.py @@ -85,26 +85,27 @@ class TxCache(SessionBase): :param tx_hash_new: tx hash to associate the copied entry with :type tx_hash_new: str, 0x-hex """ - localsession = SessionBase.bind_session(session) + session = SessionBase.bind_session(session) - q = localsession.query(TxCache) + q = session.query(TxCache) q = q.join(Otx) q = q.filter(Otx.tx_hash==tx_hash_original) txc = q.first() if txc == None: - SessionBase.release_session(localsession) + SessionBase.release_session(session) raise NotLocalTxError('original {}'.format(tx_hash_original)) if txc.block_number != None: - SessionBase.release_session(localsession) + SessionBase.release_session(session) raise TxStateChangeError('cannot clone tx cache of confirmed tx {}'.format(tx_hash_original)) - q = localsession.query(Otx) + session.flush() + q = session.query(Otx) q = q.filter(Otx.tx_hash==tx_hash_new) otx = q.first() if otx == None: - SessionBase.release_session(localsession) + SessionBase.release_session(session) raise NotLocalTxError('new {}'.format(tx_hash_new)) txc_new = TxCache( @@ -115,18 +116,21 @@ class TxCache(SessionBase): txc.destination_token_address, int(txc.from_value), int(txc.to_value), + session=session, ) - localsession.add(txc_new) - localsession.commit() + session.add(txc_new) + session.commit() - SessionBase.release_session(localsession) + SessionBase.release_session(session) def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None, session=None): - localsession = SessionBase.bind_session(session) - tx = localsession.query(Otx).filter(Otx.tx_hash==tx_hash).first() + session = SessionBase.bind_session(session) + q = session.query(Otx) + q = q.filter(Otx.tx_hash==tx_hash) + tx = q.first() if tx == None: - SessionBase.release_session(localsession) + SessionBase.release_session(session) raise FileNotFoundError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash)) self.otx_id = tx.id @@ -143,5 +147,5 @@ class TxCache(SessionBase): self.date_updated = self.date_created self.date_checked = self.date_created - SessionBase.release_session(localsession) + SessionBase.release_session(session) diff --git a/apps/cic-eth/cic_eth/eth/account.py b/apps/cic-eth/cic_eth/eth/account.py index 61facebe..62d2d547 100644 --- a/apps/cic-eth/cic_eth/eth/account.py +++ b/apps/cic-eth/cic_eth/eth/account.py @@ -36,6 +36,7 @@ class AccountTxFactory(TxFactory): self, address, chain_spec, + session=None, ): """Register an Ethereum account address with the on-chain account registry @@ -58,7 +59,7 @@ class AccountTxFactory(TxFactory): 'gas': gas, 'gasPrice': self.gas_price, 'chainId': chain_spec.chain_id(), - 'nonce': self.next_nonce(), + 'nonce': self.next_nonce(session=session), 'value': 0, }) return tx_add @@ -68,6 +69,7 @@ class AccountTxFactory(TxFactory): self, address, chain_spec, + session=None, ): """Trigger the on-chain faucet to disburse tokens to the provided Ethereum account @@ -88,7 +90,7 @@ class AccountTxFactory(TxFactory): 'gas': gas, 'gasPrice': self.gas_price, 'chainId': chain_spec.chain_id(), - 'nonce': self.next_nonce(), + 'nonce': self.next_nonce(session=session), 'value': 0, }) return tx_add @@ -157,7 +159,10 @@ def create(password, chain_str): # TODO: this can safely be set to zero, since we are randomly creating account n = c.w3.eth.getTransactionCount(a, 'pending') session = SessionBase.create_session() - o = session.query(Nonce).filter(Nonce.address_hex==a).first() + q = session.query(Nonce) + q = q.filter(Nonce.address_hex==a) + o = q.first() + session.flush() if o == None: o = Nonce() o.address_hex = a @@ -186,20 +191,20 @@ def register(self, account_address, chain_str, writer_address=None): session = SessionBase.create_session() if writer_address == None: - writer_address = AccountRole.get_address('ACCOUNTS_INDEX_WRITER', session) - session.close() + writer_address = AccountRole.get_address('ACCOUNTS_INDEX_WRITER', session=session) if writer_address == zero_address: + session.close() raise RoleMissingError(account_address) - logg.debug('adding account address {} to index; writer {}'.format(account_address, writer_address)) queue = self.request.delivery_info['routing_key'] c = RpcClient(chain_spec, holder_address=writer_address) txf = AccountTxFactory(writer_address, c) - tx_add = txf.add(account_address, chain_spec) + tx_add = txf.add(account_address, chain_spec, session=session) + session.close() (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data') gas_budget = tx_add['gas'] * tx_add['gasPrice'] diff --git a/apps/cic-eth/cic_eth/eth/factory.py b/apps/cic-eth/cic_eth/eth/factory.py index fef8a025..76a15fbc 100644 --- a/apps/cic-eth/cic_eth/eth/factory.py +++ b/apps/cic-eth/cic_eth/eth/factory.py @@ -32,10 +32,10 @@ class TxFactory: logg.debug('txfactory instance address {} gas price'.format(self.address, self.gas_price)) - def next_nonce(self): + def next_nonce(self, session=None): """Returns the current cached nonce value, and increments it for next transaction. :returns: Nonce :rtype: number """ - return self.nonce_oracle.next() + return self.nonce_oracle.next(session=session) diff --git a/apps/cic-eth/cic_eth/eth/nonce.py b/apps/cic-eth/cic_eth/eth/nonce.py index 6e8e0a75..e8c96b90 100644 --- a/apps/cic-eth/cic_eth/eth/nonce.py +++ b/apps/cic-eth/cic_eth/eth/nonce.py @@ -14,10 +14,10 @@ class NonceOracle(): self.default_nonce = default_nonce - def next(self): + def next(self, session=None): """Get next unique nonce. :returns: Nonce :rtype: number """ - return Nonce.next(self.address, self.default_nonce) + return Nonce.next(self.address, self.default_nonce, session=session) diff --git a/apps/cic-eth/cic_eth/eth/task.py b/apps/cic-eth/cic_eth/eth/task.py index 72193e6b..67ff9564 100644 --- a/apps/cic-eth/cic_eth/eth/task.py +++ b/apps/cic-eth/cic_eth/eth/task.py @@ -33,7 +33,7 @@ def sign_tx(tx, chain_str): return (tx_hash_hex, tx_transfer_signed['raw'],) -def sign_and_register_tx(tx, chain_str, queue, cache_task=None): +def sign_and_register_tx(tx, chain_str, queue, cache_task=None, session=None): """Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING). :param tx: Standard ethereum transaction data @@ -58,6 +58,7 @@ def sign_and_register_tx(tx, chain_str, queue, cache_task=None): tx_hash_hex, tx_signed_raw_hex, chain_str, + session=session, ) if cache_task != None: diff --git a/apps/cic-eth/cic_eth/queue/tx.py b/apps/cic-eth/cic_eth/queue/tx.py index 29136348..e4a8f818 100644 --- a/apps/cic-eth/cic_eth/queue/tx.py +++ b/apps/cic-eth/cic_eth/queue/tx.py @@ -472,7 +472,9 @@ def get_tx(tx_hash): :rtype: dict """ session = SessionBase.create_session() - tx = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() + q = session.query(Otx) + q = q.filter(Otx.tx_hash==tx_hash) + tx = q.first() if tx == None: session.close() raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) diff --git a/apps/cic-eth/cic_eth/version.py b/apps/cic-eth/cic_eth/version.py index c02937ad..06391aa8 100644 --- a/apps/cic-eth/cic_eth/version.py +++ b/apps/cic-eth/cic_eth/version.py @@ -10,7 +10,7 @@ version = ( 0, 10, 0, - 'alpha.35', + 'alpha.36', ) version_object = semver.VersionInfo( diff --git a/apps/cic-eth/docker/Dockerfile b/apps/cic-eth/docker/Dockerfile index 14f3b0ba..6ed831b5 100644 --- a/apps/cic-eth/docker/Dockerfile +++ b/apps/cic-eth/docker/Dockerfile @@ -16,7 +16,7 @@ ARG root_requirement_file='requirements.txt' #RUN apk add linux-headers #RUN apk add libffi-dev RUN apt-get update && \ - apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps + apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps git # Copy shared requirements from top of mono-repo RUN echo "copying root req file ${root_requirement_file}" @@ -42,7 +42,6 @@ COPY cic-eth/config/ /usr/local/etc/cic-eth/ COPY cic-eth/cic_eth/db/migrations/ /usr/local/share/cic-eth/alembic/ COPY cic-eth/crypto_dev_signer_config/ /usr/local/etc/crypto-dev-signer/ -RUN apt-get install -y git && \ - git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \ +RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \ mkdir -p /usr/local/share/cic/solidity && \ cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi diff --git a/apps/cic-eth/requirements.txt b/apps/cic-eth/requirements.txt index 3e1e2da1..1c9e50ce 100644 --- a/apps/cic-eth/requirements.txt +++ b/apps/cic-eth/requirements.txt @@ -20,5 +20,5 @@ moolb~=0.1.1b2 eth-address-index~=0.1.0a8 chainlib~=0.0.1a19 hexathon~=0.0.1a3 -chainsyncer~=0.0.1a18 -cic-base==0.1.1a9 +chainsyncer~=0.0.1a19 +cic-base==0.1.1a10 diff --git a/apps/contract-migration/docker/Dockerfile b/apps/contract-migration/docker/Dockerfile index 0d26e429..c257aa50 100644 --- a/apps/contract-migration/docker/Dockerfile +++ b/apps/contract-migration/docker/Dockerfile @@ -107,7 +107,7 @@ RUN cd cic-bancor/python && \ RUN apt-get install -y cargo -ARG cic_base_version=0.1.1a9 +ARG cic_base_version=0.1.1a10 RUN pip install --extra-index-url $pip_extra_index_url cic-base[full_graph]==$cic_base_version ARG cic_registry_version=0.5.3a22 diff --git a/apps/contract-migration/scripts/requirements.txt b/apps/contract-migration/scripts/requirements.txt index 97785e3f..d311e7ae 100644 --- a/apps/contract-migration/scripts/requirements.txt +++ b/apps/contract-migration/scripts/requirements.txt @@ -1,3 +1,3 @@ -cic-base[full_graph]==0.1.1a9 -cic-eth==0.10.0a34 +cic-base[full_graph]==0.1.1a10 +cic-eth==0.10.0a36 cic-types==0.1.0a8 diff --git a/apps/contract-migration/seed_cic_eth.sh b/apps/contract-migration/seed_cic_eth.sh index fdba8973..b8b3002a 100644 --- a/apps/contract-migration/seed_cic_eth.sh +++ b/apps/contract-migration/seed_cic_eth.sh @@ -31,7 +31,7 @@ set -e set -a # We need to not install these here... -pip install --extra-index-url $DEV_PIP_EXTRA_INDEX_URL cic-eth==0.10.0a34 chainlib==0.0.1a19 cic-contracts==0.0.2a2 +pip install --extra-index-url $DEV_PIP_EXTRA_INDEX_URL cic-eth==0.10.0a36 chainlib==0.0.1a19 cic-contracts==0.0.2a2 >&2 echo "create account for gas gifter" old_gas_provider=$DEV_ETH_ACCOUNT_GAS_PROVIDER diff --git a/apps/requirements.txt b/apps/requirements.txt index 00b23fe4..ed354d85 100644 --- a/apps/requirements.txt +++ b/apps/requirements.txt @@ -43,5 +43,5 @@ cryptocurrency-cli-tools==0.0.4 giftable-erc20-token==0.0.7b12 hexathon==0.0.1a3 chainlib==0.0.1a19 -chainsyncer==0.0.1a18 +chainsyncer==0.0.1a19 cic-registry==0.5.3.a22 diff --git a/docker-compose.yml b/docker-compose.yml index b9b3e23a..adac706f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -161,7 +161,7 @@ services: - -c - | if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi - /usr/local/bin/cic-cache-tracker -vv + /usr/local/bin/cic-cache-tracker -v volumes: - contract-config:/tmp/cic/config/:ro @@ -313,7 +313,7 @@ services: - -c - | if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi - ./start_dispatcher.sh -q cic-eth -vv + ./start_dispatcher.sh -q cic-eth -v # command: "/root/start_dispatcher.sh -q cic-eth -vv"