Add magic arg parsing for token lookup api

This commit is contained in:
nolash 2021-10-09 16:21:03 +02:00
parent 66f94ae694
commit 9637cb61f8
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
2 changed files with 88 additions and 3 deletions

View File

@ -23,6 +23,26 @@ logg = logging.getLogger()
class Api(ApiBase): class Api(ApiBase):
@staticmethod
def to_v_list(v, n):
if isinstance(v, str):
vv = v
v = []
for i in range(n):
v.append([vv])
elif not isinstance(v, list):
raise ValueError('argument must be single string, or list or strings or lists')
else:
if len(v) != n:
raise ValueError('v argument count must match integer n')
for i in range(n):
if isinstance(v[i], str):
v[i] = [v[i]]
elif not isinstance(v, list):
raise ValueError('proof argument must be single string, or list or strings or lists')
return v
def default_token(self): def default_token(self):
s_token = celery.signature( s_token = celery.signature(
@ -37,12 +57,21 @@ class Api(ApiBase):
def token(self, token_symbol, proof=None): def token(self, token_symbol, proof=None):
if not isinstance(token_symbol, str):
raise ValueError('token symbol must be string')
return self.tokens([token_symbol], proof=proof) return self.tokens([token_symbol], proof=proof)
def tokens(self, token_symbols, proof=None): def tokens(self, token_symbols, proof=None):
if isinstance(proof, str): if not isinstance(token_symbols, list):
proof = [proof] raise ValueError('token symbols argument must be list')
if proof == None:
logg.debug('looking up tokens without external proof check: {}'.format(','.join(token_symbols)))
proof = ''
proof = Api.to_v_list(proof, len(token_symbols))
chain_spec_dict = self.chain_spec.asdict() chain_spec_dict = self.chain_spec.asdict()
s_token_resolve = celery.signature( s_token_resolve = celery.signature(

View File

@ -40,6 +40,61 @@ def test_default_token(
assert r['address'] == foo_token assert r['address'] == foo_token
def test_to_v_list():
assert Api.to_v_list('foo', 1) == [['foo']]
assert Api.to_v_list(['foo'], 1) == [['foo']]
assert Api.to_v_list(['foo', 'bar'], 2) == [['foo'], ['bar']]
assert Api.to_v_list('foo', 3) == [['foo'], ['foo'], ['foo']]
assert Api.to_v_list([['foo'], ['bar']], 2) == [['foo'], ['bar']]
with pytest.raises(ValueError):
Api.to_v_list([['foo'], ['bar']], 3)
with pytest.raises(ValueError):
Api.to_v_list(['foo', 'bar'], 3)
with pytest.raises(ValueError):
Api.to_v_list([['foo'], ['bar'], ['baz']], 2)
assert Api.to_v_list([
['foo'],
'bar',
['inky', 'pinky', 'blinky', 'clyde'],
], 3) == [
['foo'],
['bar'],
['inky', 'pinky', 'blinky', 'clyde'],
]
def test_token_single(
default_chain_spec,
foo_token,
bar_token,
token_registry,
register_tokens,
register_lookups,
cic_registry,
init_database,
init_celery_tasks,
custodial_roles,
foo_token_declaration,
bar_token_declaration,
celery_session_worker,
):
api = Api(str(default_chain_spec), queue=None, callback_param='foo')
t = api.token('FOO', proof=None)
r = t.get()
logg.debug('rr {}'.format(r))
assert len(r) == 1
assert r[0]['address'] == strip_0x(foo_token)
t = api.token('FOO', proof=foo_token_declaration)
r = t.get()
assert len(r) == 1
assert r[0]['address'] == strip_0x(foo_token)
def test_tokens( def test_tokens(
default_chain_spec, default_chain_spec,
foo_token, foo_token,
@ -118,3 +173,4 @@ def test_tokens(
f.close() f.close()
assert v == b'\x00' assert v == b'\x00'
break break