Introduce traffic tasker, handling chain state retrieval

This commit is contained in:
nolash 2021-02-20 09:39:28 +01:00
parent 47b107c776
commit 7119d2e7ec
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746

View File

@ -167,6 +167,39 @@ for k in config.all():
sys.exit(1) sys.exit(1)
class TrafficTasker:
oracles = {
'account': None,
'token': None,
}
def __init__(self):
self.tokens = self.oracles['token'].get_tokens()
self.accounts = self.oracles['account'].get_accounts()
self.balances = {}
self.aux = {}
def load_balances(self):
pass
def cache_balance(self, holder_address, token, value):
if self.balances.get(holder_address) == None:
self.balances[holder_address] = {}
self.balances[holder_address][token] = value
logg.debug('setting cached balance of {} token {} to {}'.format(holder_address, token, value))
def add_aux(self, k, v):
logg.debug('added {} = {} to traffictasker'.format(k, v))
self.aux[k] = v
class TokenOracle: class TokenOracle:
def __init__(self, chain_spec, registry): def __init__(self, chain_spec, registry):
@ -221,10 +254,6 @@ class AccountsOracle:
class Handler: class Handler:
def __init__(self, config, chain_spec, registry, traffic_router): def __init__(self, config, chain_spec, registry, traffic_router):
self.chain_spec = chain_spec
self.registry = registry
self.token_oracle = TokenOracle(self.chain_spec, self.registry)
self.accounts_oracle = AccountsOracle(self.chain_spec, self.registry)
self.traffic_router = traffic_router self.traffic_router = traffic_router
self.redis_channel = str(uuid.uuid4()) self.redis_channel = str(uuid.uuid4())
self.pubsub = self.__connect_redis(self.redis_channel, config) self.pubsub = self.__connect_redis(self.redis_channel, config)
@ -239,25 +268,25 @@ class Handler:
def refresh(self, block_number, tx_index): def refresh(self, block_number, tx_index):
tokens = self.token_oracle.get_tokens()
accounts = self.accounts_oracle.get_accounts()
if len(accounts) == 0: token_tasker = TrafficTasker()
logg.error('no accounts yet')
#return
elif len(tokens) == 0: if len(token_tasker.tokens) == 0:
logg.error('no tokens yet') logg.error('no tokens yet')
#return return
item = traffic_router.reserve() while True:
if item != None: item = traffic_router.reserve()
item.method(config, tokens, accounts, block_number, tx_index) if item == None:
logg.debug('no items left to reserve {}'.format(item))
break
item.method(config, token_tasker.tokens, token_tasker.accounts, block_number, tx_index)
# TODO: add drain # TODO: add drain
m = self.pubsub.get_message(timeout=0.01) while True:
if m != None m = self.pubsub.get_message(timeout=0.01)
pass if m == None:
break
def name(self): def name(self):
@ -309,6 +338,9 @@ def main(local_config=None):
syncer_backend.set(block_offset, 0) syncer_backend.set(block_offset, 0)
TrafficTasker.oracles['token']= TokenOracle(chain_spec, CICRegistry)
TrafficTasker.oracles['account'] = AccountsOracle(chain_spec, CICRegistry)
syncer = HeadSyncer(syncer_backend, loop_callback=handler.refresh) syncer = HeadSyncer(syncer_backend, loop_callback=handler.refresh)
syncer.add_filter(handler) syncer.add_filter(handler)
syncer.loop(1, conn) syncer.loop(1, conn)