Ensure serial send of txs to node

This commit is contained in:
nolash 2021-04-08 07:09:38 +02:00
parent 2f0a95d2b6
commit 8f2d3997d3
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
23 changed files with 261 additions and 56 deletions

View File

@ -10,6 +10,7 @@ from sqlalchemy.pool import (
StaticPool,
QueuePool,
AssertionPool,
NullPool,
)
logg = logging.getLogger()
@ -64,6 +65,7 @@ class SessionBase(Model):
if SessionBase.poolable:
poolclass = QueuePool
if pool_size > 1:
logg.info('db using queue pool')
e = create_engine(
dsn,
max_overflow=pool_size*3,
@ -74,17 +76,22 @@ class SessionBase(Model):
echo=debug,
)
else:
if debug:
if pool_size == 0:
logg.info('db using nullpool')
poolclass = NullPool
elif debug:
logg.info('db using assertion pool')
poolclass = AssertionPool
else:
logg.info('db using static pool')
poolclass = StaticPool
e = create_engine(
dsn,
poolclass=poolclass,
echo=debug,
)
else:
logg.info('db not poolable')
e = create_engine(
dsn,
echo=debug,

View File

@ -91,7 +91,7 @@ logg.debug('config loaded from {}:\n{}'.format(args.c, config))
# connect to database
dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=50, debug=config.true('DATABASE_DEBUG'))
SessionBase.connect(dsn, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG'))
# verify database connection with minimal sanity query
session = SessionBase.create_session()

View File

@ -6,4 +6,5 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
POOL_SIZE=50
DEBUG=0

View File

@ -6,4 +6,5 @@ HOST=localhost
PORT=63432
ENGINE=postgresql
DRIVER=psycopg2
POOL_SIZE=50
DEBUG=0

View File

@ -6,3 +6,5 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=0
POOL_SIZE=1

View File

@ -1,47 +1,129 @@
# standard imports
# stanard imports
import logging
import datetime
# third-party imports
# external imports
from sqlalchemy import Column, Integer, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import (
StaticPool,
QueuePool,
AssertionPool,
NullPool,
)
logg = logging.getLogger().getChild(__name__)
Model = declarative_base(name='Model')
class SessionBase(Model):
"""The base object for all SQLAlchemy enabled models. All other models must extend this.
"""
__abstract__ = True
id = Column(Integer, primary_key=True)
created = Column(DateTime, default=datetime.datetime.utcnow)
updated = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
id = Column(Integer, primary_key=True)
engine = None
session = None
query = None
"""Database connection engine of the running aplication"""
sessionmaker = None
"""Factory object responsible for creating sessions from the connection pool"""
transactional = True
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
poolable = True
"""Whether the database backend supports connection pools. Should be explicitly set by initialization code"""
procedural = True
"""Whether the database backend supports stored procedures"""
localsessions = {}
"""Contains dictionary of sessions initiated by db model components"""
@staticmethod
def create_session():
session = sessionmaker(bind=SessionBase.engine)
return session()
"""Creates a new database session.
"""
return SessionBase.sessionmaker()
@staticmethod
def _set_engine(engine):
"""Sets the database engine static property
"""
SessionBase.engine = engine
SessionBase.sessionmaker = sessionmaker(bind=SessionBase.engine)
@staticmethod
def build():
Model.metadata.create_all(bind=SessionBase.engine)
def connect(dsn, pool_size=16, debug=False):
"""Create new database connection engine and connect to database backend.
:param dsn: DSN string defining connection.
:type dsn: str
"""
e = None
if SessionBase.poolable:
poolclass = QueuePool
if pool_size > 1:
logg.info('db using queue pool')
e = create_engine(
dsn,
max_overflow=pool_size*3,
pool_pre_ping=True,
pool_size=pool_size,
pool_recycle=60,
poolclass=poolclass,
echo=debug,
)
else:
if pool_size == 0:
poolclass = NullPool
elif debug:
poolclass = AssertionPool
else:
poolclass = StaticPool
e = create_engine(
dsn,
poolclass=poolclass,
echo=debug,
)
else:
logg.info('db connection not poolable')
e = create_engine(
dsn,
echo=debug,
)
SessionBase._set_engine(e)
@staticmethod
# https://docs.sqlalchemy.org/en/13/core/pooling.html#pool-disconnects
def connect(data_source_name):
engine = create_engine(data_source_name, pool_pre_ping=True)
SessionBase._set_engine(engine)
@staticmethod
def disconnect():
"""Disconnect from database and free resources.
"""
SessionBase.engine.dispose()
SessionBase.engine = None
@staticmethod
def bind_session(session=None):
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
localsession_key = str(id(localsession))
logg.debug('creating new session {}'.format(localsession_key))
SessionBase.localsessions[localsession_key] = localsession
return localsession
@staticmethod
def release_session(session=None):
session_key = str(id(session))
if SessionBase.localsessions.get(session_key) != None:
logg.debug('commit and destroy session {}'.format(session_key))
session.commit()
session.close()

View File

@ -63,8 +63,9 @@ class PhonePointerMetadata(Metadata):
cic_meta_signer = Signer()
signature = cic_meta_signer.sign_digest(data=data)
algorithm = cic_meta_signer.get_operational_key().get('algo')
decoded_data = data.decode('utf-8')
formatted_data = {
'm': data.decode('utf-8'),
'm': decoded_data,
's': {
'engine': engine,
'algo': algorithm,
@ -76,8 +77,9 @@ class PhonePointerMetadata(Metadata):
try:
result = make_request(method='PUT', url=self.url, data=formatted_data, headers=self.headers)
logg.info(f'Signed content submission status: {result.status_code}.')
logg.debug(f'signed phone pointer metadata submission status: {result.status_code}.')
result.raise_for_status()
logg.info('phone {} metadata pointer {} set to {}'.format(self.identifier.decode('utf-8'), self.metadata_pointer, decoded_data))
except requests.exceptions.HTTPError as error:
raise MetadataStoreError(error)

View File

@ -44,7 +44,7 @@ class Signer:
gpg_keys = self.gpg.list_keys()
key_algorithm = gpg_keys[0].get('algo')
key_id = gpg_keys[0].get("keyid")
logg.info(f'using signing key: {key_id}, algorithm: {key_algorithm}')
logg.debug(f'using signing key: {key_id}, algorithm: {key_algorithm}')
return gpg_keys[0]
def sign_digest(self, data: bytes):

View File

@ -74,7 +74,7 @@ class UserMetadata(Metadata):
try:
result = make_request(method='PUT', url=self.url, data=formatted_data, headers=self.headers)
logg.info(f'Signed content submission status: {result.status_code}.')
logg.debug(f'signed user metadata submission status: {result.status_code}.')
result.raise_for_status()
except requests.exceptions.HTTPError as error:
raise MetadataStoreError(error)

View File

@ -65,7 +65,6 @@ config.censor('PASSWORD', 'DATABASE')
# define log levels
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
@ -182,6 +181,7 @@ def application(env, start_response):
logg.error('invalid phone number {}'.format(phone_number))
start_response('400 Invalid phone number format', errors_headers)
return []
logg.debug('session {} started for {}'.format(external_session_id, phone_number))
# handle menu interaction requests
chain_str = chain_spec.__str__()

View File

@ -20,6 +20,7 @@ from cic_ussd.validator import validate_presence
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
logging.getLogger('gnupg').setLevel(logging.WARNING)
config_directory = '/usr/local/etc/cic-ussd/'
@ -47,7 +48,7 @@ logg.debug(config)
# connect to database
data_source_name = dsn_from_config(config)
SessionBase.connect(data_source_name=data_source_name)
SessionBase.connect(data_source_name, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG'))
# verify database connection with minimal sanity query
session = SessionBase.create_session()

View File

@ -6,9 +6,23 @@ import sqlalchemy
# local imports
from cic_ussd.error import MetadataStoreError
from cic_ussd.db.models.base import SessionBase
class CriticalTask(celery.Task):
class BaseTask(celery.Task):
session_func = SessionBase.create_session
def create_session(self):
return BaseTask.session_func()
def log_banner(self):
logg.debug('task {} root uuid {}'.format(self.__class__.__name__, self.request.root_id))
return
class CriticalTask(BaseTask):
retry_jitter = True
retry_backoff = True
retry_backoff_max = 8
@ -18,7 +32,8 @@ class CriticalSQLAlchemyTask(CriticalTask):
autoretry_for = (
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
)
sqlalchemy.exc.ResourceClosedError,
)
class CriticalMetadataTask(CriticalTask):

View File

@ -77,6 +77,8 @@ def process_account_creation_callback(self, result: str, url: str, status_code:
session.close()
raise ActionDataNotFoundError(f'Account creation task: {task_id}, returned unexpected response: {status_code}')
session.close()
@celery_app.task
def process_incoming_transfer_callback(result: dict, param: str, status_code: int):
@ -130,6 +132,7 @@ def process_incoming_transfer_callback(result: dict, param: str, status_code: in
session.close()
raise ValueError(f'Unexpected status code: {status_code}.')
session.close()
@celery_app.task
def process_balances_callback(result: list, param: str, status_code: int):
@ -173,7 +176,6 @@ def define_transaction_action_tag(
def process_statement_callback(result, param: str, status_code: int):
if status_code == 0:
# create session
session = SessionBase.create_session()
processed_transactions = []
# process transaction data to cache
@ -186,6 +188,7 @@ def process_statement_callback(result, param: str, status_code: int):
if '0x0000000000000000000000000000000000000000' in source_token:
pass
else:
session = SessionBase.create_session()
# describe a processed transaction
processed_transaction = {}
@ -214,6 +217,8 @@ def process_statement_callback(result, param: str, status_code: int):
else:
logg.warning(f'Tx with recipient not found in cic-ussd')
session.close()
# add transaction values
processed_transaction['to_value'] = from_wei(value=transaction.get('to_value')).__str__()
processed_transaction['from_value'] = from_wei(value=transaction.get('from_value')).__str__()

View File

@ -13,7 +13,7 @@ from cic_ussd.metadata.phone import PhonePointerMetadata
from cic_ussd.tasks.base import CriticalMetadataTask
celery_app = celery.current_app
logg = logging.getLogger()
logg = logging.getLogger().getChild(__name__)
@celery_app.task

View File

@ -70,3 +70,4 @@ def persist_session_to_db(external_session_id: str):
session.close()
raise SessionNotFoundError('Session does not exist!')
session.close()

View File

@ -2,4 +2,4 @@
. /root/db.sh
/usr/local/bin/cic-ussd-tasker -vv "$@"
/usr/local/bin/cic-ussd-tasker $@

View File

@ -2,4 +2,6 @@
. /root/db.sh
/usr/local/bin/uwsgi --wsgi-file /usr/local/lib/python3.8/site-packages/cic_ussd/runnable/server.py --http :9000 --pyargv "-vv"
server_port=${SERVER_PORT:-9000}
/usr/local/bin/uwsgi --wsgi-file /usr/local/lib/python3.8/site-packages/cic_ussd/runnable/server.py --http :$server_port --pyargv "$@"

View File

@ -215,7 +215,7 @@ if __name__ == '__main__':
amount = genAmount()
fa.write('{},{}\n'.format(eth,amount))
logg.debug('pidx {}, uid {}, eth {}, amount {}'.format(pidx, uid, eth, amount))
logg.debug('pidx {}, uid {}, eth {}, amount {}, phone {}'.format(pidx, uid, eth, amount, phone))
i += 1

View File

@ -13,6 +13,10 @@ from hexathon import (
add_0x,
)
from chainlib.eth.address import to_checksum_address
from chainlib.eth.tx import (
unpack,
raw,
)
from cic_types.processor import generate_metadata_pointer
from cic_types.models.person import Person
@ -23,9 +27,10 @@ celery_app = celery.current_app
class MetadataTask(celery.Task):
count = 0
balances = None
chain_spec = None
import_path = 'out'
import_dir = 'out'
meta_host = None
meta_port = None
meta_path = ''
@ -33,11 +38,12 @@ class MetadataTask(celery.Task):
balance_processor = None
autoretry_for = (
urllib.error.HTTPError,
OSError,
)
retry_kwargs = {
'countdown': 3,
'max_retries': 100,
}
retry_jitter = True
retry_backoff = True
retry_backoff_max = 60
max_retries = None
@classmethod
def meta_url(self):
@ -79,12 +85,12 @@ def resolve_phone(self, phone):
@celery_app.task(bind=True, base=MetadataTask)
def generate_metadata(self, address, phone):
old_address = old_address_from_phone(self.import_path, phone)
old_address = old_address_from_phone(self.import_dir, phone)
logg.debug('address {}'.format(address))
old_address_upper = strip_0x(old_address).upper()
metadata_path = '{}/old/{}/{}/{}.json'.format(
self.import_path,
self.import_dir,
old_address_upper[:2],
old_address_upper[2:4],
old_address_upper,
@ -103,7 +109,7 @@ def generate_metadata(self, address, phone):
new_address_clean = strip_0x(address)
filepath = os.path.join(
self.import_path,
self.import_dir,
'new',
new_address_clean[:2].upper(),
new_address_clean[2:4].upper(),
@ -118,7 +124,7 @@ def generate_metadata(self, address, phone):
meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), 'cic.person')
meta_filepath = os.path.join(
self.import_path,
self.import_dir,
'meta',
'{}.json'.format(new_address_clean.upper()),
)
@ -130,19 +136,77 @@ def generate_metadata(self, address, phone):
@celery_app.task(bind=True, base=MetadataTask)
def transfer_opening_balance(self, address, phone, serial):
def opening_balance_tx(self, address, phone, serial):
old_address = old_address_from_phone(self.import_path, phone)
old_address = old_address_from_phone(self.import_dir, phone)
k = to_checksum_address(strip_0x(old_address))
balance = self.balances[k]
logg.debug('found balance {} for address {} phone {}'.format(balance, old_address, phone))
decimal_balance = self.balance_processor.get_decimal_amount(balance)
decimal_balance = self.balance_processor.get_decimal_amount(int(balance))
tx_hash_hex = self.balance_processor.get_rpc_tx(address, decimal_balance, serial)
logg.debug('sending {} to {} tx hash {}'.format(decimal_balance, address, tx_hash_hex))
(tx_hash_hex, o) = self.balance_processor.get_rpc_tx(address, decimal_balance, serial)
return tx_hash_hex
tx = unpack(bytes.fromhex(strip_0x(o)), self.chain_spec)
logg.debug('generated tx token value {} to {} tx hash {}'.format(decimal_balance, address, tx_hash_hex))
tx_path = os.path.join(
self.import_dir,
'txs',
strip_0x(tx_hash_hex),
)
f = open(tx_path, 'w')
f.write(o)
f.close()
tx_nonce_path = os.path.join(
self.import_dir,
'txs',
'.' + str(tx['nonce']),
)
os.symlink(os.path.realpath(tx_path), tx_nonce_path)
return tx['hash']
@celery_app.task(bind=True, base=MetadataTask, autoretry_for=(FileNotFoundError,), max_retries=None, countdown=0.1)
def send_txs(self, nonce):
if nonce == self.count + self.balance_processor.nonce_offset:
logg.info('reached nonce {} (offset {} + count {}) exiting'.format(nonce, self.balance_processor.nonce_offset, self.count))
return
tx_nonce_path = os.path.join(
self.import_dir,
'txs',
'.' + str(nonce),
)
f = open(tx_nonce_path, 'r')
tx_signed_raw_hex = f.read()
f.close()
os.unlink(tx_nonce_path)
o = raw(tx_signed_raw_hex)
tx_hash_hex = self.balance_processor.conn.do(o)
logg.info('sent nonce {} tx hash {}'.format(nonce, tx_hash_hex)) #tx_signed_raw_hex))
nonce += 1
queue = self.request.delivery_info.get('routing_key')
s = celery.signature(
'import_task.send_txs',
[
nonce,
],
queue=queue,
)
s.apply_async()
return nonce

View File

@ -138,7 +138,17 @@ def main():
f.close()
MetadataTask.balances = balances
MetadataTask.count = i
s = celery.signature(
'import_task.send_txs',
[
MetadataTask.balance_processor.nonce_offset,
],
queue='cic-import-ussd',
)
s.apply_async()
argv = ['worker', '-Q', 'cic-import-ussd', '--loglevel=DEBUG']
celery_app.worker_main(argv)

View File

@ -37,7 +37,7 @@ argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis
argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--batch-size', dest='batch_size', default=100, type=int, help='burst size of sending transactions to node') # batch size should be slightly below cumulative gas limit worth, eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size
argparser.add_argument('--batch-delay', dest='batch_delay', default=2, type=int, help='seconds delay between batches')
argparser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches')
argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout')
argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
argparser.add_argument('-v', action='store_true', help='Be verbose')
@ -78,6 +78,9 @@ os.makedirs(meta_dir)
user_old_dir = os.path.join(args.user_dir, 'old')
os.stat(user_old_dir)
txs_dir = os.path.join(args.user_dir, 'txs')
os.makedirs(txs_dir)
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
chain_str = str(chain_spec)
@ -165,7 +168,7 @@ if __name__ == '__main__':
)
s_balance = celery.signature(
'import_task.transfer_opening_balance',
'import_task.opening_balance_tx',
[
phone,
i,
@ -175,7 +178,7 @@ if __name__ == '__main__':
s_meta.link(s_balance)
s_phone.link(s_meta)
s_phone.apply_async()
s_phone.apply_async(countdown=7) # block time plus a bit of time for ussd processing
i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")

View File

@ -7,7 +7,10 @@ from eth_token_index import TokenUniqueSymbolIndex
from chainlib.eth.gas import OverrideGasOracle
from chainlib.eth.nonce import OverrideNonceOracle
from chainlib.eth.erc20 import ERC20
from chainlib.eth.tx import count
from chainlib.eth.tx import (
count,
TxFormat,
)
logg = logging.getLogger().getChild(__name__)
@ -51,16 +54,18 @@ class BalanceProcessor:
tx_factory = ERC20(self.chain_spec)
o = tx_factory.decimals(self.token_address)
r = self.conn.do(o)
self.value_multiplier = int(r, 16) ** 10
n = tx_factory.parse_decimals(r)
self.value_multiplier = 10 ** n
def get_rpc_tx(self, recipient, value, i):
logg.debug('initiating nonce offset {} for recipient {}'.format(self.nonce_offset + i, recipient))
nonce_oracle = OverrideNonceOracle(self.signer_address, self.nonce_offset + i)
tx_factory = ERC20(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=self.gas_oracle)
(tx_hash_hex, o) = tx_factory.transfer(self.token_address, self.signer_address, recipient, value)
self.conn.do(o)
return tx_hash_hex
return tx_factory.transfer(self.token_address, self.signer_address, recipient, value, tx_format=TxFormat.RLP_SIGNED)
#(tx_hash_hex, o) = tx_factory.transfer(self.token_address, self.signer_address, recipient, value)
#self.conn.do(o)
#return tx_hash_hex
def get_decimal_amount(self, value):

View File

@ -164,6 +164,7 @@ services:
DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres}
DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2}
DATABASE_DEBUG: 1
DATABASE_POOL_SIZE: 0
ETH_ABI_DIR: ${ETH_ABI_DIR:-/usr/local/share/cic/solidity/abi}
CIC_TRUST_ADDRESS: ${DEV_ETH_ACCOUNT_CONTRACT_DEPLOYER:-0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C}
CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-evm:bloxberg:8996}
@ -233,6 +234,7 @@ services:
DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres}
DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2}
DATABASE_DEBUG: ${DATABASE_DEBUG:-0}
DATABASE_POOL_SIZE: 0
PGPASSWORD: ${DATABASE_PASSWORD:-tralala}
CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-evm:bloxberg:8996}
BANCOR_DIR: ${BANCOR_DIR:-/usr/local/share/cic/bancor}
@ -430,6 +432,7 @@ services:
DATABASE_NAME: ${DATABASE_NAME_CIC_NOTIFY:-cic_notify}
DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres}
DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2}
DATABASE_POOL_SIZE: 0
PGPASSWORD: ${DATABASE_PASSWORD:-tralala}
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis}
CELERY_RESULT_URL: ${CELERY_BROKER_URL:-redis://redis}
@ -503,7 +506,7 @@ services:
deploy:
restart_policy:
condition: on-failure
command: "/root/start_uwsgi.sh"
command: "/root/start_uwsgi.sh -vv"
cic-ussd-tasker:
# image: grassrootseconomics:cic-ussd
@ -518,6 +521,7 @@ services:
DATABASE_NAME: cic_ussd
DATABASE_ENGINE: postgresql
DATABASE_DRIVER: psycopg2
DATABASE_POOL_SIZE: 0
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis}
CELERY_RESULT_URL: ${CELERY_BROKER_URL:-redis://redis}
PGP_PASSPHRASE: merman
@ -530,4 +534,4 @@ services:
deploy:
restart_policy:
condition: on-failure
command: "/root/start_tasker.sh -q cic-ussd"
command: "/root/start_tasker.sh -q cic-ussd -vv"