Fix more postgres session leaks in role

This commit is contained in:
nolash 2021-03-01 20:35:05 +01:00
parent bcf20cbfd3
commit 3ceafa90f4
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
19 changed files with 111 additions and 91 deletions

View File

@ -8,6 +8,7 @@ from cic_registry import zero_address
# local imports # local imports
from cic_eth.db.enum import LockEnum from cic_eth.db.enum import LockEnum
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.lock import Lock from cic_eth.db.models.lock import Lock
from cic_eth.error import LockedError from cic_eth.error import LockedError
@ -116,9 +117,10 @@ def unlock_queue(chained_input, chain_str, address=zero_address):
@celery_app.task() @celery_app.task()
def check_lock(chained_input, chain_str, lock_flags, address=None): def check_lock(chained_input, chain_str, lock_flags, address=None):
r = Lock.check(chain_str, lock_flags, address=zero_address) session = SessionBase.create_session()
r = Lock.check(chain_str, lock_flags, address=zero_address, session=session)
if address != None: if address != None:
r |= Lock.check(chain_str, lock_flags, address=address) r |= Lock.check(chain_str, lock_flags, address=address, session=session)
if r > 0: if r > 0:
logg.debug('lock check {} has match {} for {}'.format(lock_flags, r, address)) logg.debug('lock check {} has match {} for {}'.format(lock_flags, r, address))
raise LockedError(r) raise LockedError(r)

View File

@ -114,7 +114,6 @@ class SessionBase(Model):
@staticmethod @staticmethod
def release_session(session=None): def release_session(session=None):
session.flush()
session_key = str(id(session)) session_key = str(id(session))
if SessionBase.localsessions.get(session_key) != None: if SessionBase.localsessions.get(session_key) != None:
logg.debug('destroying session {}'.format(session_key)) logg.debug('destroying session {}'.format(session_key))

View File

@ -55,11 +55,9 @@ class Lock(SessionBase):
:returns: New flag state of entry :returns: New flag state of entry
:rtype: number :rtype: number
""" """
localsession = session session = SessionBase.bind_session(session)
if localsession == None:
localsession = SessionBase.create_session()
q = localsession.query(Lock) q = session.query(Lock)
#q = q.join(TxCache, isouter=True) #q = q.join(TxCache, isouter=True)
q = q.filter(Lock.address==address) q = q.filter(Lock.address==address)
q = q.filter(Lock.blockchain==chain_str) q = q.filter(Lock.blockchain==chain_str)
@ -71,7 +69,8 @@ class Lock(SessionBase):
lock.address = address lock.address = address
lock.blockchain = chain_str lock.blockchain = chain_str
if tx_hash != None: if tx_hash != None:
q = localsession.query(Otx) session.flush()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash) q = q.filter(Otx.tx_hash==tx_hash)
otx = q.first() otx = q.first()
if otx != None: if otx != None:
@ -80,11 +79,10 @@ class Lock(SessionBase):
lock.flags |= flags lock.flags |= flags
r = lock.flags r = lock.flags
localsession.add(lock) session.add(lock)
localsession.commit() session.commit()
if session == None: SessionBase.release_session(session)
localsession.close()
return r return r
@ -110,11 +108,9 @@ class Lock(SessionBase):
:returns: New flag state of entry :returns: New flag state of entry
:rtype: number :rtype: number
""" """
localsession = session session = SessionBase.bind_session(session)
if localsession == None:
localsession = SessionBase.create_session()
q = localsession.query(Lock) q = session.query(Lock)
#q = q.join(TxCache, isouter=True) #q = q.join(TxCache, isouter=True)
q = q.filter(Lock.address==address) q = q.filter(Lock.address==address)
q = q.filter(Lock.blockchain==chain_str) q = q.filter(Lock.blockchain==chain_str)
@ -124,14 +120,13 @@ class Lock(SessionBase):
if lock != None: if lock != None:
lock.flags &= ~flags lock.flags &= ~flags
if lock.flags == 0: if lock.flags == 0:
localsession.delete(lock) session.delete(lock)
else: else:
localsession.add(lock) session.add(lock)
r = lock.flags r = lock.flags
localsession.commit() session.commit()
if session == None: SessionBase.release_session(session)
localsession.close()
return r return r
@ -156,22 +151,20 @@ class Lock(SessionBase):
:rtype: number :rtype: number
""" """
localsession = session session = SessionBase.bind_session(session)
if localsession == None:
localsession = SessionBase.create_session()
q = localsession.query(Lock) q = session.query(Lock)
#q = q.join(TxCache, isouter=True) #q = q.join(TxCache, isouter=True)
q = q.filter(Lock.address==address) q = q.filter(Lock.address==address)
q = q.filter(Lock.blockchain==chain_str) q = q.filter(Lock.blockchain==chain_str)
q = q.filter(Lock.flags.op('&')(flags)==flags) q = q.filter(Lock.flags.op('&')(flags)==flags)
lock = q.first() lock = q.first()
if session == None:
localsession.close()
r = 0 r = 0
if lock != None: if lock != None:
r = lock.flags & flags r = lock.flags & flags
SessionBase.release_session(session)
return r return r

