diff --git a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py index 88bb87ef..981d36b2 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py @@ -90,6 +90,7 @@ class DispatchSyncer: def __init__(self, chain_spec): self.chain_spec = chain_spec + self.session = None def chain(self): @@ -100,7 +101,7 @@ class DispatchSyncer: c = len(txs.keys()) logg.debug('processing {} txs {}'.format(c, list(txs.keys()))) chain_str = str(self.chain_spec) - session = SessionBase.create_session() + self.session = SessionBase.create_session() for k in txs.keys(): tx_raw = txs[k] tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw)) @@ -108,8 +109,10 @@ class DispatchSyncer: try: set_reserved(self.chain_spec, tx['hash'], session=session) + self.session.commit() except NotLocalTxError as e: logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash'])) + self.session.rollback() continue s_check = celery.signature( @@ -132,7 +135,8 @@ class DispatchSyncer: s_check.link(s_send) t = s_check.apply_async() logg.info('processed {}'.format(k)) - session.close() + self.session.close() + self.session = None def loop(self, interval): @@ -146,6 +150,9 @@ class DispatchSyncer: conn = RPCConnection.connect(self.chain_spec, 'default') self.process(conn, txs) except ConnectionError as e: + if self.session != None: + self.session.close() + self.session = None logg.error('connection to node failed: {}'.format(e)) if len(utxs) > 0: