Revert back to subsession handling of nonce

This commit is contained in:
nolash 2021-03-09 07:43:31 +01:00
parent 2fe205b1e8
commit 94c8fd6cd6
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
7 changed files with 70 additions and 23 deletions

View File

@ -78,7 +78,7 @@ class Nonce(SessionBase):
# TODO: Incrementing nonce MUST be done by separate tasks. # TODO: Incrementing nonce MUST be done by separate tasks.
@staticmethod @staticmethod
def next(address, initial_if_not_exists=0): def next(address, initial_if_not_exists=0, session=None):
"""Generate next nonce for the given address. """Generate next nonce for the given address.
If there is no previous nonce record for the address, the nonce may be initialized to a specified value, or 0 if no value has been given. If there is no previous nonce record for the address, the nonce may be initialized to a specified value, or 0 if no value has been given.
@ -90,28 +90,31 @@ class Nonce(SessionBase):
:returns: Nonce :returns: Nonce
:rtype: number :rtype: number
""" """
#session = SessionBase.bind_session(session) session = SessionBase.bind_session(session)
#session.begin_nested() session.begin_nested()
conn = Nonce.engine.connect() #conn = Nonce.engine.connect()
if Nonce.transactional: #if Nonce.transactional:
conn.execute('BEGIN') # conn.execute('BEGIN')
conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE') # conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
logg.debug('locking nonce table for address {}'.format(address)) # logg.debug('locking nonce table for address {}'.format(address))
nonce = Nonce.__get(conn, address) #nonce = Nonce.__get(conn, address)
nonce = Nonce.__get(session, address)
logg.debug('get nonce {} for address {}'.format(nonce, address)) logg.debug('get nonce {} for address {}'.format(nonce, address))
if nonce == None: if nonce == None:
nonce = initial_if_not_exists nonce = initial_if_not_exists
logg.debug('setting default nonce to {} for address {}'.format(nonce, address)) logg.debug('setting default nonce to {} for address {}'.format(nonce, address))
Nonce.__init(conn, address, nonce) #Nonce.__init(conn, address, nonce)
Nonce.__set(conn, address, nonce+1) Nonce.__init(session, address, nonce)
if Nonce.transactional: #Nonce.__set(conn, address, nonce+1)
conn.execute('COMMIT') Nonce.__set(session, address, nonce + 1)
logg.debug('unlocking nonce table for address {}'.format(address)) #if Nonce.transactional:
conn.close() #conn.execute('COMMIT')
#session.commit() # logg.debug('unlocking nonce table for address {}'.format(address))
#conn.close()
session.commit()
#SessionBase.release_session(session) SessionBase.release_session(session)
return nonce return nonce
@ -173,7 +176,7 @@ class NonceReservation(SessionBase):
if NonceReservation.peek(key, session) != None: if NonceReservation.peek(key, session) != None:
raise IntegrityError('nonce for key {}'.format(key)) raise IntegrityError('nonce for key {}'.format(key))
nonce = Nonce.next(address) nonce = Nonce.next(address, session=session)
o = NonceReservation() o = NonceReservation()
o.nonce = nonce o.nonce = nonce

View File

@ -302,7 +302,7 @@ def role(self, account, chain_str):
return AccountRole.role_for(account) return AccountRole.role_for(account)
@celery_app.task() @celery_app.task(base=CriticalSQLAlchemyTask)
def cache_gift_data( def cache_gift_data(
tx_hash_hex, tx_hash_hex,
tx_signed_raw_hex, tx_signed_raw_hex,

View File

@ -95,7 +95,7 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
try: try:
balance = c.w3.eth.getBalance(address) balance = c.w3.eth.getBalance(address)
except ValueError as e: except ValueError as e:
raise EthError('balance call for {}'.format()) raise EthError('balance call for {}: {}'.format(address, e))
logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required)) logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required))

View File