View File

@ -21,12 +21,9 @@ class Nonce(SessionBase):
@staticmethod @staticmethod
def get(address, session=None): def get(address, session=None):
localsession = session session = SessionBase.bind_session(session)
if localsession == None:
localsession = SessionBase.create_session()
q = session.query(Nonce)
q = localsession.query(Nonce)
q = q.filter(Nonce.address_hex==address) q = q.filter(Nonce.address_hex==address)
nonce = q.first() nonce = q.first()
@ -34,28 +31,29 @@ class Nonce(SessionBase):
if nonce != None: if nonce != None:
nonce_value = nonce.nonce; nonce_value = nonce.nonce;
if session == None: SessionBase.release_session(session)
localsession.close()
return nonce_value return nonce_value
@staticmethod @staticmethod
def __get(conn, address): def __get(session, address):
r = conn.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address)) r = session.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address))
nonce = r.fetchone() nonce = r.fetchone()
session.flush()
if nonce == None: if nonce == None:
return None return None
return nonce[0] return nonce[0]
@staticmethod @staticmethod
def __set(conn, address, nonce): def __set(session, address, nonce):
conn.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address)) session.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
session.flush()
@staticmethod @staticmethod
def next(address, initial_if_not_exists=0): def next(address, initial_if_not_exists=0, session=None):
"""Generate next nonce for the given address. """Generate next nonce for the given address.
If there is no previous nonce record for the address, the nonce may be initialized to a specified value, or 0 if no value has been given. If there is no previous nonce record for the address, the nonce may be initialized to a specified value, or 0 if no value has been given.
@ -67,20 +65,32 @@ class Nonce(SessionBase):
:returns: Nonce :returns: Nonce
:rtype: number :rtype: number
""" """
conn = Nonce.engine.connect() session = SessionBase.bind_session(session)
SessionBase.release_session(session)
session.begin_nested()
#conn = Nonce.engine.connect()
if Nonce.transactional: if Nonce.transactional:
conn.execute('BEGIN') #session.execute('BEGIN')
conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE') session.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
nonce = Nonce.__get(conn, address) session.flush()
nonce = Nonce.__get(session, address)
logg.debug('get nonce {} for address {}'.format(nonce, address)) logg.debug('get nonce {} for address {}'.format(nonce, address))
if nonce == None: if nonce == None:
nonce = initial_if_not_exists nonce = initial_if_not_exists
conn.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address)) session.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
session.flush()
logg.debug('setting default nonce to {} for address {}'.format(nonce, address)) logg.debug('setting default nonce to {} for address {}'.format(nonce, address))
Nonce.__set(conn, address, nonce+1) Nonce.__set(session, address, nonce+1)
if Nonce.transactional: #if Nonce.transactional:
conn.execute('COMMIT') #session.execute('COMMIT')
conn.close() #session.execute('UNLOCK TABLE nonce')
#conn.close()
session.commit()
session.commit()
SessionBase.release_session(session)
return nonce return nonce

View File

@ -40,12 +40,14 @@ class AccountRole(SessionBase):
session = SessionBase.bind_session(session) session = SessionBase.bind_session(session)
role = AccountRole.get_role(tag, session) role = AccountRole.__get_role(tag, session)
r = zero_address r = zero_address
if role != None: if role != None:
r = role.address_hex r = role.address_hex
session.flush()
SessionBase.release_session(session) SessionBase.release_session(session)
return r return r
@ -64,6 +66,8 @@ class AccountRole(SessionBase):
role = AccountRole.__get_role(tag, session) role = AccountRole.__get_role(tag, session)
session.flush()
SessionBase.release_session(session) SessionBase.release_session(session)
return role return role
@ -74,7 +78,6 @@ class AccountRole(SessionBase):
q = session.query(AccountRole) q = session.query(AccountRole)
q = q.filter(AccountRole.tag==tag) q = q.filter(AccountRole.tag==tag)
r = q.first() r = q.first()
session.flush()
return r return r
@ -93,11 +96,13 @@ class AccountRole(SessionBase):
""" """
session = SessionBase.bind_session(session) session = SessionBase.bind_session(session)
role = AccountRole.get_role(tag, session) role = AccountRole.__get_role(tag, session)
if role == None: if role == None:
role = AccountRole(tag) role = AccountRole(tag)
role.address_hex = address_hex role.address_hex = address_hex
session.flush()
SessionBase.release_session(session) SessionBase.release_session(session)
return role return role

