Fix last(?) leak in syncer

Signed-off-by: nolash <dev@holbrook.no>
This commit is contained in:
Louis Holbrook 2021-02-19 07:06:05 +00:00
parent 2657ed58d3
commit fe499de1e4
16 changed files with 116 additions and 77 deletions

View File

@ -6,6 +6,11 @@ from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import (
StaticPool,
QueuePool,
AssertionPool,
)
logg = logging.getLogger() logg = logging.getLogger()
@ -49,7 +54,7 @@ class SessionBase(Model):
@staticmethod @staticmethod
def connect(dsn, debug=False): def connect(dsn, pool_size=8, debug=False):
"""Create new database connection engine and connect to database backend. """Create new database connection engine and connect to database backend.
:param dsn: DSN string defining connection. :param dsn: DSN string defining connection.
@ -57,12 +62,26 @@ class SessionBase(Model):
""" """
e = None e = None
if SessionBase.poolable: if SessionBase.poolable:
poolclass = QueuePool
if pool_size > 1:
e = create_engine( e = create_engine(
dsn, dsn,
max_overflow=50, max_overflow=pool_size*3,
pool_pre_ping=True, pool_pre_ping=True,
pool_size=20, pool_size=pool_size,
pool_recycle=10, pool_recycle=60,
poolclass=poolclass,
echo=debug,
)
else:
if debug:
poolclass = AssertionPool
else:
poolclass = StaticPool
e = create_engine(
dsn,
poolclass=poolclass,
echo=debug, echo=debug,
) )
else: else:

View File

@ -85,9 +85,7 @@ class TxCache(SessionBase):
:param tx_hash_new: tx hash to associate the copied entry with :param tx_hash_new: tx hash to associate the copied entry with
:type tx_hash_new: str, 0x-hex :type tx_hash_new: str, 0x-hex
""" """
localsession = session localsession = SessionBase.bind_session(session)
if localsession == None:
localsession = SessionBase.create_session()
q = localsession.query(TxCache) q = localsession.query(TxCache)
q = q.join(Otx) q = q.join(Otx)
@ -95,8 +93,10 @@ class TxCache(SessionBase):
txc = q.first() txc = q.first()
if txc == None: if txc == None:
SessionBase.release_session(localsession)
raise NotLocalTxError('original {}'.format(tx_hash_original)) raise NotLocalTxError('original {}'.format(tx_hash_original))
if txc.block_number != None: if txc.block_number != None:
SessionBase.release_session(localsession)
raise TxStateChangeError('cannot clone tx cache of confirmed tx {}'.format(tx_hash_original)) raise TxStateChangeError('cannot clone tx cache of confirmed tx {}'.format(tx_hash_original))
q = localsession.query(Otx) q = localsession.query(Otx)
@ -104,6 +104,7 @@ class TxCache(SessionBase):
otx = q.first() otx = q.first()
if otx == None: if otx == None:
SessionBase.release_session(localsession)
raise NotLocalTxError('new {}'.format(tx_hash_new)) raise NotLocalTxError('new {}'.format(tx_hash_new))
txc_new = TxCache( txc_new = TxCache(
@ -118,15 +119,14 @@ class TxCache(SessionBase):
localsession.add(txc_new) localsession.add(txc_new)
localsession.commit() localsession.commit()
if session == None: SessionBase.release_session(localsession)
localsession.close()
def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None): def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None, session=None):
session = SessionBase.create_session() localsession = SessionBase.bind_session(session)
tx = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() tx = localsession.query(Otx).filter(Otx.tx_hash==tx_hash).first()
if tx == None: if tx == None:
session.close() SessionBase.release_session(localsession)
raise FileNotFoundError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash)) raise FileNotFoundError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash))
self.otx_id = tx.id self.otx_id = tx.id
@ -143,4 +143,5 @@ class TxCache(SessionBase):
self.date_updated = self.date_created self.date_updated = self.date_created
self.date_checked = self.date_created self.date_checked = self.date_created
SessionBase.release_session(localsession)

View File

