diff --git a/apps/cic-eth/cic_eth/db/models/base.py b/apps/cic-eth/cic_eth/db/models/base.py index fc3541c5..9cdcabf7 100644 --- a/apps/cic-eth/cic_eth/db/models/base.py +++ b/apps/cic-eth/cic_eth/db/models/base.py @@ -10,6 +10,7 @@ from sqlalchemy.pool import ( StaticPool, QueuePool, AssertionPool, + NullPool, ) logg = logging.getLogger() @@ -64,6 +65,7 @@ class SessionBase(Model): if SessionBase.poolable: poolclass = QueuePool if pool_size > 1: + logg.info('db using queue pool') e = create_engine( dsn, max_overflow=pool_size*3, @@ -74,17 +76,22 @@ class SessionBase(Model): echo=debug, ) else: - if debug: + if pool_size == 0: + logg.info('db using nullpool') + poolclass = NullPool + elif debug: + logg.info('db using assertion pool') poolclass = AssertionPool else: + logg.info('db using static pool') poolclass = StaticPool - e = create_engine( dsn, poolclass=poolclass, echo=debug, ) else: + logg.info('db not poolable') e = create_engine( dsn, echo=debug, diff --git a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py index ef7c122b..8a4a47f5 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py @@ -91,7 +91,7 @@ logg.debug('config loaded from {}:\n{}'.format(args.c, config)) # connect to database dsn = dsn_from_config(config) -SessionBase.connect(dsn, pool_size=50, debug=config.true('DATABASE_DEBUG')) +SessionBase.connect(dsn, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG')) # verify database connection with minimal sanity query session = SessionBase.create_session() diff --git a/apps/cic-eth/config/database.ini b/apps/cic-eth/config/database.ini index 50a2dd0d..4517445c 100644 --- a/apps/cic-eth/config/database.ini +++ b/apps/cic-eth/config/database.ini @@ -6,4 +6,5 @@ HOST=localhost PORT=5432 ENGINE=postgresql DRIVER=psycopg2 +POOL_SIZE=50 DEBUG=0 diff --git a/apps/cic-eth/config/docker/database.ini 
b/apps/cic-eth/config/docker/database.ini index a5f2be48..6448be0e 100644 --- a/apps/cic-eth/config/docker/database.ini +++ b/apps/cic-eth/config/docker/database.ini @@ -6,4 +6,5 @@ HOST=localhost PORT=63432 ENGINE=postgresql DRIVER=psycopg2 +POOL_SIZE=50 DEBUG=0 diff --git a/apps/cic-ussd/.config/database.ini b/apps/cic-ussd/.config/database.ini index 7e2c3e44..5bce6bcf 100644 --- a/apps/cic-ussd/.config/database.ini +++ b/apps/cic-ussd/.config/database.ini @@ -6,3 +6,5 @@ HOST=localhost PORT=5432 ENGINE=postgresql DRIVER=psycopg2 +DEBUG=0 +POOL_SIZE=1 diff --git a/apps/cic-ussd/cic_ussd/db/models/base.py b/apps/cic-ussd/cic_ussd/db/models/base.py index 9ff5227e..d76ee6c4 100644 --- a/apps/cic-ussd/cic_ussd/db/models/base.py +++ b/apps/cic-ussd/cic_ussd/db/models/base.py @@ -1,47 +1,129 @@ -# standard imports +# standard imports +import logging import datetime -# third-party imports +# external imports from sqlalchemy import Column, Integer, DateTime from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import ( + StaticPool, + QueuePool, + AssertionPool, + NullPool, + ) + +logg = logging.getLogger().getChild(__name__) Model = declarative_base(name='Model') class SessionBase(Model): + """The base object for all SQLAlchemy enabled models. All other models must extend this. + """ __abstract__ = True - id = Column(Integer, primary_key=True) created = Column(DateTime, default=datetime.datetime.utcnow) updated = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow) + id = Column(Integer, primary_key=True) + engine = None - session = None - query = None + """Database connection engine of the running application""" + sessionmaker = None + """Factory object responsible for creating sessions from the connection pool""" + transactional = True + """Whether the database backend supports query transactions. 
Should be explicitly set by initialization code""" + poolable = True + """Whether the database backend supports connection pools. Should be explicitly set by initialization code""" + procedural = True + """Whether the database backend supports stored procedures""" + localsessions = {} + """Contains dictionary of sessions initiated by db model components""" + @staticmethod def create_session(): - session = sessionmaker(bind=SessionBase.engine) - return session() + """Creates a new database session. + """ + return SessionBase.sessionmaker() + @staticmethod def _set_engine(engine): + """Sets the database engine static property + """ SessionBase.engine = engine + SessionBase.sessionmaker = sessionmaker(bind=SessionBase.engine) + @staticmethod - def build(): - Model.metadata.create_all(bind=SessionBase.engine) + def connect(dsn, pool_size=16, debug=False): + """Create new database connection engine and connect to database backend. + + :param dsn: DSN string defining connection. + :type dsn: str + """ + e = None + if SessionBase.poolable: + poolclass = QueuePool + if pool_size > 1: + logg.info('db using queue pool') + e = create_engine( + dsn, + max_overflow=pool_size*3, + pool_pre_ping=True, + pool_size=pool_size, + pool_recycle=60, + poolclass=poolclass, + echo=debug, + ) + else: + if pool_size == 0: + poolclass = NullPool + elif debug: + poolclass = AssertionPool + else: + poolclass = StaticPool + e = create_engine( + dsn, + poolclass=poolclass, + echo=debug, + ) + else: + logg.info('db connection not poolable') + e = create_engine( + dsn, + echo=debug, + ) + + SessionBase._set_engine(e) - @staticmethod - # https://docs.sqlalchemy.org/en/13/core/pooling.html#pool-disconnects - def connect(data_source_name): - engine = create_engine(data_source_name, pool_pre_ping=True) - SessionBase._set_engine(engine) @staticmethod def disconnect(): + """Disconnect from database and free resources. 
+ """ SessionBase.engine.dispose() SessionBase.engine = None + + @staticmethod + def bind_session(session=None): + localsession = session + if localsession == None: + localsession = SessionBase.create_session() + localsession_key = str(id(localsession)) + logg.debug('creating new session {}'.format(localsession_key)) + SessionBase.localsessions[localsession_key] = localsession + return localsession + + + @staticmethod + def release_session(session=None): + session_key = str(id(session)) + if SessionBase.localsessions.get(session_key) != None: + logg.debug('commit and destroy session {}'.format(session_key)) + session.commit() + session.close() diff --git a/apps/cic-ussd/cic_ussd/metadata/phone.py b/apps/cic-ussd/cic_ussd/metadata/phone.py index f50ef0a3..f7eea026 100644 --- a/apps/cic-ussd/cic_ussd/metadata/phone.py +++ b/apps/cic-ussd/cic_ussd/metadata/phone.py @@ -63,8 +63,9 @@ class PhonePointerMetadata(Metadata): cic_meta_signer = Signer() signature = cic_meta_signer.sign_digest(data=data) algorithm = cic_meta_signer.get_operational_key().get('algo') + decoded_data = data.decode('utf-8') formatted_data = { - 'm': data.decode('utf-8'), + 'm': decoded_data, 's': { 'engine': engine, 'algo': algorithm, @@ -76,8 +77,9 @@ class PhonePointerMetadata(Metadata): try: result = make_request(method='PUT', url=self.url, data=formatted_data, headers=self.headers) - logg.info(f'Signed content submission status: {result.status_code}.') + logg.debug(f'signed phone pointer metadata submission status: {result.status_code}.') result.raise_for_status() + logg.info('phone {} metadata pointer {} set to {}'.format(self.identifier.decode('utf-8'), self.metadata_pointer, decoded_data)) except requests.exceptions.HTTPError as error: raise MetadataStoreError(error) diff --git a/apps/cic-ussd/cic_ussd/metadata/signer.py b/apps/cic-ussd/cic_ussd/metadata/signer.py index 5639fcd5..dc187d29 100644 --- a/apps/cic-ussd/cic_ussd/metadata/signer.py +++ b/apps/cic-ussd/cic_ussd/metadata/signer.py 
@@ -44,7 +44,7 @@ class Signer: gpg_keys = self.gpg.list_keys() key_algorithm = gpg_keys[0].get('algo') key_id = gpg_keys[0].get("keyid") - logg.info(f'using signing key: {key_id}, algorithm: {key_algorithm}') + logg.debug(f'using signing key: {key_id}, algorithm: {key_algorithm}') return gpg_keys[0] def sign_digest(self, data: bytes): diff --git a/apps/cic-ussd/cic_ussd/metadata/user.py b/apps/cic-ussd/cic_ussd/metadata/user.py index 2f8b3286..876ed935 100644 --- a/apps/cic-ussd/cic_ussd/metadata/user.py +++ b/apps/cic-ussd/cic_ussd/metadata/user.py @@ -74,7 +74,7 @@ class UserMetadata(Metadata): try: result = make_request(method='PUT', url=self.url, data=formatted_data, headers=self.headers) - logg.info(f'Signed content submission status: {result.status_code}.') + logg.debug(f'signed user metadata submission status: {result.status_code}.') result.raise_for_status() except requests.exceptions.HTTPError as error: raise MetadataStoreError(error) diff --git a/apps/cic-ussd/cic_ussd/runnable/server.py b/apps/cic-ussd/cic_ussd/runnable/server.py index ecb8481e..f2dff795 100644 --- a/apps/cic-ussd/cic_ussd/runnable/server.py +++ b/apps/cic-ussd/cic_ussd/runnable/server.py @@ -65,7 +65,6 @@ config.censor('PASSWORD', 'DATABASE') # define log levels if args.vv: logging.getLogger().setLevel(logging.DEBUG) - logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG) elif args.v: logging.getLogger().setLevel(logging.INFO) @@ -182,6 +181,7 @@ def application(env, start_response): logg.error('invalid phone number {}'.format(phone_number)) start_response('400 Invalid phone number format', errors_headers) return [] + logg.debug('session {} started for {}'.format(external_session_id, phone_number)) # handle menu interaction requests chain_str = chain_spec.__str__() diff --git a/apps/cic-ussd/cic_ussd/runnable/tasker.py b/apps/cic-ussd/cic_ussd/runnable/tasker.py index 6d787f9d..ac39e06f 100644 --- a/apps/cic-ussd/cic_ussd/runnable/tasker.py +++ 
b/apps/cic-ussd/cic_ussd/runnable/tasker.py @@ -20,6 +20,7 @@ from cic_ussd.validator import validate_presence logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() +logging.getLogger('gnupg').setLevel(logging.WARNING) config_directory = '/usr/local/etc/cic-ussd/' @@ -47,7 +48,7 @@ logg.debug(config) # connect to database data_source_name = dsn_from_config(config) -SessionBase.connect(data_source_name=data_source_name) +SessionBase.connect(data_source_name, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG')) # verify database connection with minimal sanity query session = SessionBase.create_session() diff --git a/apps/cic-ussd/cic_ussd/tasks/base.py b/apps/cic-ussd/cic_ussd/tasks/base.py index fb50ffb0..0f9a7438 100644 --- a/apps/cic-ussd/cic_ussd/tasks/base.py +++ b/apps/cic-ussd/cic_ussd/tasks/base.py @@ -6,9 +6,23 @@ import sqlalchemy # local imports from cic_ussd.error import MetadataStoreError +from cic_ussd.db.models.base import SessionBase -class CriticalTask(celery.Task): +class BaseTask(celery.Task): + + session_func = SessionBase.create_session + + def create_session(self): + return BaseTask.session_func() + + + def log_banner(self): + logg.debug('task {} root uuid {}'.format(self.__class__.__name__, self.request.root_id)) + return + + +class CriticalTask(BaseTask): retry_jitter = True retry_backoff = True retry_backoff_max = 8 @@ -18,7 +32,8 @@ class CriticalSQLAlchemyTask(CriticalTask): autoretry_for = ( sqlalchemy.exc.DatabaseError, sqlalchemy.exc.TimeoutError, - ) + sqlalchemy.exc.ResourceClosedError, + ) class CriticalMetadataTask(CriticalTask): diff --git a/apps/cic-ussd/cic_ussd/tasks/callback_handler.py b/apps/cic-ussd/cic_ussd/tasks/callback_handler.py index 01036c43..2c247339 100644 --- a/apps/cic-ussd/cic_ussd/tasks/callback_handler.py +++ b/apps/cic-ussd/cic_ussd/tasks/callback_handler.py @@ -77,6 +77,8 @@ def process_account_creation_callback(self, result: str, url: str, status_code: 
session.close() raise ActionDataNotFoundError(f'Account creation task: {task_id}, returned unexpected response: {status_code}') + session.close() + @celery_app.task def process_incoming_transfer_callback(result: dict, param: str, status_code: int): @@ -130,6 +132,7 @@ def process_incoming_transfer_callback(result: dict, param: str, status_code: in session.close() raise ValueError(f'Unexpected status code: {status_code}.') + session.close() @celery_app.task def process_balances_callback(result: list, param: str, status_code: int): @@ -173,7 +176,6 @@ def define_transaction_action_tag( def process_statement_callback(result, param: str, status_code: int): if status_code == 0: # create session - session = SessionBase.create_session() processed_transactions = [] # process transaction data to cache @@ -186,6 +188,7 @@ def process_statement_callback(result, param: str, status_code: int): if '0x0000000000000000000000000000000000000000' in source_token: pass else: + session = SessionBase.create_session() # describe a processed transaction processed_transaction = {} @@ -214,6 +217,8 @@ def process_statement_callback(result, param: str, status_code: int): else: logg.warning(f'Tx with recipient not found in cic-ussd') + session.close() + # add transaction values processed_transaction['to_value'] = from_wei(value=transaction.get('to_value')).__str__() processed_transaction['from_value'] = from_wei(value=transaction.get('from_value')).__str__() diff --git a/apps/cic-ussd/cic_ussd/tasks/metadata.py b/apps/cic-ussd/cic_ussd/tasks/metadata.py index a9d6948e..095613fc 100644 --- a/apps/cic-ussd/cic_ussd/tasks/metadata.py +++ b/apps/cic-ussd/cic_ussd/tasks/metadata.py @@ -13,7 +13,7 @@ from cic_ussd.metadata.phone import PhonePointerMetadata from cic_ussd.tasks.base import CriticalMetadataTask celery_app = celery.current_app -logg = logging.getLogger() +logg = logging.getLogger().getChild(__name__) @celery_app.task diff --git a/apps/cic-ussd/cic_ussd/tasks/ussd_session.py 
b/apps/cic-ussd/cic_ussd/tasks/ussd_session.py index 85f388e2..dac199f2 100644 --- a/apps/cic-ussd/cic_ussd/tasks/ussd_session.py +++ b/apps/cic-ussd/cic_ussd/tasks/ussd_session.py @@ -70,3 +70,4 @@ def persist_session_to_db(external_session_id: str): session.close() raise SessionNotFoundError('Session does not exist!') + session.close() diff --git a/apps/cic-ussd/docker/start_tasker.sh b/apps/cic-ussd/docker/start_tasker.sh index bd588f28..37f32597 100644 --- a/apps/cic-ussd/docker/start_tasker.sh +++ b/apps/cic-ussd/docker/start_tasker.sh @@ -2,4 +2,4 @@ . /root/db.sh -/usr/local/bin/cic-ussd-tasker -vv "$@" \ No newline at end of file +/usr/local/bin/cic-ussd-tasker $@ diff --git a/apps/cic-ussd/docker/start_uwsgi.sh b/apps/cic-ussd/docker/start_uwsgi.sh index ac5261cb..ff3271ec 100644 --- a/apps/cic-ussd/docker/start_uwsgi.sh +++ b/apps/cic-ussd/docker/start_uwsgi.sh @@ -2,4 +2,6 @@ . /root/db.sh -/usr/local/bin/uwsgi --wsgi-file /usr/local/lib/python3.8/site-packages/cic_ussd/runnable/server.py --http :9000 --pyargv "-vv" +server_port=${SERVER_PORT:-9000} + +/usr/local/bin/uwsgi --wsgi-file /usr/local/lib/python3.8/site-packages/cic_ussd/runnable/server.py --http :$server_port --pyargv "$@" diff --git a/apps/contract-migration/scripts/create_import_users.py b/apps/contract-migration/scripts/create_import_users.py index 112a1f76..11a0eae9 100644 --- a/apps/contract-migration/scripts/create_import_users.py +++ b/apps/contract-migration/scripts/create_import_users.py @@ -215,7 +215,7 @@ if __name__ == '__main__': amount = genAmount() fa.write('{},{}\n'.format(eth,amount)) - logg.debug('pidx {}, uid {}, eth {}, amount {}'.format(pidx, uid, eth, amount)) + logg.debug('pidx {}, uid {}, eth {}, amount {}, phone {}'.format(pidx, uid, eth, amount, phone)) i += 1 diff --git a/apps/contract-migration/scripts/import_task.py b/apps/contract-migration/scripts/import_task.py index 863c7bcb..813a9705 100644 --- a/apps/contract-migration/scripts/import_task.py +++ 
b/apps/contract-migration/scripts/import_task.py @@ -13,6 +13,10 @@ from hexathon import ( add_0x, ) from chainlib.eth.address import to_checksum_address +from chainlib.eth.tx import ( + unpack, + raw, + ) from cic_types.processor import generate_metadata_pointer from cic_types.models.person import Person @@ -23,9 +27,10 @@ celery_app = celery.current_app class MetadataTask(celery.Task): + count = 0 balances = None chain_spec = None - import_path = 'out' + import_dir = 'out' meta_host = None meta_port = None meta_path = '' @@ -33,11 +38,12 @@ class MetadataTask(celery.Task): balance_processor = None autoretry_for = ( urllib.error.HTTPError, + OSError, ) - retry_kwargs = { - 'countdown': 3, - 'max_retries': 100, - } + retry_jitter = True + retry_backoff = True + retry_backoff_max = 60 + max_retries = None @classmethod def meta_url(self): @@ -79,12 +85,12 @@ def resolve_phone(self, phone): @celery_app.task(bind=True, base=MetadataTask) def generate_metadata(self, address, phone): - old_address = old_address_from_phone(self.import_path, phone) + old_address = old_address_from_phone(self.import_dir, phone) logg.debug('address {}'.format(address)) old_address_upper = strip_0x(old_address).upper() metadata_path = '{}/old/{}/{}/{}.json'.format( - self.import_path, + self.import_dir, old_address_upper[:2], old_address_upper[2:4], old_address_upper, @@ -103,7 +109,7 @@ def generate_metadata(self, address, phone): new_address_clean = strip_0x(address) filepath = os.path.join( - self.import_path, + self.import_dir, 'new', new_address_clean[:2].upper(), new_address_clean[2:4].upper(), @@ -118,7 +124,7 @@ def generate_metadata(self, address, phone): meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), 'cic.person') meta_filepath = os.path.join( - self.import_path, + self.import_dir, 'meta', '{}.json'.format(new_address_clean.upper()), ) @@ -130,19 +136,77 @@ def generate_metadata(self, address, phone): @celery_app.task(bind=True, base=MetadataTask) -def 
transfer_opening_balance(self, address, phone, serial): +def opening_balance_tx(self, address, phone, serial): - old_address = old_address_from_phone(self.import_path, phone) + old_address = old_address_from_phone(self.import_dir, phone) k = to_checksum_address(strip_0x(old_address)) balance = self.balances[k] logg.debug('found balance {} for address {} phone {}'.format(balance, old_address, phone)) - decimal_balance = self.balance_processor.get_decimal_amount(balance) + decimal_balance = self.balance_processor.get_decimal_amount(int(balance)) - tx_hash_hex = self.balance_processor.get_rpc_tx(address, decimal_balance, serial) - logg.debug('sending {} to {} tx hash {}'.format(decimal_balance, address, tx_hash_hex)) + (tx_hash_hex, o) = self.balance_processor.get_rpc_tx(address, decimal_balance, serial) - return tx_hash_hex + tx = unpack(bytes.fromhex(strip_0x(o)), self.chain_spec) + logg.debug('generated tx token value {} to {} tx hash {}'.format(decimal_balance, address, tx_hash_hex)) + tx_path = os.path.join( + self.import_dir, + 'txs', + strip_0x(tx_hash_hex), + ) + + f = open(tx_path, 'w') + f.write(o) + f.close() + + tx_nonce_path = os.path.join( + self.import_dir, + 'txs', + '.' + str(tx['nonce']), + ) + os.symlink(os.path.realpath(tx_path), tx_nonce_path) + + return tx['hash'] + + +@celery_app.task(bind=True, base=MetadataTask, autoretry_for=(FileNotFoundError,), max_retries=None, countdown=0.1) +def send_txs(self, nonce): + + if nonce == self.count + self.balance_processor.nonce_offset: + logg.info('reached nonce {} (offset {} + count {}) exiting'.format(nonce, self.balance_processor.nonce_offset, self.count)) + return + + + tx_nonce_path = os.path.join( + self.import_dir, + 'txs', + '.' 
+ str(nonce), + ) + f = open(tx_nonce_path, 'r') + tx_signed_raw_hex = f.read() + f.close() + + os.unlink(tx_nonce_path) + + o = raw(tx_signed_raw_hex) + tx_hash_hex = self.balance_processor.conn.do(o) + + logg.info('sent nonce {} tx hash {}'.format(nonce, tx_hash_hex)) #tx_signed_raw_hex)) + + nonce += 1 + + queue = self.request.delivery_info.get('routing_key') + s = celery.signature( + 'import_task.send_txs', + [ + nonce, + ], + queue=queue, + ) + s.apply_async() + + + return nonce diff --git a/apps/contract-migration/scripts/import_ussd_balance.py b/apps/contract-migration/scripts/import_ussd_balance.py index 1b69fb05..1f01c1d6 100644 --- a/apps/contract-migration/scripts/import_ussd_balance.py +++ b/apps/contract-migration/scripts/import_ussd_balance.py @@ -138,7 +138,17 @@ def main(): f.close() MetadataTask.balances = balances - + MetadataTask.count = i + + s = celery.signature( + 'import_task.send_txs', + [ + MetadataTask.balance_processor.nonce_offset, + ], + queue='cic-import-ussd', + ) + s.apply_async() + argv = ['worker', '-Q', 'cic-import-ussd', '--loglevel=DEBUG'] celery_app.worker_main(argv) diff --git a/apps/contract-migration/scripts/import_ussd_users.py b/apps/contract-migration/scripts/import_ussd_users.py index 32c0b93e..ca9bf3f1 100644 --- a/apps/contract-migration/scripts/import_ussd_users.py +++ b/apps/contract-migration/scripts/import_ussd_users.py @@ -37,7 +37,7 @@ argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission') argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback') argparser.add_argument('--batch-size', dest='batch_size', default=100, type=int, help='burst size of sending transactions to node') # batch size should be slightly below cumulative gas limit worth, eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size 
-argparser.add_argument('--batch-delay', dest='batch_delay', default=2, type=int, help='seconds delay between batches') +argparser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches') argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout') argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue') argparser.add_argument('-v', action='store_true', help='Be verbose') @@ -78,6 +78,9 @@ os.makedirs(meta_dir) user_old_dir = os.path.join(args.user_dir, 'old') os.stat(user_old_dir) +txs_dir = os.path.join(args.user_dir, 'txs') +os.makedirs(txs_dir) + chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) chain_str = str(chain_spec) @@ -165,7 +168,7 @@ if __name__ == '__main__': ) s_balance = celery.signature( - 'import_task.transfer_opening_balance', + 'import_task.opening_balance_tx', [ phone, i, @@ -175,7 +178,7 @@ if __name__ == '__main__': s_meta.link(s_balance) s_phone.link(s_meta) - s_phone.apply_async() + s_phone.apply_async(countdown=7) # block time plus a bit of time for ussd processing i += 1 sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r") diff --git a/apps/contract-migration/scripts/import_util.py b/apps/contract-migration/scripts/import_util.py index 3ec9e453..ebe706bb 100644 --- a/apps/contract-migration/scripts/import_util.py +++ b/apps/contract-migration/scripts/import_util.py @@ -7,7 +7,10 @@ from eth_token_index import TokenUniqueSymbolIndex from chainlib.eth.gas import OverrideGasOracle from chainlib.eth.nonce import OverrideNonceOracle from chainlib.eth.erc20 import ERC20 -from chainlib.eth.tx import count +from chainlib.eth.tx import ( + count, + TxFormat, + ) logg = logging.getLogger().getChild(__name__) @@ -51,16 +54,18 @@ class BalanceProcessor: tx_factory = ERC20(self.chain_spec) o = tx_factory.decimals(self.token_address) r = self.conn.do(o) - self.value_multiplier = int(r, 16) ** 10 + n = 
tx_factory.parse_decimals(r) + self.value_multiplier = 10 ** n def get_rpc_tx(self, recipient, value, i): logg.debug('initiating nonce offset {} for recipient {}'.format(self.nonce_offset + i, recipient)) nonce_oracle = OverrideNonceOracle(self.signer_address, self.nonce_offset + i) tx_factory = ERC20(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=self.gas_oracle) - (tx_hash_hex, o) = tx_factory.transfer(self.token_address, self.signer_address, recipient, value) - self.conn.do(o) - return tx_hash_hex + return tx_factory.transfer(self.token_address, self.signer_address, recipient, value, tx_format=TxFormat.RLP_SIGNED) + #(tx_hash_hex, o) = tx_factory.transfer(self.token_address, self.signer_address, recipient, value) + #self.conn.do(o) + #return tx_hash_hex def get_decimal_amount(self, value): diff --git a/docker-compose.yml b/docker-compose.yml index 6f20e1fd..85dbfbce 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -164,6 +164,7 @@ services: DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres} DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2} DATABASE_DEBUG: 1 + DATABASE_POOL_SIZE: 0 ETH_ABI_DIR: ${ETH_ABI_DIR:-/usr/local/share/cic/solidity/abi} CIC_TRUST_ADDRESS: ${DEV_ETH_ACCOUNT_CONTRACT_DEPLOYER:-0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C} CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-evm:bloxberg:8996} @@ -233,6 +234,7 @@ services: DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres} DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2} DATABASE_DEBUG: ${DATABASE_DEBUG:-0} + DATABASE_POOL_SIZE: 0 PGPASSWORD: ${DATABASE_PASSWORD:-tralala} CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-evm:bloxberg:8996} BANCOR_DIR: ${BANCOR_DIR:-/usr/local/share/cic/bancor} @@ -430,6 +432,7 @@ services: DATABASE_NAME: ${DATABASE_NAME_CIC_NOTIFY:-cic_notify} DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres} DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2} + DATABASE_POOL_SIZE: 0 PGPASSWORD: ${DATABASE_PASSWORD:-tralala} CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis} CELERY_RESULT_URL: 
${CELERY_BROKER_URL:-redis://redis} @@ -503,7 +506,7 @@ services: deploy: restart_policy: condition: on-failure - command: "/root/start_uwsgi.sh" + command: "/root/start_uwsgi.sh -vv" cic-ussd-tasker: # image: grassrootseconomics:cic-ussd @@ -518,6 +521,7 @@ services: DATABASE_NAME: cic_ussd DATABASE_ENGINE: postgresql DATABASE_DRIVER: psycopg2 + DATABASE_POOL_SIZE: 0 CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis} CELERY_RESULT_URL: ${CELERY_BROKER_URL:-redis://redis} PGP_PASSPHRASE: merman @@ -530,4 +534,4 @@ services: deploy: restart_policy: condition: on-failure - command: "/root/start_tasker.sh -q cic-ussd" + command: "/root/start_tasker.sh -q cic-ussd -vv"