WIP add erc20 transfer traffic item

This commit is contained in:
nolash 2021-02-20 19:17:08 +01:00
parent f2a0ef99ec
commit 8dd8db497c
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
4 changed files with 84 additions and 14 deletions

View File

@ -32,6 +32,7 @@ from cic_eth.admin import ctrl
from cic_eth.eth.rpc import RpcClient from cic_eth.eth.rpc import RpcClient
from cic_eth.eth.rpc import GasOracle from cic_eth.eth.rpc import GasOracle
from cic_eth.queue import tx from cic_eth.queue import tx
from cic_eth.queue import balance
from cic_eth.callbacks import Callback from cic_eth.callbacks import Callback
from cic_eth.callbacks import http from cic_eth.callbacks import http
from cic_eth.callbacks import tcp from cic_eth.callbacks import tcp

View File

@ -1,3 +1,4 @@
[traffic] [traffic]
#local.noop_traffic = 2 #local.noop_traffic = 2
local.account = 2 #local.account = 2
local.transfer = 2

View File

@ -11,9 +11,9 @@ queue = 'cic-eth'
name = 'account' name = 'account'
def do(tokens, accounts, aux, block_number, tx_index): def do(token_pair, sender, recipient, sender_balance, aux, block_number, tx_index):
logg.debug('running {} {} {}'.format(__name__, tokens, accounts)) logg.debug('running {} {} {}'.format(__name__, token_pair, sender, recipient))
api = Api( api = Api(
str(aux['chain_spec']), str(aux['chain_spec']),
queue=queue, queue=queue,
@ -21,5 +21,7 @@ def do(tokens, accounts, aux, block_number, tx_index):
callback_task='cic_eth.callbacks.redis.redis', callback_task='cic_eth.callbacks.redis.redis',
callback_queue=queue, callback_queue=queue,
) )
t = api.create_account(register=True) t = api.create_account(register=True)
return (None, t,)
return (None, t, sender_balance, sender_balance,)

View File

