Remove session leak better
This commit is contained in:
parent
8b2652e551
commit
0131a29417
@ -90,6 +90,7 @@ class DispatchSyncer:
|
|||||||
|
|
||||||
def __init__(self, chain_spec):
|
def __init__(self, chain_spec):
|
||||||
self.chain_spec = chain_spec
|
self.chain_spec = chain_spec
|
||||||
|
self.session = None
|
||||||
|
|
||||||
|
|
||||||
def chain(self):
|
def chain(self):
|
||||||
@ -100,7 +101,7 @@ class DispatchSyncer:
|
|||||||
c = len(txs.keys())
|
c = len(txs.keys())
|
||||||
logg.debug('processing {} txs {}'.format(c, list(txs.keys())))
|
logg.debug('processing {} txs {}'.format(c, list(txs.keys())))
|
||||||
chain_str = str(self.chain_spec)
|
chain_str = str(self.chain_spec)
|
||||||
session = SessionBase.create_session()
|
self.session = SessionBase.create_session()
|
||||||
for k in txs.keys():
|
for k in txs.keys():
|
||||||
tx_raw = txs[k]
|
tx_raw = txs[k]
|
||||||
tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw))
|
tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw))
|
||||||
@ -108,8 +109,10 @@ class DispatchSyncer:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
set_reserved(self.chain_spec, tx['hash'], session=session)
|
set_reserved(self.chain_spec, tx['hash'], session=session)
|
||||||
|
self.session.commit()
|
||||||
except NotLocalTxError as e:
|
except NotLocalTxError as e:
|
||||||
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
|
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
|
||||||
|
self.session.rollback()
|
||||||
continue
|
continue
|
||||||
|
|
||||||
s_check = celery.signature(
|
s_check = celery.signature(
|
||||||
@ -132,7 +135,8 @@ class DispatchSyncer:
|
|||||||
s_check.link(s_send)
|
s_check.link(s_send)
|
||||||
t = s_check.apply_async()
|
t = s_check.apply_async()
|
||||||
logg.info('processed {}'.format(k))
|
logg.info('processed {}'.format(k))
|
||||||
session.close()
|
self.session.close()
|
||||||
|
self.session = None
|
||||||
|
|
||||||
|
|
||||||
def loop(self, interval):
|
def loop(self, interval):
|
||||||
@ -146,6 +150,9 @@ class DispatchSyncer:
|
|||||||
conn = RPCConnection.connect(self.chain_spec, 'default')
|
conn = RPCConnection.connect(self.chain_spec, 'default')
|
||||||
self.process(conn, txs)
|
self.process(conn, txs)
|
||||||
except ConnectionError as e:
|
except ConnectionError as e:
|
||||||
|
if self.session != None:
|
||||||
|
self.session.close()
|
||||||
|
self.session = None
|
||||||
logg.error('connection to node failed: {}'.format(e))
|
logg.error('connection to node failed: {}'.format(e))
|
||||||
|
|
||||||
if len(utxs) > 0:
|
if len(utxs) > 0:
|
||||||
|
Loading…
Reference in New Issue
Block a user