Move old cic-cache

This commit is contained in:
nolash 2021-02-19 08:11:48 +01:00
parent 8a43d67c72
commit 4762856653
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
18 changed files with 137 additions and 69 deletions

View File

@ -5,6 +5,7 @@ include:
- local: 'apps/cic-ussd/.gitlab-ci.yml' - local: 'apps/cic-ussd/.gitlab-ci.yml'
- local: 'apps/cic-notify/.gitlab-ci.yml' - local: 'apps/cic-notify/.gitlab-ci.yml'
- local: 'apps/cic-meta/.gitlab-ci.yml' - local: 'apps/cic-meta/.gitlab-ci.yml'
- local: 'apps/cic-cache/.gitlab-ci.yml'
stages: stages:
- build - build

3
.gitmodules vendored
View File

@ -1,3 +0,0 @@
[submodule "apps/cic-cache"]
path = apps/cic-cache
url = git@gitlab.com:grassrootseconomics/cic-cache.git

@ -1 +0,0 @@
Subproject commit d2cb3a45558d7ca3a412c97c6aea794d9ac6c6f5

View File

@ -6,6 +6,11 @@ from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import (
StaticPool,
QueuePool,
AssertionPool,
)
logg = logging.getLogger() logg = logging.getLogger()
@ -49,7 +54,11 @@ class SessionBase(Model):
@staticmethod @staticmethod
<<<<<<< HEAD
def connect(dsn, pool_size=5, debug=False): def connect(dsn, pool_size=5, debug=False):
=======
def connect(dsn, pool_size=8, debug=False):
>>>>>>> origin/master
"""Create new database connection engine and connect to database backend. """Create new database connection engine and connect to database backend.
:param dsn: DSN string defining connection. :param dsn: DSN string defining connection.
@ -57,6 +66,7 @@ class SessionBase(Model):
""" """
e = None e = None
if SessionBase.poolable: if SessionBase.poolable:
<<<<<<< HEAD
e = create_engine( e = create_engine(
dsn, dsn,
max_overflow=pool_size*3, max_overflow=pool_size*3,
@ -65,6 +75,30 @@ class SessionBase(Model):
pool_recycle=10, pool_recycle=10,
echo=debug, echo=debug,
) )
=======
poolclass = QueuePool
if pool_size > 1:
e = create_engine(
dsn,
max_overflow=pool_size*3,
pool_pre_ping=True,
pool_size=pool_size,
pool_recycle=60,
poolclass=poolclass,
echo=debug,
)
else:
if debug:
poolclass = AssertionPool
else:
poolclass = StaticPool
e = create_engine(
dsn,
poolclass=poolclass,
echo=debug,
)
>>>>>>> origin/master
else: else:
e = create_engine( e = create_engine(
dsn, dsn,

View File

@ -85,9 +85,7 @@ class TxCache(SessionBase):
:param tx_hash_new: tx hash to associate the copied entry with :param tx_hash_new: tx hash to associate the copied entry with
:type tx_hash_new: str, 0x-hex :type tx_hash_new: str, 0x-hex
""" """
localsession = session localsession = SessionBase.bind_session(session)
if localsession == None:
localsession = SessionBase.create_session()
q = localsession.query(TxCache) q = localsession.query(TxCache)
q = q.join(Otx) q = q.join(Otx)
@ -95,8 +93,10 @@ class TxCache(SessionBase):
txc = q.first() txc = q.first()
if txc == None: if txc == None:
SessionBase.release_session(localsession)
raise NotLocalTxError('original {}'.format(tx_hash_original)) raise NotLocalTxError('original {}'.format(tx_hash_original))
if txc.block_number != None: if txc.block_number != None:
SessionBase.release_session(localsession)
raise TxStateChangeError('cannot clone tx cache of confirmed tx {}'.format(tx_hash_original)) raise TxStateChangeError('cannot clone tx cache of confirmed tx {}'.format(tx_hash_original))
q = localsession.query(Otx) q = localsession.query(Otx)
@ -104,6 +104,7 @@ class TxCache(SessionBase):
otx = q.first() otx = q.first()
if otx == None: if otx == None:
SessionBase.release_session(localsession)
raise NotLocalTxError('new {}'.format(tx_hash_new)) raise NotLocalTxError('new {}'.format(tx_hash_new))
txc_new = TxCache( txc_new = TxCache(
@ -118,15 +119,14 @@ class TxCache(SessionBase):
localsession.add(txc_new) localsession.add(txc_new)
localsession.commit() localsession.commit()
if session == None: SessionBase.release_session(localsession)
localsession.close()
def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None): def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None, session=None):
session = SessionBase.create_session() localsession = SessionBase.bind_session(session)
tx = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() tx = localsession.query(Otx).filter(Otx.tx_hash==tx_hash).first()
if tx == None: if tx == None:
session.close() SessionBase.release_session(localsession)
raise FileNotFoundError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash)) raise FileNotFoundError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash))
self.otx_id = tx.id self.otx_id = tx.id
@ -143,4 +143,5 @@ class TxCache(SessionBase):
self.date_updated = self.date_created self.date_updated = self.date_created
self.date_checked = self.date_created self.date_checked = self.date_created
SessionBase.release_session(localsession)