View File

@ -85,26 +85,27 @@ class TxCache(SessionBase):
:param tx_hash_new: tx hash to associate the copied entry with :param tx_hash_new: tx hash to associate the copied entry with
:type tx_hash_new: str, 0x-hex :type tx_hash_new: str, 0x-hex
""" """
localsession = SessionBase.bind_session(session) session = SessionBase.bind_session(session)
q = localsession.query(TxCache) q = session.query(TxCache)
q = q.join(Otx) q = q.join(Otx)
q = q.filter(Otx.tx_hash==tx_hash_original) q = q.filter(Otx.tx_hash==tx_hash_original)
txc = q.first() txc = q.first()
if txc == None: if txc == None:
SessionBase.release_session(localsession) SessionBase.release_session(session)
raise NotLocalTxError('original {}'.format(tx_hash_original)) raise NotLocalTxError('original {}'.format(tx_hash_original))
if txc.block_number != None: if txc.block_number != None:
SessionBase.release_session(localsession) SessionBase.release_session(session)
raise TxStateChangeError('cannot clone tx cache of confirmed tx {}'.format(tx_hash_original)) raise TxStateChangeError('cannot clone tx cache of confirmed tx {}'.format(tx_hash_original))
q = localsession.query(Otx) session.flush()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash_new) q = q.filter(Otx.tx_hash==tx_hash_new)
otx = q.first() otx = q.first()
if otx == None: if otx == None:
SessionBase.release_session(localsession) SessionBase.release_session(session)
raise NotLocalTxError('new {}'.format(tx_hash_new)) raise NotLocalTxError('new {}'.format(tx_hash_new))
txc_new = TxCache( txc_new = TxCache(
@ -115,18 +116,21 @@ class TxCache(SessionBase):
txc.destination_token_address, txc.destination_token_address,
int(txc.from_value), int(txc.from_value),
int(txc.to_value), int(txc.to_value),
session=session,
) )
localsession.add(txc_new) session.add(txc_new)
localsession.commit() session.commit()
SessionBase.release_session(localsession) SessionBase.release_session(session)
def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None, session=None): def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None, session=None):
localsession = SessionBase.bind_session(session) session = SessionBase.bind_session(session)
tx = localsession.query(Otx).filter(Otx.tx_hash==tx_hash).first() q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
tx = q.first()
if tx == None: if tx == None:
SessionBase.release_session(localsession) SessionBase.release_session(session)
raise FileNotFoundError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash)) raise FileNotFoundError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash))
self.otx_id = tx.id self.otx_id = tx.id
@ -143,5 +147,5 @@ class TxCache(SessionBase):
self.date_updated = self.date_created self.date_updated = self.date_created
self.date_checked = self.date_created self.date_checked = self.date_created
SessionBase.release_session(localsession) SessionBase.release_session(session)

View File