@ -304,6 +304,8 @@ def cache_gift_data(
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id()) tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
tx_data = unpack_gift(tx['data']) tx_data = unpack_gift(tx['data'])
session = SessionBase.create_session()
tx_cache = TxCache( tx_cache = TxCache(
tx_hash_hex, tx_hash_hex,
tx['from'], tx['from'],
@ -312,9 +314,9 @@ def cache_gift_data(
zero_address, zero_address,
0, 0,
0, 0,
session=session,
) )
session = SessionBase.create_session()
session.add(tx_cache) session.add(tx_cache)
session.commit() session.commit()
cache_id = tx_cache.id cache_id = tx_cache.id
@ -347,6 +349,7 @@ def cache_account_data(
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id()) tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
tx_data = unpack_register(tx['data']) tx_data = unpack_register(tx['data'])
session = SessionBase.create_session()
tx_cache = TxCache( tx_cache = TxCache(
tx_hash_hex, tx_hash_hex,
tx['from'], tx['from'],
@ -355,9 +358,8 @@ def cache_account_data(
zero_address, zero_address,
0, 0,
0, 0,
session=session,
) )
session = SessionBase.create_session()
session.add(tx_cache) session.add(tx_cache)
session.commit() session.commit()
cache_id = tx_cache.id cache_id = tx_cache.id

View File

@ -381,6 +381,7 @@ def cache_transfer_data(
tx['to'], tx['to'],
tx_data['amount'], tx_data['amount'],
tx_data['amount'], tx_data['amount'],
session=session,
) )
session.add(tx_cache) session.add(tx_cache)
session.commit() session.commit()
@ -440,6 +441,7 @@ def cache_approve_data(
tx['to'], tx['to'],
tx_data['amount'], tx_data['amount'],
tx_data['amount'], tx_data['amount'],
session=session,
) )
session.add(tx_cache) session.add(tx_cache)
session.commit() session.commit()

View File

@ -78,7 +78,6 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
# TODO: it should not be necessary to pass address explicitly, if not passed should be derived from the tx # TODO: it should not be necessary to pass address explicitly, if not passed should be derived from the tx
balance = c.w3.eth.getBalance(address) balance = c.w3.eth.getBalance(address)
logg.debug('check gas txs {}'.format(tx_hashes))
logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required)) logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required))
if gas_required > balance: if gas_required > balance:
@ -126,7 +125,6 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
queue=queue, queue=queue,
) )
ready_tasks.append(s) ready_tasks.append(s)
logg.debug('tasks {}'.format(ready_tasks))
celery.group(ready_tasks)() celery.group(ready_tasks)()
return txs return txs
@ -143,7 +141,6 @@ def hashes_to_txs(self, tx_hashes):
:returns: Signed raw transactions :returns: Signed raw transactions
:rtype: list of str, 0x-hex :rtype: list of str, 0x-hex
""" """
#logg = celery_app.log.get_default_logger()
if len(tx_hashes) == 0: if len(tx_hashes) == 0:
raise ValueError('no transaction to send') raise ValueError('no transaction to send')
@ -351,15 +348,12 @@ def send(self, txs, chain_str):
tx_hash_hex = tx_hash.hex() tx_hash_hex = tx_hash.hex()
queue = self.request.delivery_info.get('routing_key', None) queue = self.request.delivery_info.get('routing_key', None)
if queue == None:
logg.debug('send tx {} has no queue', tx_hash)
c = RpcClient(chain_spec) c = RpcClient(chain_spec)
r = None r = None
try: try:
r = c.w3.eth.send_raw_transaction(tx_hex) r = c.w3.eth.send_raw_transaction(tx_hex)
except Exception as e: except Exception as e:
logg.debug('e {}'.format(e))
raiser = ParityNodeHandler(chain_spec, queue) raiser = ParityNodeHandler(chain_spec, queue)
(t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex) (t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex)
raise e(m) raise e(m)
@ -423,7 +417,7 @@ def refill_gas(self, recipient_address, chain_str):
gas_price = c.gas_price() gas_price = c.gas_price()
gas_limit = c.default_gas_limit gas_limit = c.default_gas_limit
refill_amount = c.refill_amount() refill_amount = c.refill_amount()
logg.debug('gas price {} nonce {}'.format(gas_price, nonce)) logg.debug('tx send gas price {} nonce {}'.format(gas_price, nonce))
# create and sign transaction # create and sign transaction
tx_send_gas = { tx_send_gas = {
@ -436,7 +430,6 @@ def refill_gas(self, recipient_address, chain_str):
'value': refill_amount, 'value': refill_amount,
'data': '', 'data': '',
} }
logg.debug('txsend_gas {}'.format(tx_send_gas))
tx_send_gas_signed = c.w3.eth.sign_transaction(tx_send_gas) tx_send_gas_signed = c.w3.eth.sign_transaction(tx_send_gas)
tx_hash = web3.Web3.keccak(hexstr=tx_send_gas_signed['raw']) tx_hash = web3.Web3.keccak(hexstr=tx_send_gas_signed['raw'])
tx_hash_hex = tx_hash.hex() tx_hash_hex = tx_hash.hex()
@ -487,11 +480,14 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
:rtype: str, 0x-hex :rtype: str, 0x-hex
""" """
session = SessionBase.create_session() session = SessionBase.create_session()
otx = session.query(Otx).filter(Otx.tx_hash==txold_hash_hex).first()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==txold_hash_hex)
otx = q.first()
session.close()
if otx == None: if otx == None:
session.close()
raise NotLocalTxError(txold_hash_hex) raise NotLocalTxError(txold_hash_hex)
session.close()
chain_spec = ChainSpec.from_chain_str(chain_str) chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec) c = RpcClient(chain_spec)
@ -508,7 +504,7 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
else: else:
gas_price = c.gas_price() gas_price = c.gas_price()
if tx['gasPrice'] > gas_price: if tx['gasPrice'] > gas_price:
logg.warning('Network gas price {} is lower than overdue tx gas price {}'.format(gas_price, tx['gasPrice'])) logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(gas_price, tx['gasPrice']))
#tx['gasPrice'] = int(tx['gasPrice'] * default_factor) #tx['gasPrice'] = int(tx['gasPrice'] * default_factor)
tx['gasPrice'] += 1 tx['gasPrice'] += 1
else: else:
@ -518,9 +514,6 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
else: else:
tx['gasPrice'] = new_gas_price tx['gasPrice'] = new_gas_price
logg.debug('after {}'.format(tx))
#(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx, chain_str, queue)
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str) (tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str)
queue_create( queue_create(
tx['nonce'], tx['nonce'],
@ -540,6 +533,7 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
queue=queue, queue=queue,
) )
s.apply_async() s.apply_async()
return tx_hash_hex return tx_hash_hex
@ -602,7 +596,9 @@ def resume_tx(self, txpending_hash_hex, chain_str):
chain_spec = ChainSpec.from_chain_str(chain_str) chain_spec = ChainSpec.from_chain_str(chain_str)
session = SessionBase.create_session() session = SessionBase.create_session()
r = session.query(Otx.signed_tx).filter(Otx.tx_hash==txpending_hash_hex).first() q = session.query(Otx.signed_tx)
q = q.filter(Otx.tx_hash==txpending_hash_hex)
r = q.first()
session.close() session.close()
if r == None: if r == None:
raise NotLocalTxError(txpending_hash_hex) raise NotLocalTxError(txpending_hash_hex)

View File