View File

@ -304,6 +304,8 @@ def cache_gift_data(
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id()) tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
tx_data = unpack_gift(tx['data']) tx_data = unpack_gift(tx['data'])
session = SessionBase.create_session()
tx_cache = TxCache( tx_cache = TxCache(
tx_hash_hex, tx_hash_hex,
tx['from'], tx['from'],
@ -312,9 +314,9 @@ def cache_gift_data(
zero_address, zero_address,
0, 0,
0, 0,
session=session,
) )
session = SessionBase.create_session()
session.add(tx_cache) session.add(tx_cache)
session.commit() session.commit()
cache_id = tx_cache.id cache_id = tx_cache.id
@ -347,6 +349,7 @@ def cache_account_data(
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id()) tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
tx_data = unpack_register(tx['data']) tx_data = unpack_register(tx['data'])
session = SessionBase.create_session()
tx_cache = TxCache( tx_cache = TxCache(
tx_hash_hex, tx_hash_hex,
tx['from'], tx['from'],
@ -355,9 +358,8 @@ def cache_account_data(
zero_address, zero_address,
0, 0,
0, 0,
session=session,
) )
session = SessionBase.create_session()
session.add(tx_cache) session.add(tx_cache)
session.commit() session.commit()
cache_id = tx_cache.id cache_id = tx_cache.id

View File

@ -381,6 +381,7 @@ def cache_transfer_data(
tx['to'], tx['to'],
tx_data['amount'], tx_data['amount'],
tx_data['amount'], tx_data['amount'],
session=session,
) )
session.add(tx_cache) session.add(tx_cache)
session.commit() session.commit()
@ -440,6 +441,7 @@ def cache_approve_data(
tx['to'], tx['to'],
tx_data['amount'], tx_data['amount'],
tx_data['amount'], tx_data['amount'],
session=session,
) )
session.add(tx_cache) session.add(tx_cache)
session.commit() session.commit()

View File

