From 93ae16b578d2a5defd5c6fbe6dd67a99ea7b4661 Mon Sep 17 00:00:00 2001 From: nolash Date: Sat, 20 Feb 2021 22:43:30 +0100 Subject: [PATCH] Short-circuit balance cache --- apps/contract-migration/scripts/traffic.py | 73 +++++++++++++--------- 1 file changed, 44 insertions(+), 29 deletions(-) diff --git a/apps/contract-migration/scripts/traffic.py b/apps/contract-migration/scripts/traffic.py index 69cb1e52..591df413 100644 --- a/apps/contract-migration/scripts/traffic.py +++ b/apps/contract-migration/scripts/traffic.py @@ -225,29 +225,41 @@ class TrafficTasker: self.aux[k] = v - def balances(self, accounts=None): - if accounts == None: - accounts = self.accounts - for account in accounts: - for token in self.tokens: - # TODO: use proper redis callback - api = Api( - str(self.aux['chain_spec']), - queue=self.aux['api_queue'], - #callback_param='{}:{}:{}:{}'.format(aux['redis_host_callback'], aux['redis_port_callback'], aux['redis_db'], aux['redis_channel']), - #callback_task='cic_eth.callbacks.redis.redis', - #callback_queue=queue, - ) - t = api.balance(account, token.symbol()) - r = t.get() - for c in t.collect(): - r = c[1] - assert t.successful() - self.__cache_balance(account, token.symbol(), r[0]) - logg.debug('balance sender {} token {} = {}'.format(account, token, r)) + def balances(self, accounts=None, refresh=False): + if refresh: + if accounts == None: + accounts = self.accounts + for account in accounts: + for token in self.tokens: + value = self.balance(account, token) + self.__cache_balance(account, token.symbol(), value) + logg.debug('balance sender {} token {} = {}'.format(account, token, value)) + else: + logg.debug('returning cached balances') return self.__balances + def balance(self, account, token): + # TODO: use proper redis callback + api = Api( + str(self.aux['chain_spec']), + queue=self.aux['api_queue'], + #callback_param='{}:{}:{}:{}'.format(aux['redis_host_callback'], aux['redis_port_callback'], aux['redis_db'], aux['redis_channel']), + #callback_task='cic_eth.callbacks.redis.redis', + #callback_queue=queue, + ) + t = api.balance(account, token.symbol()) + r = t.get() + for c in t.collect(): + r = c[1] + assert t.successful() + return r[0] + + + def update_balance(self, account, token, value): + self.__cache_balance(account, token.symbol(), value) + + class Handler: def __init__(self, config, traffic_router): @@ -256,6 +268,7 @@ class Handler: self.pubsub = self.__connect_redis(self.redis_channel, config) self.traffic_items = {} self.config = config + self.init = False def __connect_redis(self, redis_channel, config): @@ -271,6 +284,10 @@ class Handler: traffic_tasker = TrafficTasker() traffic_tasker.add_aux('redis_channel', self.redis_channel) + refresh_balance = not self.init + balances = traffic_tasker.balances(refresh=refresh_balance) + self.init = True + if len(traffic_tasker.tokens) == 0: logg.error('patiently waiting for at least one registered token...') return @@ -278,10 +295,8 @@ class Handler: logg.debug('executing handler refresh with accouts {}'.format(traffic_tasker.accounts)) logg.debug('executing handler refresh with tokens {}'.format(traffic_tasker.tokens)) - #senders = copy.copy(traffic_tasker.accounts) sender_indices = [*range(0, len(traffic_tasker.accounts))] # TODO: only get balances for the selection that we will be generating for - balances = traffic_tasker.balances() while True: traffic_item = traffic_router.reserve() @@ -297,17 +312,18 @@ class Handler: sender_index_index = random.randint(0, len(sender_indices)-1) sender_index = sender_indices[sender_index_index] sender = traffic_tasker.accounts[sender_index] - logg.debug('balances {}'.format(balances)) - balance_unit = balances[sender][token_pair[0].symbol()] - balance = balance_unit['balance_network'] - balance_unit['balance_outgoing'] + #balance_full = balances[sender][token_pair[0].symbol()] if len(sender_indices) == 1: sender_indices[m] = sender_sender_indices[len(senders)-1] sender_indices = sender_indices[:len(sender_indices)-1] - recipient_index = random.randint(0, len(traffic_tasker.accounts)) + balance_full = traffic_tasker.balance(sender, token_pair[0]) + balance = balance_full['balance_network'] - balance_full['balance_outgoing'] + + recipient_index = random.randint(0, len(traffic_tasker.accounts)-1) recipient = traffic_tasker.accounts[recipient_index] - (e, t, balance,) = traffic_item.method( + (e, t, balance_result,) = traffic_item.method( token_pair, sender, recipient, @@ -316,7 +332,7 @@ class Handler: block_number, tx_index, ) - balances[sender][token_pair[0].symbol()] = balance_unit + traffic_tasker.update_balance(sender, token_pair[0], balance_result) sender_indices.append(recipient_index) if e != None: @@ -331,7 +347,6 @@ class Handler: self.traffic_items[traffic_item.ext] = traffic_item - # TODO: add drain while True: m = self.pubsub.get_message(timeout=0.1) if m == None: