From 32ce706b2d9036eee4a904b78ccba71352549db5 Mon Sep 17 00:00:00 2001 From: nolash Date: Thu, 18 Feb 2021 23:09:48 +0100 Subject: [PATCH] WIP fix leaks in syncer filters --- apps/cic-eth/cic_eth/queue/tx.py | 21 ++++++++++--------- .../runnable/daemons/filters/callback.py | 2 +- .../cic_eth/runnable/daemons/filters/gas.py | 16 +++++++------- .../runnable/daemons/filters/register.py | 2 +- .../cic_eth/runnable/daemons/filters/tx.py | 9 +++++--- .../cic_eth/runnable/daemons/manager.py | 2 +- apps/cic-eth/cic_eth/sync/backend.py | 12 +++++++---- apps/cic-eth/cic_eth/sync/mined.py | 2 +- 8 files changed, 38 insertions(+), 28 deletions(-) diff --git a/apps/cic-eth/cic_eth/queue/tx.py b/apps/cic-eth/cic_eth/queue/tx.py index b6050636..9269bd89 100644 --- a/apps/cic-eth/cic_eth/queue/tx.py +++ b/apps/cic-eth/cic_eth/queue/tx.py @@ -500,7 +500,7 @@ def get_nonce_tx(nonce, sender, chain_id): # TODO: pass chain spec instead of chain id -def get_paused_txs(status=None, sender=None, chain_id=0): +def get_paused_txs(status=None, sender=None, chain_id=0, session=None): """Returns not finalized transactions that have been attempted sent without success. :param status: If set, will return transactions with this local queue status only @@ -513,12 +513,13 @@ def get_paused_txs(status=None, sender=None, chain_id=0): :returns: Transactions :rtype: dict, with transaction hash as key, signed raw transaction as value """ - session = SessionBase.create_session() + session = SessionBase.bind_session(session) q = session.query(Otx) if status != None: #if status == StatusEnum.PENDING or status >= StatusEnum.SENT: if status == StatusEnum.PENDING or status & StatusBits.IN_NETWORK or not is_alive(status): + SessionBase.release_session(session) raise ValueError('not a valid paused tx value: {}'.format(status)) q = q.filter(Otx.status.op('&')(status.value)==status.value) q = q.join(TxCache) @@ -538,12 +539,12 @@ def get_paused_txs(status=None, sender=None, chain_id=0): #gas += tx['gas'] * tx['gasPrice'] txs[r.tx_hash] = r.signed_tx - session.close() + SessionBase.release_session(session) return txs -def get_status_tx(status, before=None, exact=False, limit=0): +def get_status_tx(status, before=None, exact=False, limit=0, session=None): """Retrieve transaction with a specific queue status. :param status: Status to match transactions with @@ -556,7 +557,7 @@ def get_status_tx(status, before=None, exact=False, limit=0): :rtype: list of cic_eth.db.models.otx.Otx """ txs = {} - session = SessionBase.create_session() + session = SessionBase.bind_session(session) q = session.query(Otx) q = q.join(TxCache) q = q.filter(TxCache.date_updated 0: logg.debug('gas refill tx {}'.format(tx_hash_hex)) - session = SessionBase.create_session() + session = SessionBase.bind_session(session) q = session.query(TxCache.recipient) q = q.join(Otx) q = q.filter(Otx.tx_hash==tx_hash_hex) r = q.first() - session.close() - if r == None: logg.warning('unsolicited gas refill tx {}'.format(tx_hash_hex)) + SessionBase.release_session(session) return chain_spec = ChainSpec.from_chain_str(chain_str) - txs = get_paused_txs(StatusEnum.WAITFORGAS, r[0], chain_spec.chain_id()) + txs = get_paused_txs(StatusEnum.WAITFORGAS, r[0], chain_spec.chain_id(), session=session) + + SessionBase.release_session(session) if len(txs) > 0: logg.info('resuming gas-in-waiting txs for {}: {}'.format(r[0], txs.keys())) @@ -49,6 +51,6 @@ class GasFilter(SyncFilter): r[0], 0, tx_hashes_hex=list(txs.keys()), - queue=queue, + queue=self.queue, ) s.apply_async() diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py index 180cdca6..3c003d35 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py @@ -15,7 +15,7 @@ account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153f class RegistrationFilter(SyncFilter): - def filter(self, w3, tx, rcpt, chain_spec): + def filter(self, w3, tx, rcpt, chain_spec, session=None): logg.debug('applying registration filter') registered_address = None for l in rcpt['logs']: diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py index 6a645ff3..ceab9a10 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py @@ -6,6 +6,7 @@ import celery # local imports from cic_eth.db.models.otx import Otx +from cic_eth.db.models.base import SessionBase from .base import SyncFilter logg = logging.getLogger() @@ -17,15 +18,17 @@ class TxFilter(SyncFilter): self.queue = queue - def filter(self, w3, tx, rcpt, chain_spec): + def filter(self, w3, tx, rcpt, chain_spec, session=None): + session = SessionBase.bind_session(session) logg.debug('applying tx filter') tx_hash_hex = tx.hash.hex() - otx = Otx.load(tx_hash_hex) + otx = Otx.load(tx_hash_hex, session=session) + SessionBase.release_session(session) if otx == None: logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex)) return None logg.info('otx found {}'.format(otx.tx_hash)) - s = celery.siignature( + s = celery.signature( 'cic_eth.queue.tx.set_final_status', [ tx_hash_hex, diff --git a/apps/cic-eth/cic_eth/runnable/daemons/manager.py b/apps/cic-eth/cic_eth/runnable/daemons/manager.py index c6d7b836..db83460d 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/manager.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/manager.py @@ -180,7 +180,7 @@ def main(): registration_filter = RegistrationFilter() - gas_filter = GasFilter(c.gas_provider()) + gas_filter = GasFilter(c.gas_provider(), queue) i = 0 for syncer in syncers: diff --git a/apps/cic-eth/cic_eth/sync/backend.py b/apps/cic-eth/cic_eth/sync/backend.py index af58be94..eaf3ddc6 100644 --- a/apps/cic-eth/cic_eth/sync/backend.py +++ b/apps/cic-eth/cic_eth/sync/backend.py @@ -28,20 +28,24 @@ class SyncerBackend: def connect(self): """Loads the state of the syncer session with the given id. """ - self.db_session = SessionBase.create_session() + if self.db_session == None: + self.db_session = SessionBase.create_session() q = self.db_session.query(BlockchainSync) q = q.filter(BlockchainSync.id==self.object_id) self.db_object = q.first() if self.db_object == None: + self.disconnect() raise ValueError('sync entry with id {} not found'.format(self.object_id)) def disconnect(self): """Commits state of sync to backend. """ - self.db_session.add(self.db_object) - self.db_session.commit() - self.db_session.close() + if self.db_session != None: + self.db_session.add(self.db_object) + self.db_session.commit() + self.db_session.close() + self.db_session = None def chain(self): diff --git a/apps/cic-eth/cic_eth/sync/mined.py b/apps/cic-eth/cic_eth/sync/mined.py index 616eea54..e0e165b6 100644 --- a/apps/cic-eth/cic_eth/sync/mined.py +++ b/apps/cic-eth/cic_eth/sync/mined.py @@ -56,7 +56,7 @@ class MinedSyncer(Syncer): # TODO: ensure filter loop can complete on graceful shutdown for f in self.filter: #try: - task_uuid = f(w3, tx, rcpt, self.chain()) + task_uuid = f(w3, tx, rcpt, self.chain(), self.db_session) #except Exception as e: # logg.error('error in filter {} tx {}: {}'.format(f, tx_hash_hex, e)) # continue