@ -78,7 +78,6 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
# TODO: it should not be necessary to pass address explicitly, if not passed should be derived from the tx # TODO: it should not be necessary to pass address explicitly, if not passed should be derived from the tx
balance = c.w3.eth.getBalance(address) balance = c.w3.eth.getBalance(address)
logg.debug('check gas txs {}'.format(tx_hashes))
logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required)) logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required))
if gas_required > balance: if gas_required > balance:
@ -126,7 +125,6 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
queue=queue, queue=queue,
) )
ready_tasks.append(s) ready_tasks.append(s)
logg.debug('tasks {}'.format(ready_tasks))
celery.group(ready_tasks)() celery.group(ready_tasks)()
return txs return txs
@ -143,7 +141,6 @@ def hashes_to_txs(self, tx_hashes):
:returns: Signed raw transactions :returns: Signed raw transactions
:rtype: list of str, 0x-hex :rtype: list of str, 0x-hex
""" """
#logg = celery_app.log.get_default_logger()
if len(tx_hashes) == 0: if len(tx_hashes) == 0:
raise ValueError('no transaction to send') raise ValueError('no transaction to send')
@ -351,15 +348,12 @@ def send(self, txs, chain_str):
tx_hash_hex = tx_hash.hex() tx_hash_hex = tx_hash.hex()
queue = self.request.delivery_info.get('routing_key', None) queue = self.request.delivery_info.get('routing_key', None)
if queue == None:
logg.debug('send tx {} has no queue', tx_hash)
c = RpcClient(chain_spec) c = RpcClient(chain_spec)
r = None r = None
try: try:
r = c.w3.eth.send_raw_transaction(tx_hex) r = c.w3.eth.send_raw_transaction(tx_hex)
except Exception as e: except Exception as e:
logg.debug('e {}'.format(e))
raiser = ParityNodeHandler(chain_spec, queue) raiser = ParityNodeHandler(chain_spec, queue)
(t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex) (t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex)
raise e(m) raise e(m)
@ -423,7 +417,7 @@ def refill_gas(self, recipient_address, chain_str):
gas_price = c.gas_price() gas_price = c.gas_price()
gas_limit = c.default_gas_limit gas_limit = c.default_gas_limit
refill_amount = c.refill_amount() refill_amount = c.refill_amount()
logg.debug('gas price {} nonce {}'.format(gas_price, nonce)) logg.debug('tx send gas price {} nonce {}'.format(gas_price, nonce))
# create and sign transaction # create and sign transaction
tx_send_gas = { tx_send_gas = {
@ -436,7 +430,6 @@ def refill_gas(self, recipient_address, chain_str):
'value': refill_amount, 'value': refill_amount,
'data': '', 'data': '',
} }
logg.debug('txsend_gas {}'.format(tx_send_gas))
tx_send_gas_signed = c.w3.eth.sign_transaction(tx_send_gas) tx_send_gas_signed = c.w3.eth.sign_transaction(tx_send_gas)
tx_hash = web3.Web3.keccak(hexstr=tx_send_gas_signed['raw']) tx_hash = web3.Web3.keccak(hexstr=tx_send_gas_signed['raw'])
tx_hash_hex = tx_hash.hex() tx_hash_hex = tx_hash.hex()
@ -487,11 +480,14 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
:rtype: str, 0x-hex :rtype: str, 0x-hex
""" """
session = SessionBase.create_session() session = SessionBase.create_session()
otx = session.query(Otx).filter(Otx.tx_hash==txold_hash_hex).first()
if otx == None:
session.close() q = session.query(Otx)
raise NotLocalTxError(txold_hash_hex) q = q.filter(Otx.tx_hash==txold_hash_hex)
otx = q.first()
session.close() session.close()
if otx == None:
raise NotLocalTxError(txold_hash_hex)
chain_spec = ChainSpec.from_chain_str(chain_str) chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec) c = RpcClient(chain_spec)
@ -508,7 +504,7 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
else: else:
gas_price = c.gas_price() gas_price = c.gas_price()
if tx['gasPrice'] > gas_price: if tx['gasPrice'] > gas_price:
logg.warning('Network gas price {} is lower than overdue tx gas price {}'.format(gas_price, tx['gasPrice'])) logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(gas_price, tx['gasPrice']))
#tx['gasPrice'] = int(tx['gasPrice'] * default_factor) #tx['gasPrice'] = int(tx['gasPrice'] * default_factor)
tx['gasPrice'] += 1 tx['gasPrice'] += 1
else: else:
@ -518,9 +514,6 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
else: else:
tx['gasPrice'] = new_gas_price tx['gasPrice'] = new_gas_price
logg.debug('after {}'.format(tx))
#(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx, chain_str, queue)
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str) (tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str)
queue_create( queue_create(
tx['nonce'], tx['nonce'],
@ -540,6 +533,7 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
queue=queue, queue=queue,
) )
s.apply_async() s.apply_async()
return tx_hash_hex return tx_hash_hex
@ -602,7 +596,9 @@ def resume_tx(self, txpending_hash_hex, chain_str):
chain_spec = ChainSpec.from_chain_str(chain_str) chain_spec = ChainSpec.from_chain_str(chain_str)
session = SessionBase.create_session() session = SessionBase.create_session()
r = session.query(Otx.signed_tx).filter(Otx.tx_hash==txpending_hash_hex).first() q = session.query(Otx.signed_tx)
q = q.filter(Otx.tx_hash==txpending_hash_hex)
r = q.first()
session.close() session.close()
if r == None: if r == None:
raise NotLocalTxError(txpending_hash_hex) raise NotLocalTxError(txpending_hash_hex)

