Replace old registry helpers with local cache adapters, traffic scripts
This commit is contained in:
parent
7ac6c9a1e8
commit
c68de45bcb
2
apps/cic-cache/config/test/syncer.ini
Normal file
2
apps/cic-cache/config/test/syncer.ini
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
[syncer]
|
||||||
|
loop_interval = 1
|
76
apps/contract-migration/scripts/cic_eth/traffic/cmd/cache.py
Normal file
76
apps/contract-migration/scripts/cic_eth/traffic/cmd/cache.py
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
# external imports
|
||||||
|
from chainlib.jsonrpc import JSONRPCException
|
||||||
|
from eth_erc20 import ERC20
|
||||||
|
from eth_accounts_index import AccountsIndex
|
||||||
|
from eth_token_index import TokenUniqueSymbolIndex
|
||||||
|
|
||||||
|
class ERC20Token:
|
||||||
|
|
||||||
|
def __init__(self, chain_spec, address, conn):
|
||||||
|
self.__address = address
|
||||||
|
|
||||||
|
c = ERC20(chain_spec)
|
||||||
|
o = c.symbol(address)
|
||||||
|
r = conn.do(o)
|
||||||
|
self.__symbol = c.parse_symbol(r)
|
||||||
|
|
||||||
|
o = c.decimals(address)
|
||||||
|
r = conn.do(o)
|
||||||
|
self.__decimals = c.parse_decimals(r)
|
||||||
|
|
||||||
|
|
||||||
|
def symbol(self):
|
||||||
|
return self.__symbol
|
||||||
|
|
||||||
|
|
||||||
|
def decimals(self):
|
||||||
|
return self.__decimals
|
||||||
|
|
||||||
|
|
||||||
|
class IndexCache:
|
||||||
|
|
||||||
|
def __init__(self, chain_spec, address):
|
||||||
|
self.address = address
|
||||||
|
self.chain_spec = chain_spec
|
||||||
|
|
||||||
|
|
||||||
|
def parse(self, r):
|
||||||
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
def get(self, conn):
|
||||||
|
entries = []
|
||||||
|
i = 0
|
||||||
|
while True:
|
||||||
|
o = self.o.entry(self.address, i)
|
||||||
|
try:
|
||||||
|
r = conn.do(o)
|
||||||
|
entries.append(self.parse(r, conn))
|
||||||
|
except JSONRPCException:
|
||||||
|
return entries
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
|
||||||
|
class AccountRegistryCache(IndexCache):
|
||||||
|
|
||||||
|
def __init__(self, chain_spec, address):
|
||||||
|
super(AccountRegistryCache, self).__init__(chain_spec, address)
|
||||||
|
self.o = AccountsIndex(chain_spec)
|
||||||
|
self.get_accounts = self.get
|
||||||
|
|
||||||
|
|
||||||
|
def parse(self, r, conn):
|
||||||
|
return self.o.parse_account(r)
|
||||||
|
|
||||||
|
|
||||||
|
class TokenRegistryCache(IndexCache):
|
||||||
|
|
||||||
|
def __init__(self, chain_spec, address):
|
||||||
|
super(TokenRegistryCache, self).__init__(chain_spec, address)
|
||||||
|
self.o = TokenUniqueSymbolIndex(chain_spec)
|
||||||
|
self.get_tokens = self.get
|
||||||
|
|
||||||
|
|
||||||
|
def parse(self, r, conn):
|
||||||
|
token_address = self.o.parse_entry(r)
|
||||||
|
return ERC20Token(self.chain_spec, token_address, conn)
|
@ -163,9 +163,9 @@ class TrafficProvisioner:
|
|||||||
"""Aux parameter template to be passed to the traffic generator module"""
|
"""Aux parameter template to be passed to the traffic generator module"""
|
||||||
|
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self, conn):
|
||||||
self.tokens = self.oracles['token'].get_tokens()
|
self.tokens = self.oracles['token'].get_tokens(conn)
|
||||||
self.accounts = self.oracles['account'].get_accounts()
|
self.accounts = self.oracles['account'].get_accounts(conn)
|
||||||
self.aux = copy.copy(self.default_aux)
|
self.aux = copy.copy(self.default_aux)
|
||||||
self.__balances = {}
|
self.__balances = {}
|
||||||
for a in self.accounts:
|
for a in self.accounts:
|
||||||
@ -277,13 +277,14 @@ class TrafficSyncHandler:
|
|||||||
:type traffic_router: TrafficRouter
|
:type traffic_router: TrafficRouter
|
||||||
:raises Exception: Any Exception redis may raise on connection attempt.
|
:raises Exception: Any Exception redis may raise on connection attempt.
|
||||||
"""
|
"""
|
||||||
def __init__(self, config, traffic_router):
|
def __init__(self, config, traffic_router, conn):
|
||||||
self.traffic_router = traffic_router
|
self.traffic_router = traffic_router
|
||||||
self.redis_channel = str(uuid.uuid4())
|
self.redis_channel = str(uuid.uuid4())
|
||||||
self.pubsub = self.__connect_redis(self.redis_channel, config)
|
self.pubsub = self.__connect_redis(self.redis_channel, config)
|
||||||
self.traffic_items = {}
|
self.traffic_items = {}
|
||||||
self.config = config
|
self.config = config
|
||||||
self.init = False
|
self.init = False
|
||||||
|
self.conn = conn
|
||||||
|
|
||||||
|
|
||||||
# connects to redis
|
# connects to redis
|
||||||
@ -307,7 +308,7 @@ class TrafficSyncHandler:
|
|||||||
:param tx_index: Syncer block transaction index at time of call.
|
:param tx_index: Syncer block transaction index at time of call.
|
||||||
:type tx_index: number
|
:type tx_index: number
|
||||||
"""
|
"""
|
||||||
traffic_provisioner = TrafficProvisioner()
|
traffic_provisioner = TrafficProvisioner(self.conn)
|
||||||
traffic_provisioner.add_aux('redis_channel', self.redis_channel)
|
traffic_provisioner.add_aux('redis_channel', self.redis_channel)
|
||||||
|
|
||||||
refresh_accounts = None
|
refresh_accounts = None
|
||||||
@ -343,7 +344,7 @@ class TrafficSyncHandler:
|
|||||||
sender = traffic_provisioner.accounts[sender_index]
|
sender = traffic_provisioner.accounts[sender_index]
|
||||||
#balance_full = balances[sender][token_pair[0].symbol()]
|
#balance_full = balances[sender][token_pair[0].symbol()]
|
||||||
if len(sender_indices) == 1:
|
if len(sender_indices) == 1:
|
||||||
sender_indices[m] = sender_sender_indices[len(senders)-1]
|
sender_indices[sender_index] = sender_indices[len(sender_indices)-1]
|
||||||
sender_indices = sender_indices[:len(sender_indices)-1]
|
sender_indices = sender_indices[:len(sender_indices)-1]
|
||||||
|
|
||||||
balance_full = traffic_provisioner.balance(sender, token_pair[0])
|
balance_full = traffic_provisioner.balance(sender, token_pair[0])
|
||||||
@ -359,7 +360,6 @@ class TrafficSyncHandler:
|
|||||||
balance_full,
|
balance_full,
|
||||||
traffic_provisioner.aux,
|
traffic_provisioner.aux,
|
||||||
block_number,
|
block_number,
|
||||||
tx_index,
|
|
||||||
)
|
)
|
||||||
traffic_provisioner.update_balance(sender, token_pair[0], balance_result)
|
traffic_provisioner.update_balance(sender, token_pair[0], balance_result)
|
||||||
sender_indices.append(recipient_index)
|
sender_indices.append(recipient_index)
|
||||||
|
@ -11,7 +11,7 @@ queue = 'cic-eth'
|
|||||||
name = 'account'
|
name = 'account'
|
||||||
|
|
||||||
|
|
||||||
def do(token_pair, sender, recipient, sender_balance, aux, block_number, tx_index):
|
def do(token_pair, sender, recipient, sender_balance, aux, block_number):
|
||||||
"""Triggers creation and registration of new account through the custodial cic-eth component.
|
"""Triggers creation and registration of new account through the custodial cic-eth component.
|
||||||
|
|
||||||
It expects the following aux parameters to exist:
|
It expects the following aux parameters to exist:
|
||||||
|
@ -5,7 +5,7 @@ logging.basicConfig(level=logging.WARNING)
|
|||||||
logg = logging.getLogger()
|
logg = logging.getLogger()
|
||||||
|
|
||||||
|
|
||||||
def do(token_pair, sender, recipient, sender_balance, aux, block_number, tx_index):
|
def do(token_pair, sender, recipient, sender_balance, aux, block_number):
|
||||||
"""Defines the function signature for a traffic generator. The method itself only logs the input parameters.
|
"""Defines the function signature for a traffic generator. The method itself only logs the input parameters.
|
||||||
|
|
||||||
If the error position in the return tuple is not None, the calling code should consider the generation as failed, and not count it towards the limit of simultaneous traffic items that can be simultaneously in flight.
|
If the error position in the return tuple is not None, the calling code should consider the generation as failed, and not count it towards the limit of simultaneous traffic items that can be simultaneously in flight.
|
||||||
@ -26,12 +26,10 @@ def do(token_pair, sender, recipient, sender_balance, aux, block_number, tx_inde
|
|||||||
:type aux: dict
|
:type aux: dict
|
||||||
:param block_number: Syncer block number position at time of method call
|
:param block_number: Syncer block number position at time of method call
|
||||||
:type block_number: number
|
:type block_number: number
|
||||||
:param tx_index: Syncer block transaction index position at time of method call
|
|
||||||
:type tx_index: number
|
|
||||||
:raises KeyError: Missing required aux element
|
:raises KeyError: Missing required aux element
|
||||||
:returns: Exception|None, task_id|None and adjusted_sender_balance respectively
|
:returns: Exception|None, task_id|None and adjusted_sender_balance respectively
|
||||||
:rtype: tuple
|
:rtype: tuple
|
||||||
"""
|
"""
|
||||||
logg.debug('running {} {} {} {} {} {} {} {}'.format(__name__, token_pair, sender, recipient, sender_balance, aux, block_number, tx_index))
|
logg.debug('running {} {} {} {} {} {} {}'.format(__name__, token_pair, sender, recipient, sender_balance, aux, block_number))
|
||||||
|
|
||||||
return (None, None, sender_balance, )
|
return (None, None, sender_balance, )
|
||||||
|
@ -12,7 +12,7 @@ queue = 'cic-eth'
|
|||||||
name = 'erc20_transfer'
|
name = 'erc20_transfer'
|
||||||
|
|
||||||
|
|
||||||
def do(token_pair, sender, recipient, sender_balance, aux, block_number, tx_index):
|
def do(token_pair, sender, recipient, sender_balance, aux, block_number):
|
||||||
"""Triggers an ERC20 token transfer through the custodial cic-eth component, with a randomly chosen amount in integer resolution.
|
"""Triggers an ERC20 token transfer through the custodial cic-eth component, with a randomly chosen amount in integer resolution.
|
||||||
|
|
||||||
It expects the following aux parameters to exist:
|
It expects the following aux parameters to exist:
|
||||||
|
@ -12,6 +12,7 @@ from cic_eth_registry.registry import CICRegistry
|
|||||||
from chainsyncer.backend.memory import MemBackend
|
from chainsyncer.backend.memory import MemBackend
|
||||||
from chainsyncer.driver import HeadSyncer
|
from chainsyncer.driver import HeadSyncer
|
||||||
from chainlib.eth.connection import EthHTTPConnection
|
from chainlib.eth.connection import EthHTTPConnection
|
||||||
|
from chainlib.chain import ChainSpec
|
||||||
from chainlib.eth.gas import RPCGasOracle
|
from chainlib.eth.gas import RPCGasOracle
|
||||||
from chainlib.eth.nonce import RPCNonceOracle
|
from chainlib.eth.nonce import RPCNonceOracle
|
||||||
from chainlib.eth.block import block_latest
|
from chainlib.eth.block import block_latest
|
||||||
@ -33,6 +34,10 @@ from cmd.traffic import (
|
|||||||
TrafficSyncHandler,
|
TrafficSyncHandler,
|
||||||
)
|
)
|
||||||
from cmd.traffic import add_args as add_traffic_args
|
from cmd.traffic import add_args as add_traffic_args
|
||||||
|
from cmd.cache import (
|
||||||
|
AccountRegistryCache,
|
||||||
|
TokenRegistryCache,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# common basics
|
# common basics
|
||||||
@ -59,6 +64,7 @@ config.add(args.q, '_CELERY_QUEUE', True)
|
|||||||
|
|
||||||
logg.debug(config)
|
logg.debug(config)
|
||||||
|
|
||||||
|
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
# create signer (not currently in use, but needs to be accessible for custom traffic item generators)
|
# create signer (not currently in use, but needs to be accessible for custom traffic item generators)
|
||||||
@ -71,7 +77,8 @@ def main():
|
|||||||
rpc.setup(config.get('CIC_CHAIN_SPEC'), config.get('ETH_PROVIDER')) # replace with HTTPConnection when registry has been so refactored
|
rpc.setup(config.get('CIC_CHAIN_SPEC'), config.get('ETH_PROVIDER')) # replace with HTTPConnection when registry has been so refactored
|
||||||
conn = EthHTTPConnection(config.get('ETH_PROVIDER'))
|
conn = EthHTTPConnection(config.get('ETH_PROVIDER'))
|
||||||
#registry = registry.init_legacy(config, w3)
|
#registry = registry.init_legacy(config, w3)
|
||||||
registry = CICRegistry(config.get('CIC_CHAIN_SPEC'), conn)
|
CICRegistry.address = config.get('CIC_REGISTRY_ADDRESS')
|
||||||
|
registry = CICRegistry(chain_spec, conn)
|
||||||
|
|
||||||
# Connect to blockchain with chainlib
|
# Connect to blockchain with chainlib
|
||||||
gas_oracle = RPCGasOracle(conn)
|
gas_oracle = RPCGasOracle(conn)
|
||||||
@ -80,9 +87,8 @@ def main():
|
|||||||
# Set up magic traffic handler
|
# Set up magic traffic handler
|
||||||
traffic_router = TrafficRouter()
|
traffic_router = TrafficRouter()
|
||||||
traffic_router.apply_import_dict(config.all(), config)
|
traffic_router.apply_import_dict(config.all(), config)
|
||||||
handler = TrafficSyncHandler(config, traffic_router)
|
handler = TrafficSyncHandler(config, traffic_router, conn)
|
||||||
|
|
||||||
return
|
|
||||||
# Set up syncer
|
# Set up syncer
|
||||||
syncer_backend = MemBackend(config.get('CIC_CHAIN_SPEC'), 0)
|
syncer_backend = MemBackend(config.get('CIC_CHAIN_SPEC'), 0)
|
||||||
o = block_latest()
|
o = block_latest()
|
||||||
@ -90,9 +96,21 @@ def main():
|
|||||||
block_offset = int(strip_0x(r), 16) + 1
|
block_offset = int(strip_0x(r), 16) + 1
|
||||||
syncer_backend.set(block_offset, 0)
|
syncer_backend.set(block_offset, 0)
|
||||||
|
|
||||||
|
# get relevant registry entries
|
||||||
|
token_registry = registry.lookup('TokenRegistry')
|
||||||
|
logg.info('using token registry {}'.format(token_registry))
|
||||||
|
token_cache = TokenRegistryCache(chain_spec, token_registry)
|
||||||
|
|
||||||
|
account_registry = registry.lookup('TokenRegistry')
|
||||||
|
logg.info('using account registry {}'.format(account_registry))
|
||||||
|
account_cache = AccountRegistryCache(chain_spec, account_registry)
|
||||||
|
|
||||||
# Set up provisioner for common task input data
|
# Set up provisioner for common task input data
|
||||||
TrafficProvisioner.oracles['token']= common.registry.TokenOracle(w3, config.get('CIC_CHAIN_SPEC'), registry)
|
#TrafficProvisioner.oracles['token']= common.registry.TokenOracle(w3, config.get('CIC_CHAIN_SPEC'), registry)
|
||||||
TrafficProvisioner.oracles['account'] = common.registry.AccountsOracle(w3, config.get('CIC_CHAIN_SPEC'), registry)
|
#TrafficProvisioner.oracles['account'] = common.registry.AccountsOracle(w3, config.get('CIC_CHAIN_SPEC'), registry)
|
||||||
|
TrafficProvisioner.oracles['token']= token_cache
|
||||||
|
TrafficProvisioner.oracles['account'] = account_cache
|
||||||
|
|
||||||
TrafficProvisioner.default_aux = {
|
TrafficProvisioner.default_aux = {
|
||||||
'chain_spec': config.get('CIC_CHAIN_SPEC'),
|
'chain_spec': config.get('CIC_CHAIN_SPEC'),
|
||||||
'registry': registry,
|
'registry': registry,
|
||||||
@ -102,7 +120,7 @@ def main():
|
|||||||
'api_queue': config.get('_CELERY_QUEUE'),
|
'api_queue': config.get('_CELERY_QUEUE'),
|
||||||
}
|
}
|
||||||
|
|
||||||
syncer = HeadSyncer(syncer_backend, loop_callback=handler.refresh)
|
syncer = HeadSyncer(syncer_backend, block_callback=handler.refresh)
|
||||||
syncer.add_filter(handler)
|
syncer.add_filter(handler)
|
||||||
syncer.loop(1, conn)
|
syncer.loop(1, conn)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user