@ -36,6 +36,7 @@ class AccountTxFactory(TxFactory):
self, self,
address, address,
chain_spec, chain_spec,
session=None,
): ):
"""Register an Ethereum account address with the on-chain account registry """Register an Ethereum account address with the on-chain account registry
@ -58,7 +59,7 @@ class AccountTxFactory(TxFactory):
'gas': gas, 'gas': gas,
'gasPrice': self.gas_price, 'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(), 'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(), 'nonce': self.next_nonce(session=session),
'value': 0, 'value': 0,
}) })
return tx_add return tx_add
@ -68,6 +69,7 @@ class AccountTxFactory(TxFactory):
self, self,
address, address,
chain_spec, chain_spec,
session=None,
): ):
"""Trigger the on-chain faucet to disburse tokens to the provided Ethereum account """Trigger the on-chain faucet to disburse tokens to the provided Ethereum account
@ -88,7 +90,7 @@ class AccountTxFactory(TxFactory):
'gas': gas, 'gas': gas,
'gasPrice': self.gas_price, 'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(), 'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(), 'nonce': self.next_nonce(session=session),
'value': 0, 'value': 0,
}) })
return tx_add return tx_add
@ -157,7 +159,10 @@ def create(password, chain_str):
# TODO: this can safely be set to zero, since we are randomly creating account # TODO: this can safely be set to zero, since we are randomly creating account
n = c.w3.eth.getTransactionCount(a, 'pending') n = c.w3.eth.getTransactionCount(a, 'pending')
session = SessionBase.create_session() session = SessionBase.create_session()
o = session.query(Nonce).filter(Nonce.address_hex==a).first() q = session.query(Nonce)
q = q.filter(Nonce.address_hex==a)
o = q.first()
session.flush()
if o == None: if o == None:
o = Nonce() o = Nonce()
o.address_hex = a o.address_hex = a
@ -186,20 +191,20 @@ def register(self, account_address, chain_str, writer_address=None):
session = SessionBase.create_session() session = SessionBase.create_session()
if writer_address == None: if writer_address == None:
writer_address = AccountRole.get_address('ACCOUNTS_INDEX_WRITER', session) writer_address = AccountRole.get_address('ACCOUNTS_INDEX_WRITER', session=session)
session.close()
if writer_address == zero_address: if writer_address == zero_address:
session.close()
raise RoleMissingError(account_address) raise RoleMissingError(account_address)
logg.debug('adding account address {} to index; writer {}'.format(account_address, writer_address)) logg.debug('adding account address {} to index; writer {}'.format(account_address, writer_address))
queue = self.request.delivery_info['routing_key'] queue = self.request.delivery_info['routing_key']
c = RpcClient(chain_spec, holder_address=writer_address) c = RpcClient(chain_spec, holder_address=writer_address)
txf = AccountTxFactory(writer_address, c) txf = AccountTxFactory(writer_address, c)
tx_add = txf.add(account_address, chain_spec) tx_add = txf.add(account_address, chain_spec, session=session)
session.close()
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data') (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data')
gas_budget = tx_add['gas'] * tx_add['gasPrice'] gas_budget = tx_add['gas'] * tx_add['gasPrice']

View File

@ -32,10 +32,10 @@ class TxFactory:
logg.debug('txfactory instance address {} gas price'.format(self.address, self.gas_price)) logg.debug('txfactory instance address {} gas price'.format(self.address, self.gas_price))
def next_nonce(self): def next_nonce(self, session=None):
"""Returns the current cached nonce value, and increments it for next transaction. """Returns the current cached nonce value, and increments it for next transaction.
:returns: Nonce :returns: Nonce
:rtype: number :rtype: number
""" """
return self.nonce_oracle.next() return self.nonce_oracle.next(session=session)

View File

@ -14,10 +14,10 @@ class NonceOracle():
self.default_nonce = default_nonce self.default_nonce = default_nonce
def next(self): def next(self, session=None):
"""Get next unique nonce. """Get next unique nonce.
:returns: Nonce :returns: Nonce
:rtype: number :rtype: number
""" """
return Nonce.next(self.address, self.default_nonce) return Nonce.next(self.address, self.default_nonce, session=session)

View File

@ -33,7 +33,7 @@ def sign_tx(tx, chain_str):
return (tx_hash_hex, tx_transfer_signed['raw'],) return (tx_hash_hex, tx_transfer_signed['raw'],)
def sign_and_register_tx(tx, chain_str, queue, cache_task=None): def sign_and_register_tx(tx, chain_str, queue, cache_task=None, session=None):
"""Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING). """Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING).
:param tx: Standard ethereum transaction data :param tx: Standard ethereum transaction data
@ -58,6 +58,7 @@ def sign_and_register_tx(tx, chain_str, queue, cache_task=None):
tx_hash_hex, tx_hash_hex,
tx_signed_raw_hex, tx_signed_raw_hex,
chain_str, chain_str,
session=session,
) )
if cache_task != None: if cache_task != None:

View File

@ -472,7 +472,9 @@ def get_tx(tx_hash):
:rtype: dict :rtype: dict
""" """
session = SessionBase.create_session() session = SessionBase.create_session()
tx = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
tx = q.first()
if tx == None: if tx == None:
session.close() session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))

View File