View File

@ -35,8 +35,7 @@ celery_app = celery.current_app
logg = logging.getLogger() logg = logging.getLogger()
@celery_app.task() def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predecessors=True, session=None):
def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predecessors=True):
"""Create a new transaction queue record. """Create a new transaction queue record.
:param nonce: Transaction nonce :param nonce: Transaction nonce
@ -52,10 +51,10 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predec
:returns: transaction hash :returns: transaction hash
:rtype: str, 0x-hash :rtype: str, 0x-hash
""" """
session = SessionBase.create_session() session = SessionBase.bind_session(session)
lock = Lock.check_aggregate(chain_str, LockEnum.QUEUE, holder_address, session=session) lock = Lock.check_aggregate(chain_str, LockEnum.QUEUE, holder_address, session=session)
if lock > 0: if lock > 0:
session.close() SessionBase.release_session(session)
raise LockedError(lock) raise LockedError(lock)
o = Otx.add( o = Otx.add(
@ -81,7 +80,7 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predec
otx.cancel(confirmed=False, session=session) otx.cancel(confirmed=False, session=session)
session.commit() session.commit()
session.close() SessionBase.release_session(session)
logg.debug('queue created nonce {} from {} hash {}'.format(nonce, holder_address, tx_hash)) logg.debug('queue created nonce {} from {} hash {}'.format(nonce, holder_address, tx_hash))
return tx_hash return tx_hash
@ -100,7 +99,9 @@ def set_sent_status(tx_hash, fail=False):
:rtype: boolean :rtype: boolean
""" """
session = SessionBase.create_session() session = SessionBase.create_session()
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
o = q.first()
if o == None: if o == None:
logg.warning('not local tx, skipping {}'.format(tx_hash)) logg.warning('not local tx, skipping {}'.format(tx_hash))
session.close() session.close()
@ -454,6 +455,7 @@ def get_tx(tx_hash):
session = SessionBase.create_session() session = SessionBase.create_session()
tx = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() tx = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
if tx == None: if tx == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
o = { o = {
@ -498,7 +500,7 @@ def get_nonce_tx(nonce, sender, chain_id):
# TODO: pass chain spec instead of chain id # TODO: pass chain spec instead of chain id
def get_paused_txs(status=None, sender=None, chain_id=0): def get_paused_txs(status=None, sender=None, chain_id=0, session=None):
"""Returns not finalized transactions that have been attempted sent without success. """Returns not finalized transactions that have been attempted sent without success.
:param status: If set, will return transactions with this local queue status only :param status: If set, will return transactions with this local queue status only
@ -511,12 +513,13 @@ def get_paused_txs(status=None, sender=None, chain_id=0):
:returns: Transactions :returns: Transactions
:rtype: dict, with transaction hash as key, signed raw transaction as value :rtype: dict, with transaction hash as key, signed raw transaction as value
""" """
session = SessionBase.create_session() session = SessionBase.bind_session(session)
q = session.query(Otx) q = session.query(Otx)
if status != None: if status != None:
#if status == StatusEnum.PENDING or status >= StatusEnum.SENT: #if status == StatusEnum.PENDING or status >= StatusEnum.SENT:
if status == StatusEnum.PENDING or status & StatusBits.IN_NETWORK or not is_alive(status): if status == StatusEnum.PENDING or status & StatusBits.IN_NETWORK or not is_alive(status):
SessionBase.release_session(session)
raise ValueError('not a valid paused tx value: {}'.format(status)) raise ValueError('not a valid paused tx value: {}'.format(status))
q = q.filter(Otx.status.op('&')(status.value)==status.value) q = q.filter(Otx.status.op('&')(status.value)==status.value)
q = q.join(TxCache) q = q.join(TxCache)
@ -536,12 +539,12 @@ def get_paused_txs(status=None, sender=None, chain_id=0):
#gas += tx['gas'] * tx['gasPrice'] #gas += tx['gas'] * tx['gasPrice']
txs[r.tx_hash] = r.signed_tx txs[r.tx_hash] = r.signed_tx
session.close() SessionBase.release_session(session)
return txs return txs
def get_status_tx(status, before=None, exact=False, limit=0): def get_status_tx(status, before=None, exact=False, limit=0, session=None):
"""Retrieve transaction with a specific queue status. """Retrieve transaction with a specific queue status.
:param status: Status to match transactions with :param status: Status to match transactions with
@ -554,7 +557,7 @@ def get_status_tx(status, before=None, exact=False, limit=0):
:rtype: list of cic_eth.db.models.otx.Otx :rtype: list of cic_eth.db.models.otx.Otx
""" """
txs = {} txs = {}
session = SessionBase.create_session() session = SessionBase.bind_session(session)
q = session.query(Otx) q = session.query(Otx)
q = q.join(TxCache) q = q.join(TxCache)
q = q.filter(TxCache.date_updated<before) q = q.filter(TxCache.date_updated<before)
@ -568,12 +571,12 @@ def get_status_tx(status, before=None, exact=False, limit=0):
break break
txs[o.tx_hash] = o.signed_tx txs[o.tx_hash] = o.signed_tx
i += 1 i += 1
session.close() SessionBase.release_session(session)
return txs return txs
# TODO: move query to model # TODO: move query to model
def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, chain_id=0): def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, chain_id=0, session=None):
"""Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions. """Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions.
Will omit addresses that have the LockEnum.SEND bit in Lock set. Will omit addresses that have the LockEnum.SEND bit in Lock set.
@ -592,7 +595,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
:returns: Transactions :returns: Transactions
:rtype: dict, with transaction hash as key, signed raw transaction as value :rtype: dict, with transaction hash as key, signed raw transaction as value
""" """
session = SessionBase.create_session() session = SessionBase.bind_session(session)
q_outer = session.query( q_outer = session.query(
TxCache.sender, TxCache.sender,
func.min(Otx.nonce).label('nonce'), func.min(Otx.nonce).label('nonce'),
@ -602,6 +605,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0)) q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0))
if not is_alive(status): if not is_alive(status):
SessionBase.release_session(session)
raise ValueError('not a valid non-final tx value: {}'.format(status)) raise ValueError('not a valid non-final tx value: {}'.format(status))
if status == StatusEnum.PENDING: if status == StatusEnum.PENDING:
q_outer = q_outer.filter(Otx.status==status.value) q_outer = q_outer.filter(Otx.status==status.value)
@ -643,7 +647,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
session.add(o) session.add(o)
session.commit() session.commit()
session.close() SessionBase.release_session(session)
return txs return txs

View File

@ -37,7 +37,7 @@ class CallbackFilter(SyncFilter):
transfer_type, transfer_type,
int(rcpt.status == 0), int(rcpt.status == 0),
], ],
queue=tc.queue, queue=self.queue,
) )
# s_translate = celery.signature( # s_translate = celery.signature(
# 'cic_eth.ext.address.translate', # 'cic_eth.ext.address.translate',
@ -82,7 +82,7 @@ class CallbackFilter(SyncFilter):
return (transfer_type, transfer_data) return (transfer_type, transfer_data)
def filter(self, w3, tx, rcpt, chain_spec): def filter(self, w3, tx, rcpt, chain_spec, session=None):
logg.debug('applying callback filter "{}:{}"'.format(self.queue, self.method)) logg.debug('applying callback filter "{}:{}"'.format(self.queue, self.method))
chain_str = str(chain_spec) chain_str = str(chain_spec)

View File

@ -18,30 +18,40 @@ logg = logging.getLogger()
class GasFilter(SyncFilter): class GasFilter(SyncFilter):
<<<<<<< HEAD
def __init__(self, queue, gas_provider): def __init__(self, queue, gas_provider):
=======
def __init__(self, gas_provider, queue=None):
self.queue = queue
>>>>>>> origin/master
self.gas_provider = gas_provider self.gas_provider = gas_provider
self.queue = queue self.queue = queue
def filter(self, w3, tx, rcpt, chain_str): def filter(self, w3, tx, rcpt, chain_str, session=None):
logg.debug('applying gas filter') logg.debug('applying gas filter')
tx_hash_hex = tx.hash.hex() tx_hash_hex = tx.hash.hex()
if tx['value'] > 0: if tx['value'] > 0:
logg.debug('gas refill tx {}'.format(tx_hash_hex)) logg.debug('gas refill tx {}'.format(tx_hash_hex))
session = SessionBase.create_session() session = SessionBase.bind_session(session)
q = session.query(TxCache.recipient) q = session.query(TxCache.recipient)
q = q.join(Otx) q = q.join(Otx)
q = q.filter(Otx.tx_hash==tx_hash_hex) q = q.filter(Otx.tx_hash==tx_hash_hex)
r = q.first() r = q.first()
session.close()
if r == None: if r == None:
logg.warning('unsolicited gas refill tx {}'.format(tx_hash_hex)) logg.warning('unsolicited gas refill tx {}'.format(tx_hash_hex))
SessionBase.release_session(session)
return return
chain_spec = ChainSpec.from_chain_str(chain_str) chain_spec = ChainSpec.from_chain_str(chain_str)
<<<<<<< HEAD
txs = get_paused_txs(StatusBits.GAS_ISSUES, r[0], chain_spec.chain_id()) txs = get_paused_txs(StatusBits.GAS_ISSUES, r[0], chain_spec.chain_id())
=======
txs = get_paused_txs(StatusEnum.WAITFORGAS, r[0], chain_spec.chain_id(), session=session)
SessionBase.release_session(session)
>>>>>>> origin/master
if len(txs) > 0: if len(txs) > 0:
logg.info('resuming gas-in-waiting txs for {}: {}'.format(r[0], txs.keys())) logg.info('resuming gas-in-waiting txs for {}: {}'.format(r[0], txs.keys()))

View File

@ -15,11 +15,15 @@ account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153f
class RegistrationFilter(SyncFilter): class RegistrationFilter(SyncFilter):
<<<<<<< HEAD
def __init__(self, queue): def __init__(self, queue):
self.queue = queue self.queue = queue
def filter(self, w3, tx, rcpt, chain_spec): def filter(self, w3, tx, rcpt, chain_spec):
=======
def filter(self, w3, tx, rcpt, chain_spec, session=None):
>>>>>>> origin/master
logg.debug('applying registration filter') logg.debug('applying registration filter')
registered_address = None registered_address = None
for l in rcpt['logs']: for l in rcpt['logs']:

View File

@ -6,6 +6,7 @@ import celery
# local imports # local imports
from cic_eth.db.models.otx import Otx from cic_eth.db.models.otx import Otx
from cic_eth.db.models.base import SessionBase
from .base import SyncFilter from .base import SyncFilter
logg = logging.getLogger() logg = logging.getLogger()
@ -17,10 +18,12 @@ class TxFilter(SyncFilter):
self.queue = queue self.queue = queue
def filter(self, w3, tx, rcpt, chain_spec): def filter(self, w3, tx, rcpt, chain_spec, session=None):
session = SessionBase.bind_session(session)
logg.debug('applying tx filter') logg.debug('applying tx filter')
tx_hash_hex = tx.hash.hex() tx_hash_hex = tx.hash.hex()
otx = Otx.load(tx_hash_hex) otx = Otx.load(tx_hash_hex, session=session)
SessionBase.release_session(session)
if otx == None: if otx == None:
logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex)) logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
return None return None

View File

@ -118,7 +118,7 @@ declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', interface
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
SessionBase.connect(dsn) SessionBase.connect(dsn, pool_size=1, debug=config.true('DATABASE_DEBUG'))
def main(): def main():
@ -180,7 +180,11 @@ def main():
registration_filter = RegistrationFilter(queue) registration_filter = RegistrationFilter(queue)
<<<<<<< HEAD
gas_filter = GasFilter(queue, c.gas_provider()) gas_filter = GasFilter(queue, c.gas_provider())
=======
gas_filter = GasFilter(c.gas_provider(), queue)
>>>>>>> origin/master
i = 0 i = 0
for syncer in syncers: for syncer in syncers:

View File

@ -78,7 +78,7 @@ logg.debug('config loaded from {}:\n{}'.format(args.c, config))
# connect to database # connect to database
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
SessionBase.connect(dsn) SessionBase.connect(dsn, pool_size=8, debug=config.true('DATABASE_DEBUG'))
# verify database connection with minimal sanity query # verify database connection with minimal sanity query
session = SessionBase.create_session() session = SessionBase.create_session()
@ -179,7 +179,6 @@ def web3ext_constructor():
return (blockchain_provider, w3) return (blockchain_provider, w3)
RpcClient.set_constructor(web3ext_constructor) RpcClient.set_constructor(web3ext_constructor)
logg.info('ccc {}'.format(config.store['TASKS_TRACE_QUEUE_STATUS']))
Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS') Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS')

