diff --git a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py index cb456761..384e6584 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py @@ -32,6 +32,7 @@ from cic_eth.admin import ctrl from cic_eth.eth.rpc import RpcClient from cic_eth.eth.rpc import GasOracle from cic_eth.queue import tx +from cic_eth.queue import balance from cic_eth.callbacks import Callback from cic_eth.callbacks import http from cic_eth.callbacks import tcp diff --git a/apps/contract-migration/scripts/config/traffic.ini b/apps/contract-migration/scripts/config/traffic.ini index 27035a60..df89d1aa 100644 --- a/apps/contract-migration/scripts/config/traffic.ini +++ b/apps/contract-migration/scripts/config/traffic.ini @@ -1,3 +1,4 @@ [traffic] #local.noop_traffic = 2 -local.account = 2 +#local.account = 2 +local.transfer = 2 diff --git a/apps/contract-migration/scripts/local/account.py b/apps/contract-migration/scripts/local/account.py index 813c776c..41ff5187 100644 --- a/apps/contract-migration/scripts/local/account.py +++ b/apps/contract-migration/scripts/local/account.py @@ -11,9 +11,9 @@ queue = 'cic-eth' name = 'account' -def do(tokens, accounts, aux, block_number, tx_index): +def do(token_pair, sender, recipient, sender_balance, aux, block_number, tx_index): - logg.debug('running {} {} {}'.format(__name__, tokens, accounts)) + logg.debug('running {} {} {}'.format(__name__, token_pair, sender, recipient)) api = Api( str(aux['chain_spec']), queue=queue, @@ -21,5 +21,7 @@ def do(tokens, accounts, aux, block_number, tx_index): callback_task='cic_eth.callbacks.redis.redis', callback_queue=queue, ) + t = api.create_account(register=True) - return (None, t,) + + return (None, t, sender_balance, sender_balance,) diff --git a/apps/contract-migration/scripts/traffic.py b/apps/contract-migration/scripts/traffic.py index ff674c2a..1beaed2d 100644 --- a/apps/contract-migration/scripts/traffic.py +++ b/apps/contract-migration/scripts/traffic.py @@ -30,6 +30,7 @@ from chainlib.eth.block import block_latest from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer from crypto_dev_signer.keystore import DictKeystore from hexathon import strip_0x +from cic_eth.api.api_task import Api logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() @@ -88,6 +89,9 @@ config.add(batchsize, '_BATCH_SIZE', True) config.add(args.redis_host_callback, '_REDIS_HOST_CALLBACK', True) config.add(args.redis_port_callback, '_REDIS_PORT_CALLBACK', True) +# queue +config.add(args.q, '_CELERY_QUEUE', True) + # signer keystore = DictKeystore() if args.y == None: @@ -124,6 +128,11 @@ class TrafficItem: self.uuid = uuid.uuid4() self.ext = None self.result = None + self.sender = None + self.recipient = None + self.source_token = None + self.destination_token = None + self.source_value = 0 def __str__(self): @@ -196,19 +205,18 @@ class TrafficTasker: self.tokens = self.oracles['token'].get_tokens() self.accounts = self.oracles['account'].get_accounts() - self.balances = {} - self.aux = copy.copy(self.default_aux) + self.__balances = {} def load_balances(self): pass - def cache_balance(self, holder_address, token, value): - if self.balances.get(holder_address) == None: - self.balances[holder_address] = {} - self.balances[holder_address][token] = value + def __cache_balance(self, holder_address, token, value): + if self.__balances.get(holder_address) == None: + self.__balances[holder_address] = {} + self.__balances[holder_address][token] = value logg.debug('setting cached balance of {} token {} to {}'.format(holder_address, token, value)) @@ -217,6 +225,28 @@ class TrafficTasker: self.aux[k] = v + def balances(self, accounts=None): + if accounts == None: + accounts = self.accounts + for account in accounts: + for token in self.tokens: + # TODO: use proper redis callback + api = Api( + str(self.aux['chain_spec']), + queue=self.aux['api_queue'], + #callback_param='{}:{}:{}:{}'.format(aux['redis_host_callback'], aux['redis_port_callback'], aux['redis_db'], aux['redis_channel']), + #callback_task='cic_eth.callbacks.redis.redis', + #callback_queue=queue, + ) + t = api.balance(account, token.symbol()) + r = t.get() + for c in t.collect(): + r = c[1] + assert t.successful() + self.__cache_balance(account, token.symbol(), r[0]) + logg.debug('balance sender {} token {} = {}'.format(account, token, r)) + return self.__balances + class Handler: @@ -248,15 +278,51 @@ class Handler: logg.debug('executing handler refresh with accouts {}'.format(traffic_tasker.accounts)) logg.debug('executing handler refresh with tokens {}'.format(traffic_tasker.tokens)) + #senders = copy.copy(traffic_tasker.accounts) + sender_indices = [*range(0, len(traffic_tasker.accounts))] + # TODO: only get balances for the selection that we will be generating for + balances = traffic_tasker.balances() + while True: traffic_item = traffic_router.reserve() if traffic_item == None: logg.debug('no traffic_items left to reserve {}'.format(traffic_item)) break - (e, t,) = traffic_item.method(traffic_tasker.tokens, traffic_tasker.accounts, traffic_tasker.aux, block_number, tx_index) + + # TODO: temporary selection + token_pair = [ + traffic_tasker.tokens[0], + traffic_tasker.tokens[0], + ] + sender_index_index = random.randint(0, len(sender_indices)-1) + sender_index = sender_indices[sender_index_index] + sender = traffic_tasker.accounts[sender_index] + logg.debug('balances {}'.format(balances)) + balance_unit = balances[sender][token_pair[0].symbol()] + balance = balance_unit['balance_network'] - balance_unit['balance_outgoing'] + if len(sender_indices) == 1: + sender_indices[m] = sender_sender_indices[len(senders)-1] + sender_indices = sender_indices[:len(sender_indices)-1] + + recipient_index = random.randint(0, len(traffic_tasker.accounts)) + recipient = traffic_tasker.accounts[recipient_index] + + (e, t, balance,) = traffic_item.method( + token_pair, + sender, + recipient, + balance, + traffic_tasker.aux, + block_number, + tx_index, + ) + balances[sender][token_pair[0].symbol()] = balance_unit + sender_indices.append(recipient_index) + if e != None: - logg.info('traffic method {} failed: {}'.format(traffic_item.name(), e)) + logg.info('failed {}: {}'.format(str(traffic_item), e)) continue + if t == None: logg.info('traffic method {} completed immediately') self.traffic_router.release(traffic_item) @@ -311,7 +377,7 @@ class TokenOracle: token_address = self.getter.get_index(i) t = self.registry.get_address(self.chain_spec, token_address) token_symbol = t.symbol() - self.tokens.append(token_address) + self.tokens.append(t) logg.debug('adding token idx {} symbol {} address {}'.format(i, token_symbol, token_address)) @@ -374,7 +440,6 @@ def main(local_config=None): # Set up magic traffic handler handler = Handler(config, traffic_router) - # Set up syncer syncer_backend = MemBackend(str(chain_spec), 0) o = block_latest() @@ -390,6 +455,7 @@ def main(local_config=None): 'redis_host_callback': config.get('_REDIS_HOST_CALLBACK'), 'redis_port_callback': config.get('_REDIS_PORT_CALLBACK'), 'redis_db': config.get('REDIS_DB'), + 'api_queue': config.get('_CELERY_QUEUE'), } syncer = HeadSyncer(syncer_backend, loop_callback=handler.refresh)