diff --git a/apps/cic-eth/cic_eth/db/models/base.py b/apps/cic-eth/cic_eth/db/models/base.py index 16d01d0..353c67c 100644 --- a/apps/cic-eth/cic_eth/db/models/base.py +++ b/apps/cic-eth/cic_eth/db/models/base.py @@ -6,6 +6,11 @@ from sqlalchemy import Column, Integer from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import ( + StaticPool, + QueuePool, + AssertionPool, + ) logg = logging.getLogger() @@ -49,7 +54,7 @@ class SessionBase(Model): @staticmethod - def connect(dsn, debug=False): + def connect(dsn, pool_size=8, debug=False): """Create new database connection engine and connect to database backend. :param dsn: DSN string defining connection. @@ -57,14 +62,28 @@ class SessionBase(Model): """ e = None if SessionBase.poolable: - e = create_engine( - dsn, - max_overflow=50, - pool_pre_ping=True, - pool_size=20, - pool_recycle=10, - echo=debug, - ) + poolclass = QueuePool + if pool_size > 1: + e = create_engine( + dsn, + max_overflow=pool_size*3, + pool_pre_ping=True, + pool_size=pool_size, + pool_recycle=60, + poolclass=poolclass, + echo=debug, + ) + else: + if debug: + poolclass = AssertionPool + else: + poolclass = StaticPool + + e = create_engine( + dsn, + poolclass=poolclass, + echo=debug, + ) else: e = create_engine( dsn, diff --git a/apps/cic-eth/cic_eth/db/models/tx.py b/apps/cic-eth/cic_eth/db/models/tx.py index 98a36ba..5eb46a7 100644 --- a/apps/cic-eth/cic_eth/db/models/tx.py +++ b/apps/cic-eth/cic_eth/db/models/tx.py @@ -85,18 +85,18 @@ class TxCache(SessionBase): :param tx_hash_new: tx hash to associate the copied entry with :type tx_hash_new: str, 0x-hex """ - localsession = session - if localsession == None: - localsession = SessionBase.create_session() - + localsession = SessionBase.bind_session(session) + q = localsession.query(TxCache) q = q.join(Otx) q = q.filter(Otx.tx_hash==tx_hash_original) txc = q.first() if txc == None: + SessionBase.release_session(localsession) raise NotLocalTxError('original {}'.format(tx_hash_original)) if txc.block_number != None: + SessionBase.release_session(localsession) raise TxStateChangeError('cannot clone tx cache of confirmed tx {}'.format(tx_hash_original)) q = localsession.query(Otx) @@ -104,6 +104,7 @@ class TxCache(SessionBase): otx = q.first() if otx == None: + SessionBase.release_session(localsession) raise NotLocalTxError('new {}'.format(tx_hash_new)) txc_new = TxCache( @@ -118,15 +119,14 @@ class TxCache(SessionBase): localsession.add(txc_new) localsession.commit() - if session == None: - localsession.close() + SessionBase.release_session(localsession) - def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None): - session = SessionBase.create_session() - tx = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() + def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None, session=None): + localsession = SessionBase.bind_session(session) + tx = localsession.query(Otx).filter(Otx.tx_hash==tx_hash).first() if tx == None: - session.close() + SessionBase.release_session(localsession) raise FileNotFoundError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash)) self.otx_id = tx.id @@ -143,4 +143,5 @@ class TxCache(SessionBase): self.date_updated = self.date_created self.date_checked = self.date_created + SessionBase.release_session(localsession) diff --git a/apps/cic-eth/cic_eth/eth/account.py b/apps/cic-eth/cic_eth/eth/account.py index 01a9f6e..a3b2322 100644 --- a/apps/cic-eth/cic_eth/eth/account.py +++ b/apps/cic-eth/cic_eth/eth/account.py @@ -304,6 +304,8 @@ def cache_gift_data( tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id()) tx_data = unpack_gift(tx['data']) + session = SessionBase.create_session() + tx_cache = TxCache( tx_hash_hex, tx['from'], @@ -312,9 +314,9 @@ def cache_gift_data( zero_address, 0, 0, + session=session, ) - session = SessionBase.create_session() session.add(tx_cache) session.commit() cache_id = tx_cache.id @@ -347,6 +349,7 @@ def cache_account_data( tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id()) tx_data = unpack_register(tx['data']) + session = SessionBase.create_session() tx_cache = TxCache( tx_hash_hex, tx['from'], @@ -355,9 +358,8 @@ def cache_account_data( zero_address, 0, 0, + session=session, ) - - session = SessionBase.create_session() session.add(tx_cache) session.commit() cache_id = tx_cache.id diff --git a/apps/cic-eth/cic_eth/eth/token.py b/apps/cic-eth/cic_eth/eth/token.py index 60ffec9..6cbe088 100644 --- a/apps/cic-eth/cic_eth/eth/token.py +++ b/apps/cic-eth/cic_eth/eth/token.py @@ -381,6 +381,7 @@ def cache_transfer_data( tx['to'], tx_data['amount'], tx_data['amount'], + session=session, ) session.add(tx_cache) session.commit() @@ -440,6 +441,7 @@ def cache_approve_data( tx['to'], tx_data['amount'], tx_data['amount'], + session=session, ) session.add(tx_cache) session.commit() diff --git a/apps/cic-eth/cic_eth/eth/tx.py b/apps/cic-eth/cic_eth/eth/tx.py index c4a3fca..ce16145 100644 --- a/apps/cic-eth/cic_eth/eth/tx.py +++ b/apps/cic-eth/cic_eth/eth/tx.py @@ -78,7 +78,6 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non # TODO: it should not be necessary to pass address explicitly, if not passed should be derived from the tx balance = c.w3.eth.getBalance(address) - logg.debug('check gas txs {}'.format(tx_hashes)) logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required)) if gas_required > balance: @@ -126,7 +125,6 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non queue=queue, ) ready_tasks.append(s) - logg.debug('tasks {}'.format(ready_tasks)) celery.group(ready_tasks)() return txs @@ -143,7 +141,6 @@ def hashes_to_txs(self, tx_hashes): :returns: Signed raw transactions :rtype: list of str, 0x-hex """ - #logg = celery_app.log.get_default_logger() if len(tx_hashes) == 0: raise ValueError('no transaction to send') @@ -351,15 +348,12 @@ def send(self, txs, chain_str): tx_hash_hex = tx_hash.hex() queue = self.request.delivery_info.get('routing_key', None) - if queue == None: - logg.debug('send tx {} has no queue', tx_hash) c = RpcClient(chain_spec) r = None try: r = c.w3.eth.send_raw_transaction(tx_hex) except Exception as e: - logg.debug('e {}'.format(e)) raiser = ParityNodeHandler(chain_spec, queue) (t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex) raise e(m) @@ -423,7 +417,7 @@ def refill_gas(self, recipient_address, chain_str): gas_price = c.gas_price() gas_limit = c.default_gas_limit refill_amount = c.refill_amount() - logg.debug('gas price {} nonce {}'.format(gas_price, nonce)) + logg.debug('tx send gas price {} nonce {}'.format(gas_price, nonce)) # create and sign transaction tx_send_gas = { @@ -436,7 +430,6 @@ def refill_gas(self, recipient_address, chain_str): 'value': refill_amount, 'data': '', } - logg.debug('txsend_gas {}'.format(tx_send_gas)) tx_send_gas_signed = c.w3.eth.sign_transaction(tx_send_gas) tx_hash = web3.Web3.keccak(hexstr=tx_send_gas_signed['raw']) tx_hash_hex = tx_hash.hex() @@ -487,11 +480,14 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa :rtype: str, 0x-hex """ session = SessionBase.create_session() - otx = session.query(Otx).filter(Otx.tx_hash==txold_hash_hex).first() - if otx == None: - session.close() - raise NotLocalTxError(txold_hash_hex) + + + q = session.query(Otx) + q = q.filter(Otx.tx_hash==txold_hash_hex) + otx = q.first() session.close() + if otx == None: + raise NotLocalTxError(txold_hash_hex) chain_spec = ChainSpec.from_chain_str(chain_str) c = RpcClient(chain_spec) @@ -508,7 +504,7 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa else: gas_price = c.gas_price() if tx['gasPrice'] > gas_price: - logg.warning('Network gas price {} is lower than overdue tx gas price {}'.format(gas_price, tx['gasPrice'])) + logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(gas_price, tx['gasPrice'])) #tx['gasPrice'] = int(tx['gasPrice'] * default_factor) tx['gasPrice'] += 1 else: @@ -518,9 +514,6 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa else: tx['gasPrice'] = new_gas_price - logg.debug('after {}'.format(tx)) - - #(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx, chain_str, queue) (tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str) queue_create( tx['nonce'], @@ -540,6 +533,7 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa queue=queue, ) s.apply_async() + return tx_hash_hex @@ -602,7 +596,9 @@ def resume_tx(self, txpending_hash_hex, chain_str): chain_spec = ChainSpec.from_chain_str(chain_str) session = SessionBase.create_session() - r = session.query(Otx.signed_tx).filter(Otx.tx_hash==txpending_hash_hex).first() + q = session.query(Otx.signed_tx) + q = q.filter(Otx.tx_hash==txpending_hash_hex) + r = q.first() session.close() if r == None: raise NotLocalTxError(txpending_hash_hex) diff --git a/apps/cic-eth/cic_eth/queue/tx.py b/apps/cic-eth/cic_eth/queue/tx.py index 94ab14b..9269bd8 100644 --- a/apps/cic-eth/cic_eth/queue/tx.py +++ b/apps/cic-eth/cic_eth/queue/tx.py @@ -35,8 +35,7 @@ celery_app = celery.current_app logg = logging.getLogger() -@celery_app.task() -def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predecessors=True): +def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predecessors=True, session=None): """Create a new transaction queue record. :param nonce: Transaction nonce @@ -52,10 +51,10 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predec :returns: transaction hash :rtype: str, 0x-hash """ - session = SessionBase.create_session() + session = SessionBase.bind_session(session) lock = Lock.check_aggregate(chain_str, LockEnum.QUEUE, holder_address, session=session) if lock > 0: - session.close() + SessionBase.release_session(session) raise LockedError(lock) o = Otx.add( @@ -81,7 +80,7 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predec otx.cancel(confirmed=False, session=session) session.commit() - session.close() + SessionBase.release_session(session) logg.debug('queue created nonce {} from {} hash {}'.format(nonce, holder_address, tx_hash)) return tx_hash @@ -100,7 +99,9 @@ def set_sent_status(tx_hash, fail=False): :rtype: boolean """ session = SessionBase.create_session() - o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() + q = session.query(Otx) + q = q.filter(Otx.tx_hash==tx_hash) + o = q.first() if o == None: logg.warning('not local tx, skipping {}'.format(tx_hash)) session.close() @@ -454,6 +455,7 @@ def get_tx(tx_hash): session = SessionBase.create_session() tx = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() if tx == None: + session.close() raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) o = { @@ -498,7 +500,7 @@ def get_nonce_tx(nonce, sender, chain_id): # TODO: pass chain spec instead of chain id -def get_paused_txs(status=None, sender=None, chain_id=0): +def get_paused_txs(status=None, sender=None, chain_id=0, session=None): """Returns not finalized transactions that have been attempted sent without success. :param status: If set, will return transactions with this local queue status only @@ -511,12 +513,13 @@ def get_paused_txs(status=None, sender=None, chain_id=0): :returns: Transactions :rtype: dict, with transaction hash as key, signed raw transaction as value """ - session = SessionBase.create_session() + session = SessionBase.bind_session(session) q = session.query(Otx) if status != None: #if status == StatusEnum.PENDING or status >= StatusEnum.SENT: if status == StatusEnum.PENDING or status & StatusBits.IN_NETWORK or not is_alive(status): + SessionBase.release_session(session) raise ValueError('not a valid paused tx value: {}'.format(status)) q = q.filter(Otx.status.op('&')(status.value)==status.value) q = q.join(TxCache) @@ -536,12 +539,12 @@ def get_paused_txs(status=None, sender=None, chain_id=0): #gas += tx['gas'] * tx['gasPrice'] txs[r.tx_hash] = r.signed_tx - session.close() + SessionBase.release_session(session) return txs -def get_status_tx(status, before=None, exact=False, limit=0): +def get_status_tx(status, before=None, exact=False, limit=0, session=None): """Retrieve transaction with a specific queue status. :param status: Status to match transactions with @@ -554,7 +557,7 @@ def get_status_tx(status, before=None, exact=False, limit=0): :rtype: list of cic_eth.db.models.otx.Otx """ txs = {} - session = SessionBase.create_session() + session = SessionBase.bind_session(session) q = session.query(Otx) q = q.join(TxCache) q = q.filter(TxCache.date_updated 0: logg.debug('gas refill tx {}'.format(tx_hash_hex)) - session = SessionBase.create_session() + session = SessionBase.bind_session(session) q = session.query(TxCache.recipient) q = q.join(Otx) q = q.filter(Otx.tx_hash==tx_hash_hex) r = q.first() - session.close() - if r == None: logg.warning('unsolicited gas refill tx {}'.format(tx_hash_hex)) + SessionBase.release_session(session) return chain_spec = ChainSpec.from_chain_str(chain_str) - txs = get_paused_txs(StatusEnum.WAITFORGAS, r[0], chain_spec.chain_id()) + txs = get_paused_txs(StatusEnum.WAITFORGAS, r[0], chain_spec.chain_id(), session=session) + + SessionBase.release_session(session) if len(txs) > 0: logg.info('resuming gas-in-waiting txs for {}: {}'.format(r[0], txs.keys())) @@ -49,6 +51,6 @@ class GasFilter(SyncFilter): r[0], 0, tx_hashes_hex=list(txs.keys()), - queue=queue, + queue=self.queue, ) s.apply_async() diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py index 180cdca..3c003d3 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py @@ -15,7 +15,7 @@ account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153f class RegistrationFilter(SyncFilter): - def filter(self, w3, tx, rcpt, chain_spec): + def filter(self, w3, tx, rcpt, chain_spec, session=None): logg.debug('applying registration filter') registered_address = None for l in rcpt['logs']: diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py index 6a645ff..ceab9a1 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py @@ -6,6 +6,7 @@ import celery # local imports from cic_eth.db.models.otx import Otx +from cic_eth.db.models.base import SessionBase from .base import SyncFilter logg = logging.getLogger() @@ -17,15 +18,17 @@ class TxFilter(SyncFilter): self.queue = queue - def filter(self, w3, tx, rcpt, chain_spec): + def filter(self, w3, tx, rcpt, chain_spec, session=None): + session = SessionBase.bind_session(session) logg.debug('applying tx filter') tx_hash_hex = tx.hash.hex() - otx = Otx.load(tx_hash_hex) + otx = Otx.load(tx_hash_hex, session=session) + SessionBase.release_session(session) if otx == None: logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex)) return None logg.info('otx found {}'.format(otx.tx_hash)) - s = celery.siignature( + s = celery.signature( 'cic_eth.queue.tx.set_final_status', [ tx_hash_hex, diff --git a/apps/cic-eth/cic_eth/runnable/daemons/manager.py b/apps/cic-eth/cic_eth/runnable/daemons/manager.py index bb85c46..db83460 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/manager.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/manager.py @@ -118,7 +118,7 @@ declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', interface dsn = dsn_from_config(config) -SessionBase.connect(dsn) +SessionBase.connect(dsn, pool_size=1, debug=config.true('DATABASE_DEBUG')) def main(): @@ -180,7 +180,7 @@ def main(): registration_filter = RegistrationFilter() - gas_filter = GasFilter(c.gas_provider()) + gas_filter = GasFilter(c.gas_provider(), queue) i = 0 for syncer in syncers: diff --git a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py index 6eb343c..6b9c4bf 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py @@ -78,7 +78,7 @@ logg.debug('config loaded from {}:\n{}'.format(args.c, config)) # connect to database dsn = dsn_from_config(config) -SessionBase.connect(dsn) +SessionBase.connect(dsn, pool_size=8, debug=config.true('DATABASE_DEBUG')) # verify database connection with minimal sanity query session = SessionBase.create_session() @@ -179,7 +179,6 @@ def web3ext_constructor(): return (blockchain_provider, w3) RpcClient.set_constructor(web3ext_constructor) -logg.info('ccc {}'.format(config.store['TASKS_TRACE_QUEUE_STATUS'])) Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS') diff --git a/apps/cic-eth/cic_eth/sync/backend.py b/apps/cic-eth/cic_eth/sync/backend.py index af58be9..56f08f3 100644 --- a/apps/cic-eth/cic_eth/sync/backend.py +++ b/apps/cic-eth/cic_eth/sync/backend.py @@ -28,20 +28,25 @@ class SyncerBackend: def connect(self): """Loads the state of the syncer session with the given id. """ - self.db_session = SessionBase.create_session() + if self.db_session == None: + self.db_session = SessionBase.create_session() q = self.db_session.query(BlockchainSync) q = q.filter(BlockchainSync.id==self.object_id) self.db_object = q.first() if self.db_object == None: + self.disconnect() raise ValueError('sync entry with id {} not found'.format(self.object_id)) + return self.db_session def disconnect(self): """Commits state of sync to backend. """ - self.db_session.add(self.db_object) - self.db_session.commit() - self.db_session.close() + if self.db_session != None: + self.db_session.add(self.db_object) + self.db_session.commit() + self.db_session.close() + self.db_session = None def chain(self): diff --git a/apps/cic-eth/cic_eth/sync/mined.py b/apps/cic-eth/cic_eth/sync/mined.py index 616eea5..4481bb2 100644 --- a/apps/cic-eth/cic_eth/sync/mined.py +++ b/apps/cic-eth/cic_eth/sync/mined.py @@ -56,7 +56,8 @@ class MinedSyncer(Syncer): # TODO: ensure filter loop can complete on graceful shutdown for f in self.filter: #try: - task_uuid = f(w3, tx, rcpt, self.chain()) + session = self.bc_cache.connect() + task_uuid = f(w3, tx, rcpt, self.chain(), session) #except Exception as e: # logg.error('error in filter {} tx {}: {}'.format(f, tx_hash_hex, e)) # continue diff --git a/apps/cic-eth/cic_eth/version.py b/apps/cic-eth/cic_eth/version.py index 14d75c3..f782394 100644 --- a/apps/cic-eth/cic_eth/version.py +++ b/apps/cic-eth/cic_eth/version.py @@ -10,7 +10,7 @@ version = ( 0, 10, 0, - 'alpha.26', + 'alpha.27', ) version_object = semver.VersionInfo( diff --git a/docker-compose.yml b/docker-compose.yml index 7433730..2c54eae 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -212,6 +212,7 @@ services: DATABASE_PORT: ${DATABASE_PORT:-5432} DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres} DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2} + DATABASE_DEBUG: ${DATABASE_DEBUG:-0} PGPASSWORD: ${DATABASE_PASSWORD:-tralala} CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-Bloxberg:8996} BANCOR_DIR: ${BANCOR_DIR:-/usr/local/share/cic/bancor} @@ -252,6 +253,8 @@ services: DATABASE_PORT: ${DATABASE_PORT:-5432} DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres} DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2} + #DATABASE_DEBUG: ${DATABASE_DEBUG:-0} + DATABASE_DEBUG: 1 CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-Bloxberg:8996} CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS #BANCOR_DIR: $BANCOR_DIR @@ -288,6 +291,7 @@ services: DATABASE_PORT: ${DATABASE_PORT:-5432} DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres} DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2} + DATABASE_DEBUG: ${DATABASE_DEBUG:-0} CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-Bloxberg:8996} CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS #BANCOR_DIR: $BANCOR_DIR @@ -324,6 +328,7 @@ services: DATABASE_PORT: ${DATABASE_PORT:-5432} DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres} DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2} + DATABASE_DEBUG: ${DATABASE_DEBUG:-0} CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-Bloxberg:8996} CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS #BANCOR_DIR: $BANCOR_DIR