View File

@ -28,20 +28,25 @@ class SyncerBackend:
def connect(self): def connect(self):
"""Loads the state of the syncer session with the given id. """Loads the state of the syncer session with the given id.
""" """
self.db_session = SessionBase.create_session() if self.db_session == None:
self.db_session = SessionBase.create_session()
q = self.db_session.query(BlockchainSync) q = self.db_session.query(BlockchainSync)
q = q.filter(BlockchainSync.id==self.object_id) q = q.filter(BlockchainSync.id==self.object_id)
self.db_object = q.first() self.db_object = q.first()
if self.db_object == None: if self.db_object == None:
self.disconnect()
raise ValueError('sync entry with id {} not found'.format(self.object_id)) raise ValueError('sync entry with id {} not found'.format(self.object_id))
return self.db_session
def disconnect(self): def disconnect(self):
"""Commits state of sync to backend. """Commits state of sync to backend.
""" """
self.db_session.add(self.db_object) if self.db_session != None:
self.db_session.commit() self.db_session.add(self.db_object)
self.db_session.close() self.db_session.commit()
self.db_session.close()
self.db_session = None
def chain(self): def chain(self):

View File

@ -58,7 +58,8 @@ class MinedSyncer(Syncer):
# TODO: ensure filter loop can complete on graceful shutdown # TODO: ensure filter loop can complete on graceful shutdown
for f in self.filter: for f in self.filter:
#try: #try:
task_uuid = f(w3, tx, rcpt, self.chain()) session = self.bc_cache.connect()
task_uuid = f(w3, tx, rcpt, self.chain(), session)
#except Exception as e: #except Exception as e:
# logg.error('error in filter {} tx {}: {}'.format(f, tx_hash_hex, e)) # logg.error('error in filter {} tx {}: {}'.format(f, tx_hash_hex, e))
# continue # continue

View File

@ -131,8 +131,8 @@ services:
cic-cache-tracker: cic-cache-tracker:
build: build:
context: apps/cic-cache/ context: apps
dockerfile: docker/Dockerfile dockerfile: cic-cache/docker/Dockerfile
environment: environment:
CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS # supplied at contract-config after contract provisioning CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS # supplied at contract-config after contract provisioning
ETH_PROVIDER: ${ETH_PROVIDER:-http://eth:8545} ETH_PROVIDER: ${ETH_PROVIDER:-http://eth:8545}
@ -167,8 +167,8 @@ services:
cic-cache-server: cic-cache-server:
build: build:
context: apps/cic-cache/ context: apps
dockerfile: docker/Dockerfile dockerfile: cic-cache/docker/Dockerfile
environment: environment:
DATABASE_USER: $DATABASE_USER DATABASE_USER: $DATABASE_USER
DATABASE_HOST: $DATABASE_HOST DATABASE_HOST: $DATABASE_HOST
@ -253,7 +253,12 @@ services:
DATABASE_PORT: ${DATABASE_PORT:-5432} DATABASE_PORT: ${DATABASE_PORT:-5432}
DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres} DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres}
DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2} DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2}
<<<<<<< HEAD
DATABASE_DEBUG: ${DATABASE_DEBUG:-0} DATABASE_DEBUG: ${DATABASE_DEBUG:-0}
=======
#DATABASE_DEBUG: ${DATABASE_DEBUG:-0}
DATABASE_DEBUG: 1
>>>>>>> origin/master
CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-Bloxberg:8996} CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-Bloxberg:8996}
CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS
#BANCOR_DIR: $BANCOR_DIR #BANCOR_DIR: $BANCOR_DIR
@ -290,6 +295,7 @@ services:
DATABASE_PORT: ${DATABASE_PORT:-5432} DATABASE_PORT: ${DATABASE_PORT:-5432}
DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres} DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres}
DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2} DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2}
DATABASE_DEBUG: ${DATABASE_DEBUG:-0}
CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-Bloxberg:8996} CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-Bloxberg:8996}
CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS
#BANCOR_DIR: $BANCOR_DIR #BANCOR_DIR: $BANCOR_DIR