@ -35,8 +35,7 @@ celery_app = celery.current_app
logg = logging.getLogger() logg = logging.getLogger()
@celery_app.task() def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predecessors=True, session=None):
def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predecessors=True):
"""Create a new transaction queue record. """Create a new transaction queue record.
:param nonce: Transaction nonce :param nonce: Transaction nonce
@ -52,10 +51,10 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predec
:returns: transaction hash :returns: transaction hash
:rtype: str, 0x-hash :rtype: str, 0x-hash
""" """
session = SessionBase.create_session() session = SessionBase.bind_session(session)
lock = Lock.check_aggregate(chain_str, LockEnum.QUEUE, holder_address, session=session) lock = Lock.check_aggregate(chain_str, LockEnum.QUEUE, holder_address, session=session)
if lock > 0: if lock > 0:
session.close() SessionBase.release_session(session)
raise LockedError(lock) raise LockedError(lock)
o = Otx.add( o = Otx.add(
@ -81,7 +80,7 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predec
otx.cancel(confirmed=False, session=session) otx.cancel(confirmed=False, session=session)
session.commit() session.commit()
session.close() SessionBase.release_session(session)
logg.debug('queue created nonce {} from {} hash {}'.format(nonce, holder_address, tx_hash)) logg.debug('queue created nonce {} from {} hash {}'.format(nonce, holder_address, tx_hash))
return tx_hash return tx_hash
@ -100,7 +99,9 @@ def set_sent_status(tx_hash, fail=False):
:rtype: boolean :rtype: boolean
""" """
session = SessionBase.create_session() session = SessionBase.create_session()
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
o = q.first()
if o == None: if o == None:
logg.warning('not local tx, skipping {}'.format(tx_hash)) logg.warning('not local tx, skipping {}'.format(tx_hash))
session.close() session.close()
@ -454,6 +455,7 @@ def get_tx(tx_hash):
session = SessionBase.create_session() session = SessionBase.create_session()
tx = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() tx = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
if tx == None: if tx == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
o = { o = {
@ -498,7 +500,7 @@ def get_nonce_tx(nonce, sender, chain_id):
# TODO: pass chain spec instead of chain id # TODO: pass chain spec instead of chain id
def get_paused_txs(status=None, sender=None, chain_id=0): def get_paused_txs(status=None, sender=None, chain_id=0, session=None):
"""Returns not finalized transactions that have been attempted sent without success. """Returns not finalized transactions that have been attempted sent without success.
:param status: If set, will return transactions with this local queue status only :param status: If set, will return transactions with this local queue status only
@ -511,12 +513,13 @@ def get_paused_txs(status=None, sender=None, chain_id=0):
:returns: Transactions :returns: Transactions
:rtype: dict, with transaction hash as key, signed raw transaction as value :rtype: dict, with transaction hash as key, signed raw transaction as value
""" """
session = SessionBase.create_session() session = SessionBase.bind_session(session)
q = session.query(Otx) q = session.query(Otx)
if status != None: if status != None:
#if status == StatusEnum.PENDING or status >= StatusEnum.SENT: #if status == StatusEnum.PENDING or status >= StatusEnum.SENT:
if status == StatusEnum.PENDING or status & StatusBits.IN_NETWORK or not is_alive(status): if status == StatusEnum.PENDING or status & StatusBits.IN_NETWORK or not is_alive(status):
SessionBase.release_session(session)
raise ValueError('not a valid paused tx value: {}'.format(status)) raise ValueError('not a valid paused tx value: {}'.format(status))
q = q.filter(Otx.status.op('&')(status.value)==status.value) q = q.filter(Otx.status.op('&')(status.value)==status.value)
q = q.join(TxCache) q = q.join(TxCache)
@ -536,12 +539,12 @@ def get_paused_txs(status=None, sender=None, chain_id=0):
#gas += tx['gas'] * tx['gasPrice'] #gas += tx['gas'] * tx['gasPrice']
txs[r.tx_hash] = r.signed_tx txs[r.tx_hash] = r.signed_tx
session.close() SessionBase.release_session(session)
return txs return txs
def get_status_tx(status, before=None, exact=False, limit=0): def get_status_tx(status, before=None, exact=False, limit=0, session=None):
"""Retrieve transaction with a specific queue status. """Retrieve transaction with a specific queue status.
:param status: Status to match transactions with :param status: Status to match transactions with
@ -554,7 +557,7 @@ def get_status_tx(status, before=None, exact=False, limit=0):
:rtype: list of cic_eth.db.models.otx.Otx :rtype: list of cic_eth.db.models.otx.Otx
""" """
txs = {} txs = {}
session = SessionBase.create_session() session = SessionBase.bind_session(session)
q = session.query(Otx) q = session.query(Otx)
q = q.join(TxCache) q = q.join(TxCache)
q = q.filter(TxCache.date_updated<before) q = q.filter(TxCache.date_updated<before)
@ -568,12 +571,12 @@ def get_status_tx(status, before=None, exact=False, limit=0):
break break
txs[o.tx_hash] = o.signed_tx txs[o.tx_hash] = o.signed_tx
i += 1 i += 1
session.close() SessionBase.release_session(session)
return txs return txs
# TODO: move query to model # TODO: move query to model
def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, chain_id=0): def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, chain_id=0, session=None):
"""Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions. """Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions.
Will omit addresses that have the LockEnum.SEND bit in Lock set. Will omit addresses that have the LockEnum.SEND bit in Lock set.
@ -592,7 +595,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
:returns: Transactions :returns: Transactions
:rtype: dict, with transaction hash as key, signed raw transaction as value :rtype: dict, with transaction hash as key, signed raw transaction as value
""" """
session = SessionBase.create_session() session = SessionBase.bind_session(session)
q_outer = session.query( q_outer = session.query(
TxCache.sender, TxCache.sender,
func.min(Otx.nonce).label('nonce'), func.min(Otx.nonce).label('nonce'),
@ -602,6 +605,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0)) q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0))
if not is_alive(status): if not is_alive(status):
SessionBase.release_session(session)
raise ValueError('not a valid non-final tx value: {}'.format(status)) raise ValueError('not a valid non-final tx value: {}'.format(status))
if status == StatusEnum.PENDING: if status == StatusEnum.PENDING:
q_outer = q_outer.filter(Otx.status==status.value) q_outer = q_outer.filter(Otx.status==status.value)
@ -643,7 +647,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
session.add(o) session.add(o)
session.commit() session.commit()
session.close() SessionBase.release_session(session)
return txs return txs

