Add canonical account create callback

This commit is contained in:
nolash 2021-02-20 10:47:01 +01:00
parent 7119d2e7ec
commit e7958aaf9e
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
5 changed files with 120 additions and 56 deletions

View File

@ -1,2 +1,3 @@
[traffic] [traffic]
local.noop_traffic = 2 #local.noop_traffic = 2
local.account = 2

View File

@ -0,0 +1,25 @@
# standard imports
import logging
# external imports
from cic_eth.api.api_task import Api
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
queue = 'cic-eth'
name = 'account'
def do(tokens, accounts, aux, block_number, tx_index):
logg.debug('running {} {} {}'.format(__name__, tokens, accounts))
api = Api(
str(aux['chain_spec']),
queue=queue,
callback_param='{}:{}:{}:{}'.format(aux['redis_host_callback'], aux['redis_port_callback'], aux['redis_db'], aux['redis_channel']),
callback_task='cic_eth.callbacks.redis.redis',
callback_queue=queue,
)
t = api.create_account(register=True)
return (None, t,)

View File

@ -5,5 +5,7 @@ logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
def do(config, tokens, accounts, block_number, tx_index): def do(tokens, accounts, aux, block_number, tx_index):
logg.debug('running {} {} {}'.format(__name__, tokens, accounts)) logg.debug('running {} {} {}'.format(__name__, tokens, accounts))
return (None, None,)

View File

