Fix dispatcher memory leak

This commit is contained in:
Louis Holbrook 2021-07-14 22:02:59 +00:00
parent 884b18f2f1
commit aa13517534
12 changed files with 50 additions and 17 deletions

View File

@ -100,3 +100,4 @@ class SessionBase(Model):
logg.debug('destroying session {}'.format(session_key)) logg.debug('destroying session {}'.format(session_key))
session.commit() session.commit()
session.close() session.close()
del SessionBase.localsessions[session_key]

View File

@ -1,4 +1,4 @@
cic-base~=0.2.0a2 cic-base~=0.2.0a4
alembic==1.4.2 alembic==1.4.2
confini>=0.3.6rc3,<0.5.0 confini>=0.3.6rc3,<0.5.0
uwsgi==2.0.19.1 uwsgi==2.0.19.1
@ -9,5 +9,5 @@ semver==2.13.0
psycopg2==2.8.6 psycopg2==2.8.6
celery==4.4.7 celery==4.4.7
redis==3.5.3 redis==3.5.3
chainsyncer[sql]~=0.0.3a4 chainsyncer[sql]~=0.0.3a5
erc20-faucet~=0.2.2a2 erc20-faucet~=0.2.2a2

View File

@ -126,3 +126,4 @@ class SessionBase(Model):
logg.debug('commit and destroy session {}'.format(session_key)) logg.debug('commit and destroy session {}'.format(session_key))
session.commit() session.commit()
session.close() session.close()
del SessionBase.localsessions[session_key]

View File

@ -27,7 +27,7 @@ def database_engine(
SessionBase.poolable = False SessionBase.poolable = False
dsn = dsn_from_config(load_config) dsn = dsn_from_config(load_config)
#SessionBase.connect(dsn, True) #SessionBase.connect(dsn, True)
SessionBase.connect(dsn, debug=load_config.get('DATABASE_DEBUG') != None) SessionBase.connect(dsn, debug=load_config.true('DATABASE_DEBUG'))
return dsn return dsn

View File

@ -100,6 +100,7 @@ def get_upcoming_tx(chain_spec, status=StatusEnum.READYSEND, not_status=None, re
q_outer = q_outer.join(Lock, isouter=True) q_outer = q_outer.join(Lock, isouter=True)
q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0)) q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0))
if not is_alive(status): if not is_alive(status):
SessionBase.release_session(session) SessionBase.release_session(session)
raise ValueError('not a valid non-final tx value: {}'.format(status)) raise ValueError('not a valid non-final tx value: {}'.format(status))

View File

@ -90,6 +90,7 @@ class DispatchSyncer:
def __init__(self, chain_spec): def __init__(self, chain_spec):
self.chain_spec = chain_spec self.chain_spec = chain_spec
self.session = None
def chain(self): def chain(self):
@ -100,16 +101,18 @@ class DispatchSyncer:
c = len(txs.keys()) c = len(txs.keys())
logg.debug('processing {} txs {}'.format(c, list(txs.keys()))) logg.debug('processing {} txs {}'.format(c, list(txs.keys())))
chain_str = str(self.chain_spec) chain_str = str(self.chain_spec)
session = SessionBase.create_session() self.session = SessionBase.create_session()
for k in txs.keys(): for k in txs.keys():
tx_raw = txs[k] tx_raw = txs[k]
tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw)) tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw))
tx = unpack(tx_raw_bytes, self.chain_spec) tx = unpack(tx_raw_bytes, self.chain_spec)
try: try:
set_reserved(self.chain_spec, tx['hash'], session=session) set_reserved(self.chain_spec, tx['hash'], session=self.session)
self.session.commit()
except NotLocalTxError as e: except NotLocalTxError as e:
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash'])) logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
self.session.rollback()
continue continue
s_check = celery.signature( s_check = celery.signature(
@ -132,16 +135,25 @@ class DispatchSyncer:
s_check.link(s_send) s_check.link(s_send)
t = s_check.apply_async() t = s_check.apply_async()
logg.info('processed {}'.format(k)) logg.info('processed {}'.format(k))
self.session.close()
self.session = None
def loop(self, w3, interval): def loop(self, interval):
while run: while run:
txs = {} txs = {}
typ = StatusBits.QUEUED typ = StatusBits.QUEUED
utxs = get_upcoming_tx(self.chain_spec, typ) utxs = get_upcoming_tx(self.chain_spec, typ)
for k in utxs.keys(): for k in utxs.keys():
txs[k] = utxs[k] txs[k] = utxs[k]
self.process(w3, txs) try:
conn = RPCConnection.connect(self.chain_spec, 'default')
self.process(conn, txs)
except ConnectionError as e:
if self.session != None:
self.session.close()
self.session = None
logg.error('connection to node failed: {}'.format(e))
if len(utxs) > 0: if len(utxs) > 0:
time.sleep(self.yield_delay) time.sleep(self.yield_delay)
@ -151,8 +163,7 @@ class DispatchSyncer:
def main(): def main():
syncer = DispatchSyncer(chain_spec) syncer = DispatchSyncer(chain_spec)
conn = RPCConnection.connect(chain_spec, 'default') syncer.loop(float(config.get('DISPATCHER_LOOP_INTERVAL')))
syncer.loop(conn, float(config.get('DISPATCHER_LOOP_INTERVAL')))
sys.exit(0) sys.exit(0)

View File

@ -6,4 +6,4 @@ HOST=localhost
PORT=5432 PORT=5432
ENGINE=sqlite ENGINE=sqlite
DRIVER=pysqlite DRIVER=pysqlite
DEBUG= DEBUG=0

View File

@ -1,5 +1,5 @@
chainsyncer[sql]~=0.0.3a4 chainsyncer[sql]~=0.0.3a5
chainqueue~=0.0.2b6 chainqueue~=0.0.2b7
alembic==1.4.2 alembic==1.4.2
confini>=0.3.6rc4,<0.5.0 confini>=0.3.6rc4,<0.5.0
redis==3.5.3 redis==3.5.3

View File

@ -1,4 +1,5 @@
# external imports # external imports
import celery
from chainlib.eth.nonce import RPCNonceOracle from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.tx import ( from chainlib.eth.tx import (
receipt, receipt,
@ -20,6 +21,7 @@ def test_translate(
cic_registry, cic_registry,
init_celery_tasks, init_celery_tasks,
register_lookups, register_lookups,
celery_session_worker,
): ):
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], eth_rpc) nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], eth_rpc)
@ -46,6 +48,20 @@ def test_translate(
'recipient': agent_roles['BOB'], 'recipient': agent_roles['BOB'],
'recipient_label': None, 'recipient_label': None,
} }
tx = translate_tx_addresses(tx, [contract_roles['CONTRACT_DEPLOYER']], default_chain_spec.asdict())
assert tx['sender_label'] == 'alice' #tx = translate_tx_addresses(tx, [contract_roles['CONTRACT_DEPLOYER']], default_chain_spec.asdict())
assert tx['recipient_label'] == 'bob' s = celery.signature(
'cic_eth.ext.address.translate_tx_addresses',
[
tx,
[contract_roles['CONTRACT_DEPLOYER']],
default_chain_spec.asdict(),
],
queue=None,
)
t = s.apply_async()
r = t.get_leaf()
assert t.successful()
assert r['sender_label'] == 'alice'
assert r['recipient_label'] == 'bob'

View File

@ -1,6 +1,7 @@
# third-party imports # extended imports
import pytest import pytest
import uuid import uuid
import unittest
# local imports # local imports
from cic_eth.db.models.nonce import ( from cic_eth.db.models.nonce import (
@ -55,7 +56,7 @@ def test_nonce_reserve(
o = q.first() o = q.first()
assert o.nonce == 43 assert o.nonce == 43
nonce = NonceReservation.release(eth_empty_accounts[0], str(uu)) nonce = NonceReservation.release(eth_empty_accounts[0], str(uu), session=init_database)
init_database.commit() init_database.commit()
assert nonce == (str(uu), 42) assert nonce == (str(uu), 42)

View File

@ -127,3 +127,4 @@ class SessionBase(Model):
logg.debug('commit and destroy session {}'.format(session_key)) logg.debug('commit and destroy session {}'.format(session_key))
session.commit() session.commit()
session.close() session.close()
del SessionBase.localsessions[session_key]

View File

@ -148,6 +148,7 @@ else
fi fi
mkdir -p $CIC_DATA_DIR mkdir -p $CIC_DATA_DIR
>&2 echo using data dir $CIC_DATA_DIR for environment variable dump
# this is consumed in downstream services to set environment variables # this is consumed in downstream services to set environment variables
cat << EOF > $CIC_DATA_DIR/.env cat << EOF > $CIC_DATA_DIR/.env