@ -10,7 +10,7 @@ version = (
0, 0,
10, 10,
0, 0,
'alpha.35', 'alpha.36',
) )
version_object = semver.VersionInfo( version_object = semver.VersionInfo(

View File

@ -16,7 +16,7 @@ ARG root_requirement_file='requirements.txt'
#RUN apk add linux-headers #RUN apk add linux-headers
#RUN apk add libffi-dev #RUN apk add libffi-dev
RUN apt-get update && \ RUN apt-get update && \
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps git
# Copy shared requirements from top of mono-repo # Copy shared requirements from top of mono-repo
RUN echo "copying root req file ${root_requirement_file}" RUN echo "copying root req file ${root_requirement_file}"
@ -42,7 +42,6 @@ COPY cic-eth/config/ /usr/local/etc/cic-eth/
COPY cic-eth/cic_eth/db/migrations/ /usr/local/share/cic-eth/alembic/ COPY cic-eth/cic_eth/db/migrations/ /usr/local/share/cic-eth/alembic/
COPY cic-eth/crypto_dev_signer_config/ /usr/local/etc/crypto-dev-signer/ COPY cic-eth/crypto_dev_signer_config/ /usr/local/etc/crypto-dev-signer/
RUN apt-get install -y git && \ RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
mkdir -p /usr/local/share/cic/solidity && \ mkdir -p /usr/local/share/cic/solidity && \
cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi

View File

@ -20,5 +20,5 @@ moolb~=0.1.1b2
eth-address-index~=0.1.0a8 eth-address-index~=0.1.0a8
chainlib~=0.0.1a19 chainlib~=0.0.1a19
hexathon~=0.0.1a3 hexathon~=0.0.1a3
chainsyncer~=0.0.1a18 chainsyncer~=0.0.1a19
cic-base==0.1.1a9 cic-base==0.1.1a10

View File

@ -107,7 +107,7 @@ RUN cd cic-bancor/python && \
RUN apt-get install -y cargo RUN apt-get install -y cargo
ARG cic_base_version=0.1.1a9 ARG cic_base_version=0.1.1a10
RUN pip install --extra-index-url $pip_extra_index_url cic-base[full_graph]==$cic_base_version RUN pip install --extra-index-url $pip_extra_index_url cic-base[full_graph]==$cic_base_version
ARG cic_registry_version=0.5.3a22 ARG cic_registry_version=0.5.3a22

View File

@ -1,3 +1,3 @@
cic-base[full_graph]==0.1.1a9 cic-base[full_graph]==0.1.1a10
cic-eth==0.10.0a34 cic-eth==0.10.0a36
cic-types==0.1.0a8 cic-types==0.1.0a8

View File

@ -31,7 +31,7 @@ set -e
set -a set -a
# We need to not install these here... # We need to not install these here...
pip install --extra-index-url $DEV_PIP_EXTRA_INDEX_URL cic-eth==0.10.0a34 chainlib==0.0.1a19 cic-contracts==0.0.2a2 pip install --extra-index-url $DEV_PIP_EXTRA_INDEX_URL cic-eth==0.10.0a36 chainlib==0.0.1a19 cic-contracts==0.0.2a2
>&2 echo "create account for gas gifter" >&2 echo "create account for gas gifter"
old_gas_provider=$DEV_ETH_ACCOUNT_GAS_PROVIDER old_gas_provider=$DEV_ETH_ACCOUNT_GAS_PROVIDER

View File

@ -43,5 +43,5 @@ cryptocurrency-cli-tools==0.0.4
giftable-erc20-token==0.0.7b12 giftable-erc20-token==0.0.7b12
hexathon==0.0.1a3 hexathon==0.0.1a3
chainlib==0.0.1a19 chainlib==0.0.1a19
chainsyncer==0.0.1a18 chainsyncer==0.0.1a19
cic-registry==0.5.3.a22 cic-registry==0.5.3.a22

View File

@ -161,7 +161,7 @@ services:
- -c - -c
- | - |
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
/usr/local/bin/cic-cache-tracker -vv /usr/local/bin/cic-cache-tracker -v
volumes: volumes:
- contract-config:/tmp/cic/config/:ro - contract-config:/tmp/cic/config/:ro
@ -313,7 +313,7 @@ services:
- -c - -c
- | - |
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
./start_dispatcher.sh -q cic-eth -vv ./start_dispatcher.sh -q cic-eth -v
# command: "/root/start_dispatcher.sh -q cic-eth -vv" # command: "/root/start_dispatcher.sh -q cic-eth -vv"