From 47b107c776bce0052f3d9097ff3e368850bec8fc Mon Sep 17 00:00:00 2001 From: nolash Date: Fri, 19 Feb 2021 23:11:25 +0100 Subject: [PATCH] Add traffic router, redis subscription, dynamic traffic item module loading --- .../scripts/config/traffic.ini | 2 + .../scripts/import_users.py | 3 +- .../scripts/local/noop_traffic.py | 9 ++ .../scripts/requirements.txt | 12 +- apps/contract-migration/scripts/traffic.py | 122 ++++++++++++++++-- 5 files changed, 133 insertions(+), 15 deletions(-) create mode 100644 apps/contract-migration/scripts/config/traffic.ini create mode 100644 apps/contract-migration/scripts/local/noop_traffic.py diff --git a/apps/contract-migration/scripts/config/traffic.ini b/apps/contract-migration/scripts/config/traffic.ini new file mode 100644 index 00000000..a95bf36d --- /dev/null +++ b/apps/contract-migration/scripts/config/traffic.ini @@ -0,0 +1,2 @@ +[traffic] +local.noop_traffic = 2 diff --git a/apps/contract-migration/scripts/import_users.py b/apps/contract-migration/scripts/import_users.py index e1bfb695..675d36c7 100644 --- a/apps/contract-migration/scripts/import_users.py +++ b/apps/contract-migration/scripts/import_users.py @@ -101,7 +101,8 @@ def register_eth(i, u): ps.get_message() m = ps.get_message(timeout=args.timeout) try: - address = json.loads(m['data']) + r = json.loads(m['data']) + address = r['result'] except TypeError as e: if m == None: logg.critical('empty response from redis callback (did the service crash?)') diff --git a/apps/contract-migration/scripts/local/noop_traffic.py b/apps/contract-migration/scripts/local/noop_traffic.py new file mode 100644 index 00000000..7a7db240 --- /dev/null +++ b/apps/contract-migration/scripts/local/noop_traffic.py @@ -0,0 +1,9 @@ +# standard imports +import logging + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + + +def do(config, tokens, accounts, block_number, tx_index): + logg.debug('running {} {} {}'.format(__name__, tokens, accounts)) diff --git a/apps/contract-migration/scripts/requirements.txt b/apps/contract-migration/scripts/requirements.txt index d3215c14..c0ad7587 100644 --- a/apps/contract-migration/scripts/requirements.txt +++ b/apps/contract-migration/scripts/requirements.txt @@ -1,12 +1,12 @@ psycopg2==2.8.6 chainlib~=0.0.1a15 -chainsyncer==0.0.1a9 -cic-eth==0.10.0a28 -cic-registry==0.5.3a19 -confini==0.3.6rc1 +chainsyncer~=0.0.1a9 +cic-eth~=0.10.0a28 +cic-registry~=0.5.3a19 +confini~=0.3.6rc3 celery==4.4.7 redis==3.5.3 -hexathon==0.0.1a3 +hexathon~=0.0.1a3 faker==4.17.1 cic-types==0.1.0a7+build.1c254367 -eth-accounts-index==0.0.10a10 +eth-accounts-index~=0.0.10a10 diff --git a/apps/contract-migration/scripts/traffic.py b/apps/contract-migration/scripts/traffic.py index f28df63d..2a0709ae 100644 --- a/apps/contract-migration/scripts/traffic.py +++ b/apps/contract-migration/scripts/traffic.py @@ -4,8 +4,13 @@ import logging import argparse import re import sys +import uuid +import importlib +import copy +import random # external imports +import redis import confini import web3 from cic_registry import CICRegistry @@ -26,6 +31,11 @@ from hexathon import strip_0x logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() +logging.getLogger('urllib3').setLevel(logging.CRITICAL) +logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL) +logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL) +logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL) +logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL) script_dir = os.path.realpath(os.path.dirname(__file__)) default_data_dir = '/usr/local/share/cic/solidity/abi' @@ -82,6 +92,8 @@ config.add(__signer_address, '_SIGNER_ADDRESS') logg.debug('now have key for signer address {}'.format(config.get('_SIGNER_ADDRESS'))) signer = EIP155Signer(keystore) +logg.debug('config:\n{}'.format(config)) + # web3 input # TODO: Replace with chainlib @@ -95,7 +107,64 @@ elif re.match(re_http, config.get('ETH_PROVIDER')): w3 = web3.Web3(blockchain_provider) -logg.debug('config:\n{}'.format(config)) +class TrafficItem: + + def __init__(self, item): + self.method = item.do + self.uuid = uuid.uuid4() + self.complete = False + + +class TrafficRouter: + + def __init__(self, batch_size=1): + self.items = [] + self.weights = [] + self.total_weights = 0 + self.batch_size = batch_size + self.reserved = {} + self.reserved_count = 0 + self.traffic = {} + + + def add(self, item, weight): + self.weights.append(self.total_weights) + self.total_weights += weight + m = importlib.import_module(item) + self.items.append(m) + logg.debug('found traffic item {} weight {}'.format(k, v)) + + + def reserve(self): + if len(self.reserved) == self.batch_size: + return None + + n = random.randint(0, self.total_weights) + item = self.items[0] + for i in range(len(self.weights)): + if n <= self.weights[i]: + item = self.items[i] + break + + ti = TrafficItem(item) + self.reserved[ti.uuid] = ti + return ti + + + def release(self, k): + del self.reserved[k] + + +# parse traffic items +traffic_router = TrafficRouter() +for k in config.all(): + if len(k) > 8 and k[:8] == 'TRAFFIC_': + v = int(config.get(k)) + try: + traffic_router.add(k[8:].lower(), v) + except ModuleNotFoundError as e: + logg.critical('requested traffic item module not found: {}'.format(e)) + sys.exit(1) class TokenOracle: @@ -122,7 +191,7 @@ class TokenOracle: logg.debug('adding token idx {} symbol {} address {}'.format(i, token_symbol, token_address)) - return self.tokens + return copy.copy(self.tokens) class AccountsOracle: @@ -146,16 +215,28 @@ class AccountsOracle: self.accounts.append(account) logg.debug('adding account {}'.format(account)) - return self.accounts + return copy.copy(self.accounts) -class Refresher: +class Handler: - def __init__(self, chain_spec, registry): + def __init__(self, config, chain_spec, registry, traffic_router): self.chain_spec = chain_spec self.registry = registry self.token_oracle = TokenOracle(self.chain_spec, self.registry) self.accounts_oracle = AccountsOracle(self.chain_spec, self.registry) + self.traffic_router = traffic_router + self.redis_channel = str(uuid.uuid4()) + self.pubsub = self.__connect_redis(self.redis_channel, config) + + + def __connect_redis(self, redis_channel, config): + r = redis.Redis(config.get('REDIS_HOST'), config.get('REDIS_PORT'), config.get('REDIS_DB')) + redis_pubsub = r.pubsub() + redis_pubsub.subscribe(redis_channel) + logg.debug('redis connected on channel {}'.format(redis_channel)) + return redis_pubsub + def refresh(self, block_number, tx_index): tokens = self.token_oracle.get_tokens() @@ -163,11 +244,30 @@ class Refresher: if len(accounts) == 0: logg.error('no accounts yet') + #return elif len(tokens) == 0: logg.error('no tokens yet') + #return + + item = traffic_router.reserve() + if item != None: + item.method(config, tokens, accounts, block_number, tx_index) + + # TODO: add drain + m = self.pubsub.get_message(timeout=0.01) + if m != None + pass + + + def name(self): + return 'traffic_item_handler' + + + def filter(self, conn, block, tx, session): + logg.debug('handler get {}'.format(tx)) + - def main(local_config=None): if local_config != None: @@ -197,14 +297,20 @@ def main(local_config=None): gas_oracle = DefaultGasOracle(conn) nonce_oracle = DefaultNonceOracle(config.get('_SIGNER_ADDRESS'), conn) + # Set up magic traffic handler + handler = Handler(config, chain_spec, CICRegistry, traffic_router) + + # Set up syncer syncer_backend = MemBackend(str(chain_spec), 0) o = block_latest() r = conn.do(o) block_offset = int(strip_0x(r), 16) + 1 syncer_backend.set(block_offset, 0) - refresher_callback = Refresher(chain_spec, CICRegistry) - syncer = HeadSyncer(syncer_backend, loop_callback=refresher_callback.refresh) + + + syncer = HeadSyncer(syncer_backend, loop_callback=handler.refresh) + syncer.add_filter(handler) syncer.loop(1, conn) if __name__ == '__main__':