@ -13,6 +13,7 @@ import random
import redis import redis
import confini import confini
import web3 import web3
import celery
from cic_registry import CICRegistry from cic_registry import CICRegistry
from cic_registry.chain import ChainRegistry from cic_registry.chain import ChainRegistry
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
@ -42,15 +43,17 @@ default_data_dir = '/usr/local/share/cic/solidity/abi'
argparser = argparse.ArgumentParser() argparser = argparse.ArgumentParser()
argparser.add_argument('-p', type=str, help='Ethereum provider url') argparser.add_argument('-p', type=str, help='Ethereum provider url')
argparser.add_argument('-r', type=str, help='cic-registry address')
argparser.add_argument('-y', '--key-file', dest='y', type=str, help='Ethereum keystore file to use for signing') argparser.add_argument('-y', '--key-file', dest='y', type=str, help='Ethereum keystore file to use for signing')
argparser.add_argument('-c', type=str, default='./config', help='config file') argparser.add_argument('-c', type=str, default='./config', help='config file')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit to')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('-v', action='store_true', help='be verbose') argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose') argparser.add_argument('-vv', action='store_true', help='be more verbose')
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi') argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback')
argparser.add_argument('-r', type=str, help='cic-registry address') argparser.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback')
argparser.add_argument('--batch-size', dest='batch_size', default=10, type=int, help='number of events to process simultaneously') argparser.add_argument('--batch-size', dest='batch_size', default=10, type=int, help='number of events to process simultaneously')
args = argparser.parse_args() args = argparser.parse_args()
@ -80,6 +83,10 @@ if batchsize < 1:
logg.info('batch size {}'.format(batchsize)) logg.info('batch size {}'.format(batchsize))
config.add(batchsize, '_BATCH_SIZE', True) config.add(batchsize, '_BATCH_SIZE', True)
# redis task result callback
config.add(args.redis_host_callback, '_REDIS_HOST_CALLBACK', True)
config.add(args.redis_port_callback, '_REDIS_PORT_CALLBACK', True)
# signer # signer
keystore = DictKeystore() keystore = DictKeystore()
if args.y == None: if args.y == None:
@ -106,12 +113,15 @@ elif re.match(re_http, config.get('ETH_PROVIDER')):
blockchain_provider = web3.Web3.HTTPProvider(config.get('ETH_PROVIDER')) blockchain_provider = web3.Web3.HTTPProvider(config.get('ETH_PROVIDER'))
w3 = web3.Web3(blockchain_provider) w3 = web3.Web3(blockchain_provider)
# connect celery
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
class TrafficItem: class TrafficItem:
def __init__(self, item): def __init__(self, item):
self.method = item.do self.method = item.do
self.uuid = uuid.uuid4() self.uuid = uuid.uuid4()
self.ext = None
self.complete = False self.complete = False
@ -151,8 +161,8 @@ class TrafficRouter:
return ti return ti
def release(self, k): def release(self, traffic_item):
del self.reserved[k] del self.reserved[traffic_item.uuid]
# parse traffic items # parse traffic items
@ -173,6 +183,9 @@ class TrafficTasker:
'account': None, 'account': None,
'token': None, 'token': None,
} }
default_aux = {
}
def __init__(self): def __init__(self):
@ -180,7 +193,7 @@ class TrafficTasker:
self.accounts = self.oracles['account'].get_accounts() self.accounts = self.oracles['account'].get_accounts()
self.balances = {} self.balances = {}
self.aux = {} self.aux = copy.copy(self.default_aux)
def load_balances(self): def load_balances(self):
@ -200,6 +213,69 @@ class TrafficTasker:
class Handler:
def __init__(self, config, traffic_router):
self.traffic_router = traffic_router
self.redis_channel = str(uuid.uuid4())
self.pubsub = self.__connect_redis(self.redis_channel, config)
self.traffic_items = {}
self.config = config
def __connect_redis(self, redis_channel, config):
r = redis.Redis(config.get('REDIS_HOST'), config.get('REDIS_PORT'), config.get('REDIS_DB'))
redis_pubsub = r.pubsub()
redis_pubsub.subscribe(redis_channel)
logg.debug('redis connected on channel {}'.format(redis_channel))
return redis_pubsub
def refresh(self, block_number, tx_index):
traffic_tasker = TrafficTasker()
traffic_tasker.add_aux('redis_channel', self.redis_channel)
if len(traffic_tasker.tokens) == 0:
logg.error('patiently waiting for at least one registered token...')
return
logg.debug('executing handler refresh with accouts {}'.format(traffic_tasker.accounts))
logg.debug('executing handler refresh with tokens {}'.format(traffic_tasker.tokens))
while True:
traffic_item = traffic_router.reserve()
if traffic_item == None:
logg.debug('no traffic_items left to reserve {}'.format(traffic_item))
break
(e, t,) = traffic_item.method(traffic_tasker.tokens, traffic_tasker.accounts, traffic_tasker.aux, block_number, tx_index)
if e != None:
logg.info('traffic method {} failed: {}'.format(traffic_item.name(), e))
continue
if t == None:
logg.info('traffic method {} completed immediately')
self.traffic_router.release(traffic_item)
traffic_item.ext = t
self.traffic_items[traffic_item.uuid] = traffic_item
# TODO: add drain
while True:
m = self.pubsub.get_message(timeout=0.01)
if m == None:
break
logg.debug('redis message {}'.format(m))
def name(self):
return 'traffic_item_handler'
def filter(self, conn, block, tx, session):
logg.debug('handler get {}'.format(tx))
class TokenOracle: class TokenOracle:
def __init__(self, chain_spec, registry): def __init__(self, chain_spec, registry):
@ -251,52 +327,6 @@ class AccountsOracle:
return copy.copy(self.accounts) return copy.copy(self.accounts)
class Handler:
def __init__(self, config, chain_spec, registry, traffic_router):
self.traffic_router = traffic_router
self.redis_channel = str(uuid.uuid4())
self.pubsub = self.__connect_redis(self.redis_channel, config)
def __connect_redis(self, redis_channel, config):
r = redis.Redis(config.get('REDIS_HOST'), config.get('REDIS_PORT'), config.get('REDIS_DB'))
redis_pubsub = r.pubsub()
redis_pubsub.subscribe(redis_channel)
logg.debug('redis connected on channel {}'.format(redis_channel))
return redis_pubsub
def refresh(self, block_number, tx_index):
token_tasker = TrafficTasker()
if len(token_tasker.tokens) == 0:
logg.error('no tokens yet')
return
while True:
item = traffic_router.reserve()
if item == None:
logg.debug('no items left to reserve {}'.format(item))
break
item.method(config, token_tasker.tokens, token_tasker.accounts, block_number, tx_index)
# TODO: add drain
while True:
m = self.pubsub.get_message(timeout=0.01)
if m == None:
break
def name(self):
return 'traffic_item_handler'
def filter(self, conn, block, tx, session):
logg.debug('handler get {}'.format(tx))
def main(local_config=None): def main(local_config=None):
if local_config != None: if local_config != None:
@ -327,7 +357,7 @@ def main(local_config=None):
nonce_oracle = DefaultNonceOracle(config.get('_SIGNER_ADDRESS'), conn) nonce_oracle = DefaultNonceOracle(config.get('_SIGNER_ADDRESS'), conn)
# Set up magic traffic handler # Set up magic traffic handler
handler = Handler(config, chain_spec, CICRegistry, traffic_router) handler = Handler(config, traffic_router)
# Set up syncer # Set up syncer
@ -337,9 +367,15 @@ def main(local_config=None):
block_offset = int(strip_0x(r), 16) + 1 block_offset = int(strip_0x(r), 16) + 1
syncer_backend.set(block_offset, 0) syncer_backend.set(block_offset, 0)
TrafficTasker.oracles['token']= TokenOracle(chain_spec, CICRegistry) TrafficTasker.oracles['token']= TokenOracle(chain_spec, CICRegistry)
TrafficTasker.oracles['account'] = AccountsOracle(chain_spec, CICRegistry) TrafficTasker.oracles['account'] = AccountsOracle(chain_spec, CICRegistry)
TrafficTasker.default_aux = {
'chain_spec': chain_spec,
'registry': CICRegistry,
'redis_host_callback': config.get('_REDIS_HOST_CALLBACK'),
'redis_port_callback': config.get('_REDIS_PORT_CALLBACK'),
'redis_db': config.get('REDIS_DB'),
}
syncer = HeadSyncer(syncer_backend, loop_callback=handler.refresh) syncer = HeadSyncer(syncer_backend, loop_callback=handler.refresh)
syncer.add_filter(handler) syncer.add_filter(handler)

View File

@ -237,7 +237,7 @@ services:
- -c - -c
- | - |
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
./start_tasker.sh -q cic-eth -v ./start_tasker.sh -q cic-eth -vv
# command: [/bin/sh, "./start_tasker.sh", -q, cic-eth, -vv ] # command: [/bin/sh, "./start_tasker.sh", -q, cic-eth, -vv ]
cic-eth-manager-head: cic-eth-manager-head: