WIP fix leaks in syncer filters
This commit is contained in:
parent
1a31876e2e
commit
32ce706b2d
@ -500,7 +500,7 @@ def get_nonce_tx(nonce, sender, chain_id):
|
|||||||
|
|
||||||
|
|
||||||
# TODO: pass chain spec instead of chain id
|
# TODO: pass chain spec instead of chain id
|
||||||
def get_paused_txs(status=None, sender=None, chain_id=0):
|
def get_paused_txs(status=None, sender=None, chain_id=0, session=None):
|
||||||
"""Returns not finalized transactions that have been attempted sent without success.
|
"""Returns not finalized transactions that have been attempted sent without success.
|
||||||
|
|
||||||
:param status: If set, will return transactions with this local queue status only
|
:param status: If set, will return transactions with this local queue status only
|
||||||
@ -513,12 +513,13 @@ def get_paused_txs(status=None, sender=None, chain_id=0):
|
|||||||
:returns: Transactions
|
:returns: Transactions
|
||||||
:rtype: dict, with transaction hash as key, signed raw transaction as value
|
:rtype: dict, with transaction hash as key, signed raw transaction as value
|
||||||
"""
|
"""
|
||||||
session = SessionBase.create_session()
|
session = SessionBase.bind_session(session)
|
||||||
q = session.query(Otx)
|
q = session.query(Otx)
|
||||||
|
|
||||||
if status != None:
|
if status != None:
|
||||||
#if status == StatusEnum.PENDING or status >= StatusEnum.SENT:
|
#if status == StatusEnum.PENDING or status >= StatusEnum.SENT:
|
||||||
if status == StatusEnum.PENDING or status & StatusBits.IN_NETWORK or not is_alive(status):
|
if status == StatusEnum.PENDING or status & StatusBits.IN_NETWORK or not is_alive(status):
|
||||||
|
SessionBase.release_session(session)
|
||||||
raise ValueError('not a valid paused tx value: {}'.format(status))
|
raise ValueError('not a valid paused tx value: {}'.format(status))
|
||||||
q = q.filter(Otx.status.op('&')(status.value)==status.value)
|
q = q.filter(Otx.status.op('&')(status.value)==status.value)
|
||||||
q = q.join(TxCache)
|
q = q.join(TxCache)
|
||||||
@ -538,12 +539,12 @@ def get_paused_txs(status=None, sender=None, chain_id=0):
|
|||||||
#gas += tx['gas'] * tx['gasPrice']
|
#gas += tx['gas'] * tx['gasPrice']
|
||||||
txs[r.tx_hash] = r.signed_tx
|
txs[r.tx_hash] = r.signed_tx
|
||||||
|
|
||||||
session.close()
|
SessionBase.release_session(session)
|
||||||
|
|
||||||
return txs
|
return txs
|
||||||
|
|
||||||
|
|
||||||
def get_status_tx(status, before=None, exact=False, limit=0):
|
def get_status_tx(status, before=None, exact=False, limit=0, session=None):
|
||||||
"""Retrieve transaction with a specific queue status.
|
"""Retrieve transaction with a specific queue status.
|
||||||
|
|
||||||
:param status: Status to match transactions with
|
:param status: Status to match transactions with
|
||||||
@ -556,7 +557,7 @@ def get_status_tx(status, before=None, exact=False, limit=0):
|
|||||||
:rtype: list of cic_eth.db.models.otx.Otx
|
:rtype: list of cic_eth.db.models.otx.Otx
|
||||||
"""
|
"""
|
||||||
txs = {}
|
txs = {}
|
||||||
session = SessionBase.create_session()
|
session = SessionBase.bind_session(session)
|
||||||
q = session.query(Otx)
|
q = session.query(Otx)
|
||||||
q = q.join(TxCache)
|
q = q.join(TxCache)
|
||||||
q = q.filter(TxCache.date_updated<before)
|
q = q.filter(TxCache.date_updated<before)
|
||||||
@ -570,12 +571,12 @@ def get_status_tx(status, before=None, exact=False, limit=0):
|
|||||||
break
|
break
|
||||||
txs[o.tx_hash] = o.signed_tx
|
txs[o.tx_hash] = o.signed_tx
|
||||||
i += 1
|
i += 1
|
||||||
session.close()
|
SessionBase.release_session(session)
|
||||||
return txs
|
return txs
|
||||||
|
|
||||||
|
|
||||||
# TODO: move query to model
|
# TODO: move query to model
|
||||||
def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, chain_id=0):
|
def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, chain_id=0, session=None):
|
||||||
"""Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions.
|
"""Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions.
|
||||||
|
|
||||||
Will omit addresses that have the LockEnum.SEND bit in Lock set.
|
Will omit addresses that have the LockEnum.SEND bit in Lock set.
|
||||||
@ -594,7 +595,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
|
|||||||
:returns: Transactions
|
:returns: Transactions
|
||||||
:rtype: dict, with transaction hash as key, signed raw transaction as value
|
:rtype: dict, with transaction hash as key, signed raw transaction as value
|
||||||
"""
|
"""
|
||||||
session = SessionBase.create_session()
|
session = SessionBase.bind_session(session)
|
||||||
q_outer = session.query(
|
q_outer = session.query(
|
||||||
TxCache.sender,
|
TxCache.sender,
|
||||||
func.min(Otx.nonce).label('nonce'),
|
func.min(Otx.nonce).label('nonce'),
|
||||||
@ -604,7 +605,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
|
|||||||
q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0))
|
q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0))
|
||||||
|
|
||||||
if not is_alive(status):
|
if not is_alive(status):
|
||||||
session.close()
|
SessionBase.release_session(session)
|
||||||
raise ValueError('not a valid non-final tx value: {}'.format(status))
|
raise ValueError('not a valid non-final tx value: {}'.format(status))
|
||||||
if status == StatusEnum.PENDING:
|
if status == StatusEnum.PENDING:
|
||||||
q_outer = q_outer.filter(Otx.status==status.value)
|
q_outer = q_outer.filter(Otx.status==status.value)
|
||||||
@ -646,7 +647,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
|
|||||||
session.add(o)
|
session.add(o)
|
||||||
session.commit()
|
session.commit()
|
||||||
|
|
||||||
session.close()
|
SessionBase.release_session(session)
|
||||||
|
|
||||||
return txs
|
return txs
|
||||||
|
|
||||||
|
@ -82,7 +82,7 @@ class CallbackFilter(SyncFilter):
|
|||||||
return (transfer_type, transfer_data)
|
return (transfer_type, transfer_data)
|
||||||
|
|
||||||
|
|
||||||
def filter(self, w3, tx, rcpt, chain_spec):
|
def filter(self, w3, tx, rcpt, chain_spec, session=None):
|
||||||
logg.debug('applying callback filter "{}:{}"'.format(self.queue, self.method))
|
logg.debug('applying callback filter "{}:{}"'.format(self.queue, self.method))
|
||||||
chain_str = str(chain_spec)
|
chain_str = str(chain_spec)
|
||||||
|
|
||||||
|
@ -17,29 +17,31 @@ logg = logging.getLogger()
|
|||||||
|
|
||||||
class GasFilter(SyncFilter):
|
class GasFilter(SyncFilter):
|
||||||
|
|
||||||
def __init__(self, gas_provider):
|
def __init__(self, gas_provider, queue=None):
|
||||||
|
self.queue = queue
|
||||||
self.gas_provider = gas_provider
|
self.gas_provider = gas_provider
|
||||||
|
|
||||||
|
|
||||||
def filter(self, w3, tx, rcpt, chain_str):
|
def filter(self, w3, tx, rcpt, chain_str, session=None):
|
||||||
logg.debug('applying gas filter')
|
logg.debug('applying gas filter')
|
||||||
tx_hash_hex = tx.hash.hex()
|
tx_hash_hex = tx.hash.hex()
|
||||||
if tx['value'] > 0:
|
if tx['value'] > 0:
|
||||||
logg.debug('gas refill tx {}'.format(tx_hash_hex))
|
logg.debug('gas refill tx {}'.format(tx_hash_hex))
|
||||||
session = SessionBase.create_session()
|
session = SessionBase.bind_session(session)
|
||||||
q = session.query(TxCache.recipient)
|
q = session.query(TxCache.recipient)
|
||||||
q = q.join(Otx)
|
q = q.join(Otx)
|
||||||
q = q.filter(Otx.tx_hash==tx_hash_hex)
|
q = q.filter(Otx.tx_hash==tx_hash_hex)
|
||||||
r = q.first()
|
r = q.first()
|
||||||
|
|
||||||
session.close()
|
|
||||||
|
|
||||||
if r == None:
|
if r == None:
|
||||||
logg.warning('unsolicited gas refill tx {}'.format(tx_hash_hex))
|
logg.warning('unsolicited gas refill tx {}'.format(tx_hash_hex))
|
||||||
|
SessionBase.release_session(session)
|
||||||
return
|
return
|
||||||
|
|
||||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||||
txs = get_paused_txs(StatusEnum.WAITFORGAS, r[0], chain_spec.chain_id())
|
txs = get_paused_txs(StatusEnum.WAITFORGAS, r[0], chain_spec.chain_id(), session=session)
|
||||||
|
|
||||||
|
SessionBase.release_session(session)
|
||||||
|
|
||||||
if len(txs) > 0:
|
if len(txs) > 0:
|
||||||
logg.info('resuming gas-in-waiting txs for {}: {}'.format(r[0], txs.keys()))
|
logg.info('resuming gas-in-waiting txs for {}: {}'.format(r[0], txs.keys()))
|
||||||
@ -49,6 +51,6 @@ class GasFilter(SyncFilter):
|
|||||||
r[0],
|
r[0],
|
||||||
0,
|
0,
|
||||||
tx_hashes_hex=list(txs.keys()),
|
tx_hashes_hex=list(txs.keys()),
|
||||||
queue=queue,
|
queue=self.queue,
|
||||||
)
|
)
|
||||||
s.apply_async()
|
s.apply_async()
|
||||||
|
@ -15,7 +15,7 @@ account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153f
|
|||||||
|
|
||||||
class RegistrationFilter(SyncFilter):
|
class RegistrationFilter(SyncFilter):
|
||||||
|
|
||||||
def filter(self, w3, tx, rcpt, chain_spec):
|
def filter(self, w3, tx, rcpt, chain_spec, session=None):
|
||||||
logg.debug('applying registration filter')
|
logg.debug('applying registration filter')
|
||||||
registered_address = None
|
registered_address = None
|
||||||
for l in rcpt['logs']:
|
for l in rcpt['logs']:
|
||||||
|
@ -6,6 +6,7 @@ import celery
|
|||||||
|
|
||||||
# local imports
|
# local imports
|
||||||
from cic_eth.db.models.otx import Otx
|
from cic_eth.db.models.otx import Otx
|
||||||
|
from cic_eth.db.models.base import SessionBase
|
||||||
from .base import SyncFilter
|
from .base import SyncFilter
|
||||||
|
|
||||||
logg = logging.getLogger()
|
logg = logging.getLogger()
|
||||||
@ -17,15 +18,17 @@ class TxFilter(SyncFilter):
|
|||||||
self.queue = queue
|
self.queue = queue
|
||||||
|
|
||||||
|
|
||||||
def filter(self, w3, tx, rcpt, chain_spec):
|
def filter(self, w3, tx, rcpt, chain_spec, session=None):
|
||||||
|
session = SessionBase.bind_session(session)
|
||||||
logg.debug('applying tx filter')
|
logg.debug('applying tx filter')
|
||||||
tx_hash_hex = tx.hash.hex()
|
tx_hash_hex = tx.hash.hex()
|
||||||
otx = Otx.load(tx_hash_hex)
|
otx = Otx.load(tx_hash_hex, session=session)
|
||||||
|
SessionBase.release_session(session)
|
||||||
if otx == None:
|
if otx == None:
|
||||||
logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
|
logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
|
||||||
return None
|
return None
|
||||||
logg.info('otx found {}'.format(otx.tx_hash))
|
logg.info('otx found {}'.format(otx.tx_hash))
|
||||||
s = celery.siignature(
|
s = celery.signature(
|
||||||
'cic_eth.queue.tx.set_final_status',
|
'cic_eth.queue.tx.set_final_status',
|
||||||
[
|
[
|
||||||
tx_hash_hex,
|
tx_hash_hex,
|
||||||
|
@ -180,7 +180,7 @@ def main():
|
|||||||
|
|
||||||
registration_filter = RegistrationFilter()
|
registration_filter = RegistrationFilter()
|
||||||
|
|
||||||
gas_filter = GasFilter(c.gas_provider())
|
gas_filter = GasFilter(c.gas_provider(), queue)
|
||||||
|
|
||||||
i = 0
|
i = 0
|
||||||
for syncer in syncers:
|
for syncer in syncers:
|
||||||
|
@ -28,20 +28,24 @@ class SyncerBackend:
|
|||||||
def connect(self):
|
def connect(self):
|
||||||
"""Loads the state of the syncer session with the given id.
|
"""Loads the state of the syncer session with the given id.
|
||||||
"""
|
"""
|
||||||
|
if self.db_session == None:
|
||||||
self.db_session = SessionBase.create_session()
|
self.db_session = SessionBase.create_session()
|
||||||
q = self.db_session.query(BlockchainSync)
|
q = self.db_session.query(BlockchainSync)
|
||||||
q = q.filter(BlockchainSync.id==self.object_id)
|
q = q.filter(BlockchainSync.id==self.object_id)
|
||||||
self.db_object = q.first()
|
self.db_object = q.first()
|
||||||
if self.db_object == None:
|
if self.db_object == None:
|
||||||
|
self.disconnect()
|
||||||
raise ValueError('sync entry with id {} not found'.format(self.object_id))
|
raise ValueError('sync entry with id {} not found'.format(self.object_id))
|
||||||
|
|
||||||
|
|
||||||
def disconnect(self):
|
def disconnect(self):
|
||||||
"""Commits state of sync to backend.
|
"""Commits state of sync to backend.
|
||||||
"""
|
"""
|
||||||
|
if self.db_session != None:
|
||||||
self.db_session.add(self.db_object)
|
self.db_session.add(self.db_object)
|
||||||
self.db_session.commit()
|
self.db_session.commit()
|
||||||
self.db_session.close()
|
self.db_session.close()
|
||||||
|
self.db_session = None
|
||||||
|
|
||||||
|
|
||||||
def chain(self):
|
def chain(self):
|
||||||
|
@ -56,7 +56,7 @@ class MinedSyncer(Syncer):
|
|||||||
# TODO: ensure filter loop can complete on graceful shutdown
|
# TODO: ensure filter loop can complete on graceful shutdown
|
||||||
for f in self.filter:
|
for f in self.filter:
|
||||||
#try:
|
#try:
|
||||||
task_uuid = f(w3, tx, rcpt, self.chain())
|
task_uuid = f(w3, tx, rcpt, self.chain(), self.db_session)
|
||||||
#except Exception as e:
|
#except Exception as e:
|
||||||
# logg.error('error in filter {} tx {}: {}'.format(f, tx_hash_hex, e))
|
# logg.error('error in filter {} tx {}: {}'.format(f, tx_hash_hex, e))
|
||||||
# continue
|
# continue
|
||||||
|
Loading…
Reference in New Issue
Block a user