@ -3,6 +3,7 @@ import logging
# external imports # external imports
from cic_registry import CICRegistry from cic_registry import CICRegistry
from cic_registry.registry import from_identifier
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from cic_registry.chain import ChainRegistry from cic_registry.chain import ChainRegistry
from cic_registry.helper.declarator import DeclaratorOracleAdapter from cic_registry.helper.declarator import DeclaratorOracleAdapter
@ -17,7 +18,7 @@ def safe_registry(w3):
return CICRegistry return CICRegistry
def init_registry(config, w3): def init_registry(config, w3, auto_populate=True):
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
CICRegistry.init(w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec) CICRegistry.init(w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
CICRegistry.add_path(config.get('ETH_ABI_DIR')) CICRegistry.add_path(config.get('ETH_ABI_DIR'))
@ -36,4 +37,44 @@ def init_registry(config, w3):
oracle = DeclaratorOracleAdapter(declarator.contract, trusted_addresses) oracle = DeclaratorOracleAdapter(declarator.contract, trusted_addresses)
chain_registry.add_oracle(oracle, 'naive_erc20_oracle') chain_registry.add_oracle(oracle, 'naive_erc20_oracle')
if auto_populate:
populate(chain_spec, w3)
return CICRegistry return CICRegistry
def populate(chain_spec, w3):
registry = CICRegistry.get_contract(chain_spec, 'CICRegistry')
fn = registry.function('identifiers')
i = 0
token_registry_contract = None
while True:
identifier_hex = None
try:
identifier_hex = fn(i).call()
except ValueError:
break
identifier = from_identifier(identifier_hex)
i += 1
try:
if identifier == 'TokenRegistry':
c = CICRegistry.get_contract(chain_spec, identifier, interface='RegistryClient')
token_registry_contract = c
else:
c = CICRegistry.get_contract(chain_spec, identifier)
logg.info('found token registry contract {}'.format(c.address()))
except ValueError:
logg.error('contract for identifier {} not found'.format(identifier))
continue
fn = token_registry_contract.function('entry')
i = 0
while True:
token_address = None
try:
token_address = fn(i).call()
except ValueError:
break
CICRegistry.get_address(chain_spec, token_address)
i += 1

View File

@ -81,7 +81,7 @@ logg.debug('config loaded from {}:\n{}'.format(args.c, config))
# connect to database # connect to database
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=8, debug=config.true('DATABASE_DEBUG')) SessionBase.connect(dsn, pool_size=50, debug=config.true('DATABASE_DEBUG'))
# verify database connection with minimal sanity query # verify database connection with minimal sanity query
session = SessionBase.create_session() session = SessionBase.create_session()

View File

@ -66,7 +66,7 @@ cic_base.config.log(config)
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=1, debug=config.true('DATABASE_DEBUG')) SessionBase.connect(dsn, pool_size=16, debug=config.true('DATABASE_DEBUG'))
re_websocket = re.compile('^wss?://') re_websocket = re.compile('^wss?://')
re_http = re.compile('^https?://') re_http = re.compile('^https?://')

View File

@ -22,6 +22,7 @@ class CriticalSQLAlchemyTask(CriticalTask):
autoretry_for = ( autoretry_for = (
sqlalchemy.exc.DatabaseError, sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError, sqlalchemy.exc.TimeoutError,
sqlalchemy.exc.ResourceClosedError,
) )
@ -36,6 +37,7 @@ class CriticalSQLAlchemyAndWeb3Task(CriticalTask):
sqlalchemy.exc.DatabaseError, sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError, sqlalchemy.exc.TimeoutError,
requests.exceptions.ConnectionError, requests.exceptions.ConnectionError,
sqlalchemy.exc.ResourceClosedError,
EthError, EthError,
) )
@ -43,6 +45,7 @@ class CriticalSQLAlchemyAndSignerTask(CriticalTask):
autoretry_for = ( autoretry_for = (
sqlalchemy.exc.DatabaseError, sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError, sqlalchemy.exc.TimeoutError,
sqlalchemy.exc.ResourceClosedError,
SignerError, SignerError,
) )