View File

@ -37,7 +37,7 @@ class CallbackFilter(SyncFilter):
transfer_type, transfer_type,
int(rcpt.status == 0), int(rcpt.status == 0),
], ],
queue=tc.queue, queue=self.queue,
) )
# s_translate = celery.signature( # s_translate = celery.signature(
# 'cic_eth.ext.address.translate', # 'cic_eth.ext.address.translate',
@ -82,7 +82,7 @@ class CallbackFilter(SyncFilter):
return (transfer_type, transfer_data) return (transfer_type, transfer_data)
def filter(self, w3, tx, rcpt, chain_spec): def filter(self, w3, tx, rcpt, chain_spec, session=None):
logg.debug('applying callback filter "{}:{}"'.format(self.queue, self.method)) logg.debug('applying callback filter "{}:{}"'.format(self.queue, self.method))
chain_str = str(chain_spec) chain_str = str(chain_spec)

View File

@ -17,29 +17,31 @@ logg = logging.getLogger()
class GasFilter(SyncFilter): class GasFilter(SyncFilter):
def __init__(self, gas_provider): def __init__(self, gas_provider, queue=None):
self.queue = queue
self.gas_provider = gas_provider self.gas_provider = gas_provider
def filter(self, w3, tx, rcpt, chain_str): def filter(self, w3, tx, rcpt, chain_str, session=None):
logg.debug('applying gas filter') logg.debug('applying gas filter')
tx_hash_hex = tx.hash.hex() tx_hash_hex = tx.hash.hex()
if tx['value'] > 0: if tx['value'] > 0:
logg.debug('gas refill tx {}'.format(tx_hash_hex)) logg.debug('gas refill tx {}'.format(tx_hash_hex))
session = SessionBase.create_session() session = SessionBase.bind_session(session)
q = session.query(TxCache.recipient) q = session.query(TxCache.recipient)
q = q.join(Otx) q = q.join(Otx)
q = q.filter(Otx.tx_hash==tx_hash_hex) q = q.filter(Otx.tx_hash==tx_hash_hex)
r = q.first() r = q.first()
session.close()
if r == None: if r == None:
logg.warning('unsolicited gas refill tx {}'.format(tx_hash_hex)) logg.warning('unsolicited gas refill tx {}'.format(tx_hash_hex))
SessionBase.release_session(session)
return return
chain_spec = ChainSpec.from_chain_str(chain_str) chain_spec = ChainSpec.from_chain_str(chain_str)
txs = get_paused_txs(StatusEnum.WAITFORGAS, r[0], chain_spec.chain_id()) txs = get_paused_txs(StatusEnum.WAITFORGAS, r[0], chain_spec.chain_id(), session=session)
SessionBase.release_session(session)
if len(txs) > 0: if len(txs) > 0:
logg.info('resuming gas-in-waiting txs for {}: {}'.format(r[0], txs.keys())) logg.info('resuming gas-in-waiting txs for {}: {}'.format(r[0], txs.keys()))
@ -49,6 +51,6 @@ class GasFilter(SyncFilter):
r[0], r[0],
0, 0,
tx_hashes_hex=list(txs.keys()), tx_hashes_hex=list(txs.keys()),
queue=queue, queue=self.queue,
) )
s.apply_async() s.apply_async()

