From aa135175348eee20d128ed2e6df9a694fec614f0 Mon Sep 17 00:00:00 2001 From: Louis Holbrook Date: Wed, 14 Jul 2021 22:02:59 +0000 Subject: [PATCH] Fix dispatcher memory leak --- apps/cic-cache/cic_cache/db/models/base.py | 1 + apps/cic-cache/requirements.txt | 4 ++-- apps/cic-eth/cic_eth/db/models/base.py | 1 + .../cic_eth/pytest/fixtures_database.py | 2 +- apps/cic-eth/cic_eth/queue/query.py | 1 + .../cic_eth/runnable/daemons/dispatcher.py | 23 ++++++++++++++----- apps/cic-eth/config/test/database.ini | 2 +- apps/cic-eth/services_requirements.txt | 4 ++-- .../test_task_address.py} | 22 +++++++++++++++--- apps/cic-eth/tests/unit/db/test_nonce_db.py | 5 ++-- apps/cic-ussd/cic_ussd/db/models/base.py | 1 + apps/contract-migration/reset.sh | 1 + 12 files changed, 50 insertions(+), 17 deletions(-) rename apps/cic-eth/tests/{unit/ext/test_address.py => task/test_task_address.py} (71%) diff --git a/apps/cic-cache/cic_cache/db/models/base.py b/apps/cic-cache/cic_cache/db/models/base.py index 16d01d0e..9da79975 100644 --- a/apps/cic-cache/cic_cache/db/models/base.py +++ b/apps/cic-cache/cic_cache/db/models/base.py @@ -100,3 +100,4 @@ class SessionBase(Model): logg.debug('destroying session {}'.format(session_key)) session.commit() session.close() + del SessionBase.localsessions[session_key] diff --git a/apps/cic-cache/requirements.txt b/apps/cic-cache/requirements.txt index 92e95994..54b7a895 100644 --- a/apps/cic-cache/requirements.txt +++ b/apps/cic-cache/requirements.txt @@ -1,4 +1,4 @@ -cic-base~=0.2.0a2 +cic-base~=0.2.0a4 alembic==1.4.2 confini>=0.3.6rc3,<0.5.0 uwsgi==2.0.19.1 @@ -9,5 +9,5 @@ semver==2.13.0 psycopg2==2.8.6 celery==4.4.7 redis==3.5.3 -chainsyncer[sql]~=0.0.3a4 +chainsyncer[sql]~=0.0.3a5 erc20-faucet~=0.2.2a2 diff --git a/apps/cic-eth/cic_eth/db/models/base.py b/apps/cic-eth/cic_eth/db/models/base.py index 9cdcabf7..47a0bb93 100644 --- a/apps/cic-eth/cic_eth/db/models/base.py +++ b/apps/cic-eth/cic_eth/db/models/base.py @@ -126,3 +126,4 @@ class SessionBase(Model): logg.debug('commit and destroy session {}'.format(session_key)) session.commit() session.close() + del SessionBase.localsessions[session_key] diff --git a/apps/cic-eth/cic_eth/pytest/fixtures_database.py b/apps/cic-eth/cic_eth/pytest/fixtures_database.py index d7fa81d8..539c6bf7 100644 --- a/apps/cic-eth/cic_eth/pytest/fixtures_database.py +++ b/apps/cic-eth/cic_eth/pytest/fixtures_database.py @@ -27,7 +27,7 @@ def database_engine( SessionBase.poolable = False dsn = dsn_from_config(load_config) #SessionBase.connect(dsn, True) - SessionBase.connect(dsn, debug=load_config.get('DATABASE_DEBUG') != None) + SessionBase.connect(dsn, debug=load_config.true('DATABASE_DEBUG')) return dsn diff --git a/apps/cic-eth/cic_eth/queue/query.py b/apps/cic-eth/cic_eth/queue/query.py index 9ce3a17f..c29bb430 100644 --- a/apps/cic-eth/cic_eth/queue/query.py +++ b/apps/cic-eth/cic_eth/queue/query.py @@ -100,6 +100,7 @@ def get_upcoming_tx(chain_spec, status=StatusEnum.READYSEND, not_status=None, re q_outer = q_outer.join(Lock, isouter=True) q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0)) + if not is_alive(status): SessionBase.release_session(session) raise ValueError('not a valid non-final tx value: {}'.format(status)) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py index 08a54185..7e22e911 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py @@ -90,6 +90,7 @@ class DispatchSyncer: def __init__(self, chain_spec): self.chain_spec = chain_spec + self.session = None def chain(self): @@ -100,16 +101,18 @@ class DispatchSyncer: c = len(txs.keys()) logg.debug('processing {} txs {}'.format(c, list(txs.keys()))) chain_str = str(self.chain_spec) - session = SessionBase.create_session() + self.session = SessionBase.create_session() for k in txs.keys(): tx_raw = txs[k] tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw)) tx = unpack(tx_raw_bytes, self.chain_spec) try: - set_reserved(self.chain_spec, tx['hash'], session=session) + set_reserved(self.chain_spec, tx['hash'], session=self.session) + self.session.commit() except NotLocalTxError as e: logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash'])) + self.session.rollback() continue s_check = celery.signature( @@ -132,16 +135,25 @@ class DispatchSyncer: s_check.link(s_send) t = s_check.apply_async() logg.info('processed {}'.format(k)) + self.session.close() + self.session = None - def loop(self, w3, interval): + def loop(self, interval): while run: txs = {} typ = StatusBits.QUEUED utxs = get_upcoming_tx(self.chain_spec, typ) for k in utxs.keys(): txs[k] = utxs[k] - self.process(w3, txs) + try: + conn = RPCConnection.connect(self.chain_spec, 'default') + self.process(conn, txs) + except ConnectionError as e: + if self.session != None: + self.session.close() + self.session = None + logg.error('connection to node failed: {}'.format(e)) if len(utxs) > 0: time.sleep(self.yield_delay) @@ -151,8 +163,7 @@ class DispatchSyncer: def main(): syncer = DispatchSyncer(chain_spec) - conn = RPCConnection.connect(chain_spec, 'default') - syncer.loop(conn, float(config.get('DISPATCHER_LOOP_INTERVAL'))) + syncer.loop(float(config.get('DISPATCHER_LOOP_INTERVAL'))) sys.exit(0) diff --git a/apps/cic-eth/config/test/database.ini b/apps/cic-eth/config/test/database.ini index 8c355ef3..34070210 100644 --- a/apps/cic-eth/config/test/database.ini +++ b/apps/cic-eth/config/test/database.ini @@ -6,4 +6,4 @@ HOST=localhost PORT=5432 ENGINE=sqlite DRIVER=pysqlite -DEBUG= +DEBUG=0 diff --git a/apps/cic-eth/services_requirements.txt b/apps/cic-eth/services_requirements.txt index 16713117..66ea7f8b 100644 --- a/apps/cic-eth/services_requirements.txt +++ b/apps/cic-eth/services_requirements.txt @@ -1,5 +1,5 @@ -chainsyncer[sql]~=0.0.3a4 -chainqueue~=0.0.2b6 +chainsyncer[sql]~=0.0.3a5 +chainqueue~=0.0.2b7 alembic==1.4.2 confini>=0.3.6rc4,<0.5.0 redis==3.5.3 diff --git a/apps/cic-eth/tests/unit/ext/test_address.py b/apps/cic-eth/tests/task/test_task_address.py similarity index 71% rename from apps/cic-eth/tests/unit/ext/test_address.py rename to apps/cic-eth/tests/task/test_task_address.py index b7d92b69..fbcce337 100644 --- a/apps/cic-eth/tests/unit/ext/test_address.py +++ b/apps/cic-eth/tests/task/test_task_address.py @@ -1,4 +1,5 @@ # external imports +import celery from chainlib.eth.nonce import RPCNonceOracle from chainlib.eth.tx import ( receipt, @@ -20,6 +21,7 @@ def test_translate( cic_registry, init_celery_tasks, register_lookups, + celery_session_worker, ): nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], eth_rpc) @@ -46,6 +48,20 @@ def test_translate( 'recipient': agent_roles['BOB'], 'recipient_label': None, } - tx = translate_tx_addresses(tx, [contract_roles['CONTRACT_DEPLOYER']], default_chain_spec.asdict()) - assert tx['sender_label'] == 'alice' - assert tx['recipient_label'] == 'bob' + + #tx = translate_tx_addresses(tx, [contract_roles['CONTRACT_DEPLOYER']], default_chain_spec.asdict()) + s = celery.signature( + 'cic_eth.ext.address.translate_tx_addresses', + [ + tx, + [contract_roles['CONTRACT_DEPLOYER']], + default_chain_spec.asdict(), + ], + queue=None, + ) + t = s.apply_async() + r = t.get_leaf() + assert t.successful() + + assert r['sender_label'] == 'alice' + assert r['recipient_label'] == 'bob' diff --git a/apps/cic-eth/tests/unit/db/test_nonce_db.py b/apps/cic-eth/tests/unit/db/test_nonce_db.py index 2b2dee6e..400cee16 100644 --- a/apps/cic-eth/tests/unit/db/test_nonce_db.py +++ b/apps/cic-eth/tests/unit/db/test_nonce_db.py @@ -1,6 +1,7 @@ -# third-party imports +# extended imports import pytest import uuid +import unittest # local imports from cic_eth.db.models.nonce import ( @@ -55,7 +56,7 @@ def test_nonce_reserve( o = q.first() assert o.nonce == 43 - nonce = NonceReservation.release(eth_empty_accounts[0], str(uu)) + nonce = NonceReservation.release(eth_empty_accounts[0], str(uu), session=init_database) init_database.commit() assert nonce == (str(uu), 42) diff --git a/apps/cic-ussd/cic_ussd/db/models/base.py b/apps/cic-ussd/cic_ussd/db/models/base.py index d76ee6c4..af5b9067 100644 --- a/apps/cic-ussd/cic_ussd/db/models/base.py +++ b/apps/cic-ussd/cic_ussd/db/models/base.py @@ -127,3 +127,4 @@ class SessionBase(Model): logg.debug('commit and destroy session {}'.format(session_key)) session.commit() session.close() + del SessionBase.localsessions[session_key] diff --git a/apps/contract-migration/reset.sh b/apps/contract-migration/reset.sh index 2066e4c5..3b99a3d0 100755 --- a/apps/contract-migration/reset.sh +++ b/apps/contract-migration/reset.sh @@ -148,6 +148,7 @@ else fi mkdir -p $CIC_DATA_DIR +>&2 echo using data dir $CIC_DATA_DIR for environment variable dump # this is consumed in downstream services to set environment variables cat << EOF > $CIC_DATA_DIR/.env