Add traffic router, redis subscription, dynamic traffic item module loading

This commit is contained in:
nolash 2021-02-19 23:11:25 +01:00
parent 2a9c74080f
commit 47b107c776
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
5 changed files with 133 additions and 15 deletions

View File

@ -0,0 +1,2 @@
[traffic]
local.noop_traffic = 2

View File

@ -101,7 +101,8 @@ def register_eth(i, u):
ps.get_message() ps.get_message()
m = ps.get_message(timeout=args.timeout) m = ps.get_message(timeout=args.timeout)
try: try:
address = json.loads(m['data']) r = json.loads(m['data'])
address = r['result']
except TypeError as e: except TypeError as e:
if m == None: if m == None:
logg.critical('empty response from redis callback (did the service crash?)') logg.critical('empty response from redis callback (did the service crash?)')

View File

@ -0,0 +1,9 @@
# standard imports
import logging
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
def do(config, tokens, accounts, block_number, tx_index):
logg.debug('running {} {} {}'.format(__name__, tokens, accounts))

View File

@ -1,12 +1,12 @@
psycopg2==2.8.6 psycopg2==2.8.6
chainlib~=0.0.1a15 chainlib~=0.0.1a15
chainsyncer==0.0.1a9 chainsyncer~=0.0.1a9
cic-eth==0.10.0a28 cic-eth~=0.10.0a28
cic-registry==0.5.3a19 cic-registry~=0.5.3a19
confini==0.3.6rc1 confini~=0.3.6rc3
celery==4.4.7 celery==4.4.7
redis==3.5.3 redis==3.5.3
hexathon==0.0.1a3 hexathon~=0.0.1a3
faker==4.17.1 faker==4.17.1
cic-types==0.1.0a7+build.1c254367 cic-types==0.1.0a7+build.1c254367
eth-accounts-index==0.0.10a10 eth-accounts-index~=0.0.10a10

View File

@ -4,8 +4,13 @@ import logging
import argparse import argparse
import re import re
import sys import sys
import uuid
import importlib
import copy
import random
# external imports # external imports
import redis
import confini import confini
import web3 import web3
from cic_registry import CICRegistry from cic_registry import CICRegistry
@ -26,6 +31,11 @@ from hexathon import strip_0x
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
logging.getLogger('urllib3').setLevel(logging.CRITICAL)
logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL)
logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL)
script_dir = os.path.realpath(os.path.dirname(__file__)) script_dir = os.path.realpath(os.path.dirname(__file__))
default_data_dir = '/usr/local/share/cic/solidity/abi' default_data_dir = '/usr/local/share/cic/solidity/abi'
@ -82,6 +92,8 @@ config.add(__signer_address, '_SIGNER_ADDRESS')
logg.debug('now have key for signer address {}'.format(config.get('_SIGNER_ADDRESS'))) logg.debug('now have key for signer address {}'.format(config.get('_SIGNER_ADDRESS')))
signer = EIP155Signer(keystore) signer = EIP155Signer(keystore)
logg.debug('config:\n{}'.format(config))
# web3 input # web3 input
# TODO: Replace with chainlib # TODO: Replace with chainlib
@ -95,7 +107,64 @@ elif re.match(re_http, config.get('ETH_PROVIDER')):
w3 = web3.Web3(blockchain_provider) w3 = web3.Web3(blockchain_provider)
logg.debug('config:\n{}'.format(config)) class TrafficItem:
def __init__(self, item):
self.method = item.do
self.uuid = uuid.uuid4()
self.complete = False
class TrafficRouter:
def __init__(self, batch_size=1):
self.items = []
self.weights = []
self.total_weights = 0
self.batch_size = batch_size
self.reserved = {}
self.reserved_count = 0
self.traffic = {}
def add(self, item, weight):
self.weights.append(self.total_weights)
self.total_weights += weight
m = importlib.import_module(item)
self.items.append(m)
logg.debug('found traffic item {} weight {}'.format(k, v))
def reserve(self):
if len(self.reserved) == self.batch_size:
return None
n = random.randint(0, self.total_weights)
item = self.items[0]
for i in range(len(self.weights)):
if n <= self.weights[i]:
item = self.items[i]
break
ti = TrafficItem(item)
self.reserved[ti.uuid] = ti
return ti
def release(self, k):
del self.reserved[k]
# parse traffic items
traffic_router = TrafficRouter()
for k in config.all():
if len(k) > 8 and k[:8] == 'TRAFFIC_':
v = int(config.get(k))
try:
traffic_router.add(k[8:].lower(), v)
except ModuleNotFoundError as e:
logg.critical('requested traffic item module not found: {}'.format(e))
sys.exit(1)
class TokenOracle: class TokenOracle:
@ -122,7 +191,7 @@ class TokenOracle:
logg.debug('adding token idx {} symbol {} address {}'.format(i, token_symbol, token_address)) logg.debug('adding token idx {} symbol {} address {}'.format(i, token_symbol, token_address))
return self.tokens return copy.copy(self.tokens)
class AccountsOracle: class AccountsOracle:
@ -146,16 +215,28 @@ class AccountsOracle:
self.accounts.append(account) self.accounts.append(account)
logg.debug('adding account {}'.format(account)) logg.debug('adding account {}'.format(account))
return self.accounts return copy.copy(self.accounts)
class Refresher: class Handler:
def __init__(self, chain_spec, registry): def __init__(self, config, chain_spec, registry, traffic_router):
self.chain_spec = chain_spec self.chain_spec = chain_spec
self.registry = registry self.registry = registry
self.token_oracle = TokenOracle(self.chain_spec, self.registry) self.token_oracle = TokenOracle(self.chain_spec, self.registry)
self.accounts_oracle = AccountsOracle(self.chain_spec, self.registry) self.accounts_oracle = AccountsOracle(self.chain_spec, self.registry)
self.traffic_router = traffic_router
self.redis_channel = str(uuid.uuid4())
self.pubsub = self.__connect_redis(self.redis_channel, config)
def __connect_redis(self, redis_channel, config):
r = redis.Redis(config.get('REDIS_HOST'), config.get('REDIS_PORT'), config.get('REDIS_DB'))
redis_pubsub = r.pubsub()
redis_pubsub.subscribe(redis_channel)
logg.debug('redis connected on channel {}'.format(redis_channel))
return redis_pubsub
def refresh(self, block_number, tx_index): def refresh(self, block_number, tx_index):
tokens = self.token_oracle.get_tokens() tokens = self.token_oracle.get_tokens()
@ -163,9 +244,28 @@ class Refresher:
if len(accounts) == 0: if len(accounts) == 0:
logg.error('no accounts yet') logg.error('no accounts yet')
#return
elif len(tokens) == 0: elif len(tokens) == 0:
logg.error('no tokens yet') logg.error('no tokens yet')
#return
item = traffic_router.reserve()
if item != None:
item.method(config, tokens, accounts, block_number, tx_index)
# TODO: add drain
m = self.pubsub.get_message(timeout=0.01)
if m != None
pass
def name(self):
return 'traffic_item_handler'
def filter(self, conn, block, tx, session):
logg.debug('handler get {}'.format(tx))
def main(local_config=None): def main(local_config=None):
@ -197,14 +297,20 @@ def main(local_config=None):
gas_oracle = DefaultGasOracle(conn) gas_oracle = DefaultGasOracle(conn)
nonce_oracle = DefaultNonceOracle(config.get('_SIGNER_ADDRESS'), conn) nonce_oracle = DefaultNonceOracle(config.get('_SIGNER_ADDRESS'), conn)
# Set up magic traffic handler
handler = Handler(config, chain_spec, CICRegistry, traffic_router)
# Set up syncer # Set up syncer
syncer_backend = MemBackend(str(chain_spec), 0) syncer_backend = MemBackend(str(chain_spec), 0)
o = block_latest() o = block_latest()
r = conn.do(o) r = conn.do(o)
block_offset = int(strip_0x(r), 16) + 1 block_offset = int(strip_0x(r), 16) + 1
syncer_backend.set(block_offset, 0) syncer_backend.set(block_offset, 0)
refresher_callback = Refresher(chain_spec, CICRegistry)
syncer = HeadSyncer(syncer_backend, loop_callback=refresher_callback.refresh)
syncer = HeadSyncer(syncer_backend, loop_callback=handler.refresh)
syncer.add_filter(handler)
syncer.loop(1, conn) syncer.loop(1, conn)
if __name__ == '__main__': if __name__ == '__main__':