View File

@ -15,7 +15,7 @@ account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153f
class RegistrationFilter(SyncFilter): class RegistrationFilter(SyncFilter):
def filter(self, w3, tx, rcpt, chain_spec): def filter(self, w3, tx, rcpt, chain_spec, session=None):
logg.debug('applying registration filter') logg.debug('applying registration filter')
registered_address = None registered_address = None
for l in rcpt['logs']: for l in rcpt['logs']:

View File

@ -6,6 +6,7 @@ import celery
# local imports # local imports
from cic_eth.db.models.otx import Otx from cic_eth.db.models.otx import Otx
from cic_eth.db.models.base import SessionBase
from .base import SyncFilter from .base import SyncFilter
logg = logging.getLogger() logg = logging.getLogger()
@ -17,15 +18,17 @@ class TxFilter(SyncFilter):
self.queue = queue self.queue = queue
def filter(self, w3, tx, rcpt, chain_spec): def filter(self, w3, tx, rcpt, chain_spec, session=None):
session = SessionBase.bind_session(session)
logg.debug('applying tx filter') logg.debug('applying tx filter')
tx_hash_hex = tx.hash.hex() tx_hash_hex = tx.hash.hex()
otx = Otx.load(tx_hash_hex) otx = Otx.load(tx_hash_hex, session=session)
SessionBase.release_session(session)
if otx == None: if otx == None:
logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex)) logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
return None return None
logg.info('otx found {}'.format(otx.tx_hash)) logg.info('otx found {}'.format(otx.tx_hash))
s = celery.siignature( s = celery.signature(
'cic_eth.queue.tx.set_final_status', 'cic_eth.queue.tx.set_final_status',
[ [
tx_hash_hex, tx_hash_hex,

View File

@ -118,7 +118,7 @@ declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', interface
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
SessionBase.connect(dsn) SessionBase.connect(dsn, pool_size=1, debug=config.true('DATABASE_DEBUG'))
def main(): def main():
@ -180,7 +180,7 @@ def main():
registration_filter = RegistrationFilter() registration_filter = RegistrationFilter()
gas_filter = GasFilter(c.gas_provider()) gas_filter = GasFilter(c.gas_provider(), queue)
i = 0 i = 0
for syncer in syncers: for syncer in syncers:

View File

@ -78,7 +78,7 @@ logg.debug('config loaded from {}:\n{}'.format(args.c, config))
# connect to database # connect to database
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
SessionBase.connect(dsn) SessionBase.connect(dsn, pool_size=8, debug=config.true('DATABASE_DEBUG'))
# verify database connection with minimal sanity query # verify database connection with minimal sanity query
session = SessionBase.create_session() session = SessionBase.create_session()
@ -179,7 +179,6 @@ def web3ext_constructor():
return (blockchain_provider, w3) return (blockchain_provider, w3)
RpcClient.set_constructor(web3ext_constructor) RpcClient.set_constructor(web3ext_constructor)
logg.info('ccc {}'.format(config.store['TASKS_TRACE_QUEUE_STATUS']))
Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS') Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS')

View File

@ -28,20 +28,25 @@ class SyncerBackend:
def connect(self): def connect(self):
"""Loads the state of the syncer session with the given id. """Loads the state of the syncer session with the given id.
""" """
if self.db_session == None:
self.db_session = SessionBase.create_session() self.db_session = SessionBase.create_session()
q = self.db_session.query(BlockchainSync) q = self.db_session.query(BlockchainSync)
q = q.filter(BlockchainSync.id==self.object_id) q = q.filter(BlockchainSync.id==self.object_id)
self.db_object = q.first() self.db_object = q.first()
if self.db_object == None: if self.db_object == None:
self.disconnect()
raise ValueError('sync entry with id {} not found'.format(self.object_id)) raise ValueError('sync entry with id {} not found'.format(self.object_id))
return self.db_session
def disconnect(self): def disconnect(self):
"""Commits state of sync to backend. """Commits state of sync to backend.
""" """
if self.db_session != None:
self.db_session.add(self.db_object) self.db_session.add(self.db_object)
self.db_session.commit() self.db_session.commit()
self.db_session.close() self.db_session.close()
self.db_session = None
def chain(self): def chain(self):

View File

@ -56,7 +56,8 @@ class MinedSyncer(Syncer):
# TODO: ensure filter loop can complete on graceful shutdown # TODO: ensure filter loop can complete on graceful shutdown
for f in self.filter: for f in self.filter:
#try: #try:
task_uuid = f(w3, tx, rcpt, self.chain()) session = self.bc_cache.connect()
task_uuid = f(w3, tx, rcpt, self.chain(), session)
#except Exception as e: #except Exception as e:
# logg.error('error in filter {} tx {}: {}'.format(f, tx_hash_hex, e)) # logg.error('error in filter {} tx {}: {}'.format(f, tx_hash_hex, e))
# continue # continue

View File

@ -10,7 +10,7 @@ version = (
0, 0,
10, 10,
0, 0,
'alpha.26', 'alpha.27',
) )
version_object = semver.VersionInfo( version_object = semver.VersionInfo(

View File

@ -212,6 +212,7 @@ services:
DATABASE_PORT: ${DATABASE_PORT:-5432} DATABASE_PORT: ${DATABASE_PORT:-5432}
DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres} DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres}
DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2} DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2}
DATABASE_DEBUG: ${DATABASE_DEBUG:-0}
PGPASSWORD: ${DATABASE_PASSWORD:-tralala} PGPASSWORD: ${DATABASE_PASSWORD:-tralala}
CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-Bloxberg:8996} CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-Bloxberg:8996}
BANCOR_DIR: ${BANCOR_DIR:-/usr/local/share/cic/bancor} BANCOR_DIR: ${BANCOR_DIR:-/usr/local/share/cic/bancor}
@ -252,6 +253,8 @@ services:
DATABASE_PORT: ${DATABASE_PORT:-5432} DATABASE_PORT: ${DATABASE_PORT:-5432}
DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres} DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres}
DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2} DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2}
#DATABASE_DEBUG: ${DATABASE_DEBUG:-0}
DATABASE_DEBUG: 1
CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-Bloxberg:8996} CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-Bloxberg:8996}
CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS
#BANCOR_DIR: $BANCOR_DIR #BANCOR_DIR: $BANCOR_DIR
@ -288,6 +291,7 @@ services:
DATABASE_PORT: ${DATABASE_PORT:-5432} DATABASE_PORT: ${DATABASE_PORT:-5432}
DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres} DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres}
DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2} DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2}
DATABASE_DEBUG: ${DATABASE_DEBUG:-0}
CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-Bloxberg:8996} CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-Bloxberg:8996}
CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS
#BANCOR_DIR: $BANCOR_DIR #BANCOR_DIR: $BANCOR_DIR
@ -324,6 +328,7 @@ services:
DATABASE_PORT: ${DATABASE_PORT:-5432} DATABASE_PORT: ${DATABASE_PORT:-5432}
DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres} DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres}
DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2} DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2}
DATABASE_DEBUG: ${DATABASE_DEBUG:-0}
CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-Bloxberg:8996} CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-Bloxberg:8996}
CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS
#BANCOR_DIR: $BANCOR_DIR #BANCOR_DIR: $BANCOR_DIR