diff --git a/apps/contract-migration/scripts/cmd/traffic.py b/apps/contract-migration/scripts/cmd/traffic.py index f1eefb77..1e5f58c4 100644 --- a/apps/contract-migration/scripts/cmd/traffic.py +++ b/apps/contract-migration/scripts/cmd/traffic.py @@ -15,6 +15,11 @@ logg = logging.getLogger(__name__) def add_args(argparser): + """Parse script specific command line arguments + + :param argparser: Top-level argument parser + :type argparser: argparse.ArgumentParser + """ argparser.formatter_class = formatter_class=RawTextHelpFormatter argparser.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback') argparser.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback') @@ -26,7 +31,13 @@ def add_args(argparser): class TrafficItem: + """Represents a single item of traffic meta that will be processed by a traffic generation method + The traffic generation module passed in the argument must implement a method "do" with interface conforming to local.noop_traffic.do. + + :param item: Traffic generation module. + :type item: function + """ def __init__(self, item): self.method = item.do self.uuid = uuid.uuid4() @@ -44,8 +55,15 @@ class TrafficItem: class TrafficRouter: + """Holds and selects from the collection of traffic generator modules that will be used for the execution. + :params batch_size: Amount of simultaneous traffic items that can simultanously be in flight. + :type batch_size: number + :raises ValueError: If batch size is zero of negative + """ def __init__(self, batch_size=1): + if batch_size < 1: + raise ValueError('batch size cannot be 0') self.items = [] self.weights = [] self.total_weights = 0 @@ -56,6 +74,18 @@ class TrafficRouter: def add(self, item, weight): + """Add a traffic generator module to the list of modules to choose between for traffic item exectuion. + + The probability that a module will be chosen for any single item is the ratio between the weight parameter and the accumulated weights for all items. + + See local.noop for which criteria the generator module must fulfill. + + :param item: Qualified class path to traffic generator module. Will be dynamically loaded. + :type item: str + :param weight: Selection probability weight + :type weight: number + :raises ModuleNotFound: Invalid item argument + """ self.weights.append(self.total_weights) self.total_weights += weight m = importlib.import_module(item) @@ -63,6 +93,16 @@ class TrafficRouter: def reserve(self): + """Selects the module to be used to execute the next traffic item, using the provided weights. + + If the current number of calls to "reserve" without corresponding calls to "release" equals the set batch size limit, None will be returned. The calling code should allow a short grace period before trying the call again. + :raises ValueError: No items have been added + :returns: A traffic item with the selected module method as the method property. + :rtype: TrafficItem|None + """ + if len(self.items) == 0: + raise ValueError('Add at least one item first') + if len(self.reserved) == self.batch_size: return None @@ -79,43 +119,60 @@ class TrafficRouter: def release(self, traffic_item): + """Releases the traffic item from the list of simultaneous traffic items in flight. + + :param traffic_item: Traffic item + :type traffic_item: TrafficItem + """ del self.reserved[traffic_item.uuid] def apply_import_dict(self, keys, dct): + """Convenience method to add traffic generator modules from a dictionary. + + :param keys: Keys in dictionary to add + :type keys: list of str + :param dct: Dictionary to choose module strings from + :type dct: dict + :raises ModuleNotFoundError: If one of the module strings refer to an invalid module. + """ # parse traffic items for k in keys: if len(k) > 8 and k[:8] == 'TRAFFIC_': v = int(dct.get(k)) - try: - self.add(k[8:].lower(), v) - except ModuleNotFoundError as e: - raise AttributeError('requested traffic item module not found: {}'.format(e)) + self.add(k[8:].lower(), v) logg.debug('found traffic item {} weight {}'.format(k, v)) +# TODO: This will not work well with big networks. The provisioner should use lazy loading and LRU instead. class TrafficProvisioner: + """Loads metadata necessary for traffic item execution. + + Instantiation will by default trigger retrieval of accounts and tokens on the network. + + It will also populate the aux property of the instance with the values from the static aux parameter template. + """ oracles = { 'account': None, 'token': None, } + """Data oracles to be used for traffic item generation""" default_aux = { } + """Aux parameter template to be passed to the traffic generator module""" def __init__(self): - self.tokens = self.oracles['token'].get_tokens() self.accounts = self.oracles['account'].get_accounts() self.aux = copy.copy(self.default_aux) self.__balances = {} + for a in self.accounts: + self.__balances[a] = {} - def load_balances(self): - pass - - + # Caches a single address' balance of a single token def __cache_balance(self, holder_address, token, value): if self.__balances.get(holder_address) == None: self.__balances[holder_address] = {} @@ -124,13 +181,33 @@ class TrafficProvisioner: def add_aux(self, k, v): + """Add a key-value pair to the aux parameter list. + + Does not protect existing entries from being overwritten. + + :param k: Key + :type k: str + :param v: Value + :type v: any + """ logg.debug('added {} = {} to traffictasker'.format(k, v)) self.aux[k] = v - def balances(self, accounts=None, refresh=False): - if refresh: - if accounts == None: + # TODO: Balance list type should perhaps be a class (provided by cic-eth package) due to its complexity. + def balances(self, refresh_accounts=None): + """Retrieves all token balances for the given account list. + + If refresh_accounts is not None, the balance values for the given accounts will be retrieved from upstream. If the argument is an empty list, the balances will be updated for all tokens of all ccounts. If there are many accounts and/or tokens, this may be a VERY EXPENSIVE OPERATION. The "balance" method can be used instead to update individual account/token pair balances. + + :param accounts: List of accounts to refresh balances for. + :type accounts: list of str, 0x-hex + :returns: Dict of dict of dicts; v[accounts][token] = {balance_types} + :rtype: dict + """ + if refresh_accounts != None: + accounts = refresh_accounts + if len(accounts) == 0: accounts = self.accounts for account in accounts: for token in self.tokens: @@ -139,11 +216,23 @@ class TrafficProvisioner: logg.debug('balance sender {} token {} = {}'.format(account, token, value)) else: logg.debug('returning cached balances') + return self.__balances + # TODO: use proper redis callback def balance(self, account, token): - # TODO: use proper redis callback + """Update balance for a single token of a single account from upstream. + + The balance will be the spendable balance at the time of the call. This value may be less than the balance reported by the consensus network, if a previous outgoing transaction is still pending in the network or the custodial system queue. + + :param account: Account to update + :type account: str, 0x-hex + :param token: Token to update balance for + :type token: cic_registry.token.Token + :returns: Updated balance + :rtype: complex balance dict + """ api = Api( str(self.aux['chain_spec']), queue=self.aux['api_queue'], @@ -156,15 +245,38 @@ class TrafficProvisioner: for c in t.collect(): r = c[1] assert t.successful() + #return r[0]['balance_network'] - r[0]['balance_outgoing'] return r[0] def update_balance(self, account, token, value): + """Manually set a token balance for an account. + + :param account: Account to update + :type account: str, 0x-hex + :param token: Token to update balance for + :type token: cic_registry.token.Token + :param value: Balance value to set + :type value: number + :returns: Balance value (unchanged) + :rtype: complex balance dict + """ self.__cache_balance(account, token.symbol(), value) + return value +# TODO: Abstract redis with a generic pubsub adapter class TrafficSyncHandler: + """Encapsulates callback methods required by the chain syncer. + + This implementation uses a redis subscription as backend to retrieve results from asynchronously executed tasks. + :param config: Configuration of current top-level execution + :type config: object with dict get interface + :param traffic_router: Traffic router instance to use for the syncer session. + :type traffic_router: TrafficRouter + :raises Exception: Any Exception redis may raise on connection attempt. + """ def __init__(self, config, traffic_router): self.traffic_router = traffic_router self.redis_channel = str(uuid.uuid4()) @@ -174,6 +286,7 @@ class TrafficSyncHandler: self.init = False + # connects to redis def __connect_redis(self, redis_channel, config): r = redis.Redis(config.get('REDIS_HOST'), config.get('REDIS_PORT'), config.get('REDIS_DB')) redis_pubsub = r.pubsub() @@ -182,20 +295,33 @@ class TrafficSyncHandler: return redis_pubsub + # TODO: This method is too long, split up + # TODO: This method will not yet cache balances for newly created accounts def refresh(self, block_number, tx_index): + """Traffic method and item execution driver to be called on every loop execution of the chain syncer. + Implements the signature required by callbacks called from chainsyncer.driver.loop. + + :param block_number: Syncer block height at time of call. + :type block_number: number + :param tx_index: Syncer block transaction index at time of call. + :type tx_index: number + """ traffic_provisioner = TrafficProvisioner() traffic_provisioner.add_aux('redis_channel', self.redis_channel) - refresh_balance = not self.init - balances = traffic_provisioner.balances(refresh=refresh_balance) + refresh_accounts = None + # Note! This call may be very expensive if there are a lot of accounts and/or tokens on the network + if not self.init: + refresh_accounts = traffic_provisioner.accounts + balances = traffic_provisioner.balances(refresh_accounts=refresh_accounts) self.init = True if len(traffic_provisioner.tokens) == 0: logg.error('patiently waiting for at least one registered token...') return - logg.debug('executing handler refresh with accouts {}'.format(traffic_provisioner.accounts)) + logg.debug('executing handler refresh with accounts {}'.format(traffic_provisioner.accounts)) logg.debug('executing handler refresh with tokens {}'.format(traffic_provisioner.tokens)) sender_indices = [*range(0, len(traffic_provisioner.accounts))] @@ -208,10 +334,10 @@ class TrafficSyncHandler: break # TODO: temporary selection - token_pair = [ + token_pair = ( traffic_provisioner.tokens[0], traffic_provisioner.tokens[0], - ] + ) sender_index_index = random.randint(0, len(sender_indices)-1) sender_index = sender_indices[sender_index_index] sender = traffic_provisioner.accounts[sender_index] @@ -221,16 +347,16 @@ class TrafficSyncHandler: sender_indices = sender_indices[:len(sender_indices)-1] balance_full = traffic_provisioner.balance(sender, token_pair[0]) - balance = balance_full['balance_network'] - balance_full['balance_outgoing'] recipient_index = random.randint(0, len(traffic_provisioner.accounts)-1) recipient = traffic_provisioner.accounts[recipient_index] - + + logg.debug('trigger item {} tokens {} sender {} recipient {} balance {}') (e, t, balance_result,) = traffic_item.method( token_pair, sender, recipient, - balance, + balance_full, traffic_provisioner.aux, block_number, tx_index, @@ -260,19 +386,32 @@ class TrafficSyncHandler: uu = message_data['root_id'] match_item = self.traffic_items[uu] self.traffic_router.release(match_item) - if message_data['status'] == 0: + logg.debug('>>>>>>>>>>>>>>>>>>> match item {} {} {}'.format(match_item, match_item.result, dir(match_item))) + if message_data['status'] != 0: logg.error('task item {} failed with error code {}'.format(match_item, message_data['status'])) else: - match_item['result'] = message_data['result'] + match_item.result = message_data['result'] logg.debug('got callback result: {}'.format(match_item)) def name(self): + """Returns the common name for the syncer callback implementation. Required by the chain syncer. + """ return 'traffic_item_handler' - def filter(self, conn, block, tx, session): + def filter(self, conn, block, tx, db_session): + """Callback for every transaction found in a block. Required by the chain syncer. + + Currently performs no operation. + + :param conn: A HTTPConnection object to the chain rpc provider. + :type conn: chainlib.eth.rpc.HTTPConnection + :param block: The block object of current transaction + :type block: chainlib.eth.block.Block + :param tx: The block transaction object + :type tx: chainlib.eth.tx.Tx + :param db_session: Syncer backend database session + :type db_session: SQLAlchemy.Session + """ logg.debug('handler get {}'.format(tx)) - - - diff --git a/apps/contract-migration/scripts/config/traffic.ini b/apps/contract-migration/scripts/config/traffic.ini index df89d1aa..99b572c8 100644 --- a/apps/contract-migration/scripts/config/traffic.ini +++ b/apps/contract-migration/scripts/config/traffic.ini @@ -1,4 +1,4 @@ [traffic] #local.noop_traffic = 2 -#local.account = 2 +local.account = 2 local.transfer = 2 diff --git a/apps/contract-migration/scripts/local/account.py b/apps/contract-migration/scripts/local/account.py index 41ff5187..35666a0a 100644 --- a/apps/contract-migration/scripts/local/account.py +++ b/apps/contract-migration/scripts/local/account.py @@ -12,7 +12,17 @@ name = 'account' def do(token_pair, sender, recipient, sender_balance, aux, block_number, tx_index): + """Triggers creation and registration of new account through the custodial cic-eth component. + It expects the following aux parameters to exist: + - redis_host_callback: Redis host name exposed to cic-eth, for callback + - redis_port_callback: Redis port exposed to cic-eth, for callback + - redis_db: Redis db, for callback + - redis_channel: Redis channel, for callback + - chain_spec: Chain specification for the chain to execute the transfer on + + See local.noop.do for details on parameters and return values. + """ logg.debug('running {} {} {}'.format(__name__, token_pair, sender, recipient)) api = Api( str(aux['chain_spec']), @@ -24,4 +34,4 @@ def do(token_pair, sender, recipient, sender_balance, aux, block_number, tx_inde t = api.create_account(register=True) - return (None, t, sender_balance, sender_balance,) + return (None, t, sender_balance, ) diff --git a/apps/contract-migration/scripts/local/noop_traffic.py b/apps/contract-migration/scripts/local/noop_traffic.py index 961f807b..85f7cf23 100644 --- a/apps/contract-migration/scripts/local/noop_traffic.py +++ b/apps/contract-migration/scripts/local/noop_traffic.py @@ -5,7 +5,33 @@ logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() -def do(tokens, accounts, aux, block_number, tx_index): - logg.debug('running {} {} {}'.format(__name__, tokens, accounts)) +def do(token_pair, sender, recipient, sender_balance, aux, block_number, tx_index): + """Defines the function signature for a traffic generator. The method itself only logs the input parameters. - return (None, None,) + If the error position in the return tuple is not None, the calling code should consider the generation as failed, and not count it towards the limit of simultaneous traffic items that can be simultaneously in flight. + + If the task_id position in the return tuple is None, the calling code should interpret the traffic item to have been synchronously completed, and not count it towards the limit of simultaneous traffic items that can be simultaneously in flight. + + The balance element of the result is the balance dict passed as argument, with fields updated according to the expected delta as a result of the operation. However, in the event that the generator module dispatches an asynchronous event then there is no guarantee that this balance will actually be the correct result. The caller should take care to periodically re-sync balance from the upstream. + + :param token_pair: Source and destination tokens for the traffic item. + :type token_pair: 2-element tuple with cic_registry.token.Token + :param sender: Sender address + :type sender: str, 0x-hex + :param recipient: Recipient address + :type recipient: str, 0x-hex + :param sender_balance: Sender balance in full decimal resolution + :type sender_balance: complex balance dict + :param aux: Custom parameters defined by traffic generation client code + :type aux: dict + :param block_number: Syncer block number position at time of method call + :type block_number: number + :param tx_index: Syncer block transaction index position at time of method call + :type tx_index: number + :raises KeyError: Missing required aux element + :returns: Exception|None, task_id|None and adjusted_sender_balance respectively + :rtype: tuple + """ + logg.debug('running {} {} {} {} {} {} {} {}'.format(__name__, token_pair, sender, recipient, sender_balance, aux, block_number, tx_index)) + + return (None, None, sender_balance, ) diff --git a/apps/contract-migration/scripts/local/transfer.py b/apps/contract-migration/scripts/local/transfer.py index 9cf5d250..e6a6e74d 100644 --- a/apps/contract-migration/scripts/local/transfer.py +++ b/apps/contract-migration/scripts/local/transfer.py @@ -13,13 +13,26 @@ name = 'erc20_transfer' def do(token_pair, sender, recipient, sender_balance, aux, block_number, tx_index): + """Triggers an ERC20 token transfer through the custodial cic-eth component, with a randomly chosen amount in integer resolution. + It expects the following aux parameters to exist: + - redis_host_callback: Redis host name exposed to cic-eth, for callback + - redis_port_callback: Redis port exposed to cic-eth, for callback + - redis_db: Redis db, for callback + - redis_channel: Redis channel, for callback + - chain_spec: Chain specification for the chain to execute the transfer on + + See local.noop.do for details on parameters and return values. + """ logg.debug('running {} {} {} {}'.format(__name__, token_pair, sender, recipient)) decimals = token_pair[0].decimals() - balance_units = int(sender_balance / decimals) - if balance_units == 0: + sender_balance_value = sender_balance['balance_network'] - sender_balance['balance_outgoing'] + + balance_units = int(sender_balance_value / decimals) + + if balance_units <= 0: return (AttributeError('sender {} has zero balance'), None, 0,) spend_units = random.randint(1, balance_units) @@ -34,6 +47,5 @@ def do(token_pair, sender, recipient, sender_balance, aux, block_number, tx_inde ) t = api.transfer(sender, recipient, spend_value, token_pair[0].symbol()) - changed_sender_balance = sender_balance - spend_value - - return (None, t, changed_sender_balance,) + sender_balance['balance_outgoing'] += spend_value + return (None, t, sender_balance,)