@ -30,6 +30,7 @@ from chainlib.eth.block import block_latest
from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer
from crypto_dev_signer.keystore import DictKeystore from crypto_dev_signer.keystore import DictKeystore
from hexathon import strip_0x from hexathon import strip_0x
from cic_eth.api.api_task import Api
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
@ -88,6 +89,9 @@ config.add(batchsize, '_BATCH_SIZE', True)
config.add(args.redis_host_callback, '_REDIS_HOST_CALLBACK', True) config.add(args.redis_host_callback, '_REDIS_HOST_CALLBACK', True)
config.add(args.redis_port_callback, '_REDIS_PORT_CALLBACK', True) config.add(args.redis_port_callback, '_REDIS_PORT_CALLBACK', True)
# queue
config.add(args.q, '_CELERY_QUEUE', True)
# signer # signer
keystore = DictKeystore() keystore = DictKeystore()
if args.y == None: if args.y == None:
@ -124,6 +128,11 @@ class TrafficItem:
self.uuid = uuid.uuid4() self.uuid = uuid.uuid4()
self.ext = None self.ext = None
self.result = None self.result = None
self.sender = None
self.recipient = None
self.source_token = None
self.destination_token = None
self.source_value = 0
def __str__(self): def __str__(self):
@ -196,19 +205,18 @@ class TrafficTasker:
self.tokens = self.oracles['token'].get_tokens() self.tokens = self.oracles['token'].get_tokens()
self.accounts = self.oracles['account'].get_accounts() self.accounts = self.oracles['account'].get_accounts()
self.balances = {}
self.aux = copy.copy(self.default_aux) self.aux = copy.copy(self.default_aux)
self.__balances = {}
def load_balances(self): def load_balances(self):
pass pass
def cache_balance(self, holder_address, token, value): def __cache_balance(self, holder_address, token, value):
if self.balances.get(holder_address) == None: if self.__balances.get(holder_address) == None:
self.balances[holder_address] = {} self.__balances[holder_address] = {}
self.balances[holder_address][token] = value self.__balances[holder_address][token] = value
logg.debug('setting cached balance of {} token {} to {}'.format(holder_address, token, value)) logg.debug('setting cached balance of {} token {} to {}'.format(holder_address, token, value))
@ -217,6 +225,28 @@ class TrafficTasker:
self.aux[k] = v self.aux[k] = v
def balances(self, accounts=None):
if accounts == None:
accounts = self.accounts
for account in accounts:
for token in self.tokens:
# TODO: use proper redis callback
api = Api(
str(self.aux['chain_spec']),
queue=self.aux['api_queue'],
#callback_param='{}:{}:{}:{}'.format(aux['redis_host_callback'], aux['redis_port_callback'], aux['redis_db'], aux['redis_channel']),
#callback_task='cic_eth.callbacks.redis.redis',
#callback_queue=queue,
)
t = api.balance(account, token.symbol())
r = t.get()
for c in t.collect():
r = c[1]
assert t.successful()
self.__cache_balance(account, token.symbol(), r[0])
logg.debug('balance sender {} token {} = {}'.format(account, token, r))
return self.__balances
class Handler: class Handler:
@ -248,15 +278,51 @@ class Handler:
logg.debug('executing handler refresh with accouts {}'.format(traffic_tasker.accounts)) logg.debug('executing handler refresh with accouts {}'.format(traffic_tasker.accounts))
logg.debug('executing handler refresh with tokens {}'.format(traffic_tasker.tokens)) logg.debug('executing handler refresh with tokens {}'.format(traffic_tasker.tokens))
#senders = copy.copy(traffic_tasker.accounts)
sender_indices = [*range(0, len(traffic_tasker.accounts))]
# TODO: only get balances for the selection that we will be generating for
balances = traffic_tasker.balances()
while True: while True:
traffic_item = traffic_router.reserve() traffic_item = traffic_router.reserve()
if traffic_item == None: if traffic_item == None:
logg.debug('no traffic_items left to reserve {}'.format(traffic_item)) logg.debug('no traffic_items left to reserve {}'.format(traffic_item))
break break
(e, t,) = traffic_item.method(traffic_tasker.tokens, traffic_tasker.accounts, traffic_tasker.aux, block_number, tx_index)
# TODO: temporary selection
token_pair = [
traffic_tasker.tokens[0],
traffic_tasker.tokens[0],
]
sender_index_index = random.randint(0, len(sender_indices)-1)
sender_index = sender_indices[sender_index_index]
sender = traffic_tasker.accounts[sender_index]
logg.debug('balances {}'.format(balances))
balance_unit = balances[sender][token_pair[0].symbol()]
balance = balance_unit['balance_network'] - balance_unit['balance_outgoing']
if len(sender_indices) == 1:
sender_indices[m] = sender_sender_indices[len(senders)-1]
sender_indices = sender_indices[:len(sender_indices)-1]
recipient_index = random.randint(0, len(traffic_tasker.accounts))
recipient = traffic_tasker.accounts[recipient_index]
(e, t, balance,) = traffic_item.method(
token_pair,
sender,
recipient,
balance,
traffic_tasker.aux,
block_number,
tx_index,
)
balances[sender][token_pair[0].symbol()] = balance_unit
sender_indices.append(recipient_index)
if e != None: if e != None:
logg.info('traffic method {} failed: {}'.format(traffic_item.name(), e)) logg.info('failed {}: {}'.format(str(traffic_item), e))
continue continue
if t == None: if t == None:
logg.info('traffic method {} completed immediately') logg.info('traffic method {} completed immediately')
self.traffic_router.release(traffic_item) self.traffic_router.release(traffic_item)
@ -311,7 +377,7 @@ class TokenOracle:
token_address = self.getter.get_index(i) token_address = self.getter.get_index(i)
t = self.registry.get_address(self.chain_spec, token_address) t = self.registry.get_address(self.chain_spec, token_address)
token_symbol = t.symbol() token_symbol = t.symbol()
self.tokens.append(token_address) self.tokens.append(t)
logg.debug('adding token idx {} symbol {} address {}'.format(i, token_symbol, token_address)) logg.debug('adding token idx {} symbol {} address {}'.format(i, token_symbol, token_address))
@ -374,7 +440,6 @@ def main(local_config=None):
# Set up magic traffic handler # Set up magic traffic handler
handler = Handler(config, traffic_router) handler = Handler(config, traffic_router)
# Set up syncer # Set up syncer
syncer_backend = MemBackend(str(chain_spec), 0) syncer_backend = MemBackend(str(chain_spec), 0)
o = block_latest() o = block_latest()
@ -390,6 +455,7 @@ def main(local_config=None):
'redis_host_callback': config.get('_REDIS_HOST_CALLBACK'), 'redis_host_callback': config.get('_REDIS_HOST_CALLBACK'),
'redis_port_callback': config.get('_REDIS_PORT_CALLBACK'), 'redis_port_callback': config.get('_REDIS_PORT_CALLBACK'),
'redis_db': config.get('REDIS_DB'), 'redis_db': config.get('REDIS_DB'),
'api_queue': config.get('_CELERY_QUEUE'),
} }
syncer = HeadSyncer(syncer_backend, loop_callback=handler.refresh) syncer = HeadSyncer(syncer_backend, loop_callback=handler.refresh)