This commit is contained in:
William Luke 2022-01-05 12:16:48 +03:00
parent ea1198a898
commit 26616b7245
6 changed files with 258 additions and 212 deletions

View File

@ -1,152 +1,30 @@
import logging
import sys
from typing import List, Optional, Union
import redis
# standard imports
from cic_eth.server import cache, celery, converters
from cic_eth.server.config import args, config
from cic_eth.server.models import (DefaultToken, Token, TokenBalance,
Transaction)
# standard imports
from fastapi import FastAPI, Query
# define log levels
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
log = logging.getLogger(__name__)
# define universal redis cache access
cache.Cache.store = redis.StrictRedis(host=config.get('REDIS_HOST'), port=config.get(
'REDIS_PORT'), db=config.get('REDIS_DB'), decode_responses=True)
import cic_eth.cli
from cic_eth.server.app import create_app
from cic_eth.server.config import get_config
app = FastAPI(debug=True,
title="Grassroots Economics",
description="CIC ETH API",
version="0.0.1",
terms_of_service="https://www.grassrootseconomics.org/pages/terms-and-conditions.html",
contact={
"name": "Grassroots Economics",
"url": "https://www.grassrootseconomics.org",
"email": "will@grassecon.org"
},
license_info={
"name": "GPLv3",
})
config = get_config()
@app.get("/transactions", response_model=List[Transaction])
def transactions(address: str, limit: Optional[str] = 10):
return celery.call('list', address, limit=limit)
# Setup Celery App
celery_app = cic_eth.cli.CeleryApp.from_config(config)
celery_app.set_default()
@app.get("/balance", response_model=List[TokenBalance])
def balance(token_symbol: str, address: str = Query(..., title="Address", min_length=40, max_length=42), include_pending: bool = True):
log.info(f"address: {address}")
log.info(f"token_symbol: {token_symbol}")
data = celery.call('balance', address, token_symbol,
include_pending=include_pending)
for b in data:
token = get_token(token_symbol)
b['balance_network'] = converters.from_wei(
token.decimals, int(b['balance_network']))
b['balance_incoming'] = converters.from_wei(
token.decimals, int(b['balance_incoming']))
b['balance_outgoing'] = converters.from_wei(
token.decimals, int(b['balance_outgoing']))
b.update({
"balance_available": int(b['balance_network']) + int(b['balance_incoming']) - int(b['balance_outgoing'])
})
return data
chain_spec = config.get('CHAIN_SPEC')
celery_queue = config.get('CELERY_QUEUE')
redis_host = config.get('REDIS_HOST')
redis_port = config.get('REDIS_PORT')
redis_db = config.get('REDIS_DB')
redis_timeout = config.get('REDIS_TIMEOUT')
@app.post("/create_account")
def create_account(password: Optional[str] = None, register: bool = True):
data = celery.call('create_account', password=password, register=register)
return data
# def refill_gas(start_response, query: dict):
# address = query.pop('address')
# data = celery.call('refill_gas', address)
# return data
# def ping(start_response, query: dict):
# data = celery.call('ping', **query)
# return data
@app.post("/transfer")
def transfer(from_address: str, to_address: str, value: int, token_symbol: str):
token = get_token(
token_symbol)
wei_value = converters.to_wei(token.decimals, int(value))
data = celery.call('transfer', from_address,
to_address, wei_value, token_symbol)
return data
@app.post("/transfer_from")
def transfer_from(from_address: str, to_address: str, value: int, token_symbol: str, spender_address: str):
token = get_token(
token_symbol)
wei_value = converters.to_wei(token.decimals, int(value))
data = celery.call('transfer_from', from_address, to_address,
wei_value, token_symbol, spender_address)
return data
@app.get("/token", response_model=Token)
def token(token_symbol: str, proof: Optional[str] = None):
token = get_token(token_symbol)
if token == None:
sys.stderr.write(f"Cached Token {token_symbol} not found")
data = celery.call('token', token_symbol, proof=proof)
cache.set_token_data(token_symbol, data)
token = Token.new(data)
sys.stderr.write(f"Token {token}")
return token
@app.get("/tokens", response_model=List[Token])
def tokens(token_symbols: Optional[List[str]] = Query(None), proof: Optional[Union[str, List[str], List[List[str]]]] = None):
data = celery.call('tokens', token_symbols,
catch=len(token_symbols), proof=proof)
if data:
tokens = []
for token in data:
print(f"Token: {token}")
tokens.append(Token.new(token))
return tokens
return None
@app.get("/default_token", response_model=DefaultToken)
def default_token():
data = cache.get_default_token()
if data is None:
data = celery.call('default_token')
if data is not None:
cache.set_default_token(data)
return data
def get_token(token_symbol: str):
data = cache.get_token_data(token_symbol)
log.debug(f"cached token data: {data}")
if data == None:
data = celery.call('token', token_symbol)
log.debug(
f"No token data setting token data for: {token_symbol} to {data}")
cache.set_token_data(token_symbol, data)
return Token.new(data)
app = create_app(chain_spec,
celery_queue,
redis_host,
redis_port,
redis_db,
redis_timeout)
if __name__ == "__main__":
import uvicorn

View File

@ -0,0 +1,138 @@
import logging
import sys
from typing import List, Optional, Union
from cic_eth.server import cache, converters
from cic_eth.server.cache import setup_cache
from cic_eth.server.celery import create_celery_wrapper
from cic_eth.server.models import (DefaultToken, Token, TokenBalance,
Transaction)
from fastapi import FastAPI, Query
log = logging.getLogger(__name__)
def create_app(chain_spec,
celery_queue,
redis_host,
redis_port,
redis_db,
redis_timeout):
setup_cache(redis_db=redis_db, redis_host=redis_host,
redis_port=redis_port)
celery_wrapper = create_celery_wrapper(celery_queue=celery_queue, chain_spec=chain_spec,
redis_db=redis_db, redis_host=redis_host, redis_port=redis_port, redis_timeout=redis_timeout)
app = FastAPI(debug=True,
title="Grassroots Economics",
description="CIC ETH API",
version="0.0.1",
terms_of_service="https://www.grassrootseconomics.org/pages/terms-and-conditions.html",
contact={
"name": "Grassroots Economics",
"url": "https://www.grassrootseconomics.org",
"email": "will@grassecon.org"
},
license_info={
"name": "GPLv3",
})
@app.get("/transactions", response_model=List[Transaction])
def transactions(address: str, limit: Optional[str] = 10):
return celery_wrapper('list', address, limit=limit)
@app.get("/balance", response_model=List[TokenBalance])
def balance(token_symbol: str, address: str = Query(..., title="Address", min_length=40, max_length=42), include_pending: bool = True):
log.info(f"address: {address}")
log.info(f"token_symbol: {token_symbol}")
data = celery_wrapper('balance', address, token_symbol,
include_pending=include_pending)
for b in data:
token = get_token(token_symbol)
b['balance_network'] = converters.from_wei(
token.decimals, int(b['balance_network']))
b['balance_incoming'] = converters.from_wei(
token.decimals, int(b['balance_incoming']))
b['balance_outgoing'] = converters.from_wei(
token.decimals, int(b['balance_outgoing']))
b.update({
"balance_available": int(b['balance_network']) + int(b['balance_incoming']) - int(b['balance_outgoing'])
})
return data
@app.post("/create_account")
def create_account(password: Optional[str] = None, register: bool = True):
data = celery_wrapper(
'create_account', password=password, register=register)
return data
# def refill_gas(start_response, query: dict):
# address = query.pop('address')
# data = celery_wrapper('refill_gas', address)
# return data
# def ping(start_response, query: dict):
# data = celery_wrapper('ping', **query)
# return data
@app.post("/transfer")
def transfer(from_address: str, to_address: str, value: int, token_symbol: str):
token = get_token(
token_symbol)
wei_value = converters.to_wei(token.decimals, int(value))
data = celery_wrapper('transfer', from_address,
to_address, wei_value, token_symbol)
return data
@app.post("/transfer_from")
def transfer_from(from_address: str, to_address: str, value: int, token_symbol: str, spender_address: str):
token = get_token(
token_symbol)
wei_value = converters.to_wei(token.decimals, int(value))
data = celery_wrapper('transfer_from', from_address, to_address,
wei_value, token_symbol, spender_address)
return data
@app.get("/token", response_model=Token)
def token(token_symbol: str, proof: Optional[str] = None):
token = get_token(token_symbol)
if token == None:
sys.stderr.write(f"Cached Token {token_symbol} not found")
data = celery_wrapper('token', token_symbol, proof=proof)
cache.set_token_data(token_symbol, data)
token = Token.new(data)
sys.stderr.write(f"Token {token}")
return token
@app.get("/tokens", response_model=List[Token])
def tokens(token_symbols: Optional[List[str]] = Query(None), proof: Optional[Union[str, List[str], List[List[str]]]] = None):
data = celery_wrapper('tokens', token_symbols,
catch=len(token_symbols), proof=proof)
if data:
tokens = []
for token in data:
print(f"Token: {token}")
tokens.append(Token.new(token))
return tokens
return None
@app.get("/default_token", response_model=DefaultToken)
def default_token():
data = cache.get_default_token()
if data is None:
data = celery_wrapper('default_token')
if data is not None:
cache.set_default_token(data)
return data
def get_token(token_symbol: str):
data = cache.get_token_data(token_symbol)
log.debug(f"cached token data: {data}")
if data == None:
data = celery_wrapper('token', token_symbol)
log.debug(
f"No token data setting token data for: {token_symbol} to {data}")
cache.set_token_data(token_symbol, data)
return Token.new(data)
return app

View File

@ -3,10 +3,10 @@ import hashlib
import json
import logging
from typing import Optional, Union
from cic_eth.server.models import Token
# external imports
from cic_types.condiments import MetadataPointer
from redis import Redis
from redis import Redis, StrictRedis
logg = logging.getLogger(__file__)
@ -15,6 +15,12 @@ class Cache:
store: Redis = None
def setup_cache(redis_host, redis_port, redis_db):
# Define universal redis cache access
Cache.store = StrictRedis(
host=redis_host, port=redis_port, db=redis_db, decode_responses=True)
def get_token_data(token_symbol: str):
"""
:param token_symbol:

View File

@ -3,67 +3,61 @@ import logging
import sys
import uuid
import cic_eth.cli
import redis
from cic_eth.api.api_task import Api
from cic_eth.server.config import config
log = logging.getLogger(__name__)
celery_app = cic_eth.cli.CeleryApp.from_config(config)
celery_app.set_default()
def create_celery_wrapper(chain_spec,
celery_queue,
redis_host,
redis_port,
redis_db,
redis_timeout):
def call(method, *args, catch=1, **kwargs):
""" Creates a redis channel and calls `cic_eth.api` with the provided `method` and `*args`. Returns the result of the api call. Catch allows you to specify how many messages to catch before returning.
"""
log.debug(f"Using chainspec: {chain_spec}")
redis_channel = str(uuid.uuid4())
r = redis.Redis(redis_host, redis_port, redis_db)
ps = r.pubsub()
ps.subscribe(redis_channel)
api = Api(
chain_spec,
queue=celery_queue,
callback_param='{}:{}:{}:{}'.format(
redis_host, redis_port, redis_db, redis_channel),
callback_task='cic_eth.callbacks.redis.redis',
callback_queue=celery_queue,
)
getattr(api, method)(*args, **kwargs)
ps.get_message()
try:
data = []
if catch == 1:
message = ps.get_message(timeout=redis_timeout)
data = json.loads(message['data'])["result"]
else:
for _i in range(catch):
message = ps.get_message(
timeout=redis_timeout)
result = json.loads(message['data'])["result"]
data.append(result)
chain_spec = config.get('CHAIN_SPEC')
celery_queue = config.get('CELERY_QUEUE')
except TimeoutError as e:
sys.stderr.write(
f"cic_eth.api.{method}({args}, {kwargs}) timed out:\n {e}")
raise e
except Exception as e:
sys.stderr.write(
f'Unable to parse Data:\n{data}\n Error:\n{e}')
raise e
redis_host = config.get('REDIS_HOST')
redis_port = config.get('REDIS_PORT')
redis_db = config.get('REDIS_DB')
log.debug(
f"cic_eth.api.{method}(args={args}, kwargs={kwargs})\n {data}")
def call(method, *args, catch=1, **kwargs):
""" Creates a redis channel and calls `cic_eth.api` with the provided `method` and `*args`. Returns the result of the api call. Catch allows you to specify how many messages to catch before returning.
"""
log.debug(f"Using chainspec: {chain_spec}")
redis_channel = str(uuid.uuid4())
r = redis.Redis(redis_host, redis_port, redis_db)
ps = r.pubsub()
ps.subscribe(redis_channel)
api = Api(
chain_spec,
queue=celery_queue,
callback_param='{}:{}:{}:{}'.format(
redis_host, redis_port, redis_db, redis_channel),
callback_task='cic_eth.callbacks.redis.redis',
callback_queue=celery_queue,
)
getattr(api, method)(*args, **kwargs)
ps.get_message()
try:
data = []
if catch == 1:
message = ps.get_message(timeout=config.get('REDIS_TIMEOUT'))
data = json.loads(message['data'])["result"]
else:
for _i in range(catch):
message = ps.get_message(timeout=config.get('REDIS_TIMEOUT'))
result = json.loads(message['data'])["result"]
data.append(result)
except TimeoutError as e:
sys.stderr.write(
f"cic_eth.api.{method}({args}, {kwargs}) timed out:\n {e}")
raise e
except Exception as e:
sys.stderr.write(
f'Unable to parse Data:\n{data}\n Error:\n{e}')
return None
log.debug(
f"cic_eth.api.{method}(args={args}, kwargs={kwargs})\n {data}")
ps.unsubscribe()
return data
ps.unsubscribe()
return data
return call

View File

@ -1,8 +1,20 @@
import logging
import cic_eth.cli
arg_flags = cic_eth.cli.argflag_std_base
local_arg_flags = cic_eth.cli.argflag_local_taskcallback
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args([])
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags)
def get_config():
# Parse Args
arg_flags = cic_eth.cli.argflag_std_base
local_arg_flags = cic_eth.cli.argflag_local_taskcallback
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args([])
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags)
# Define log levels
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
return config

View File

@ -5,30 +5,47 @@ import logging
import time
import hexathon
from cic_eth.runnable.daemons.server import app
from cic_eth.server.app import create_app
from cic_eth_registry.pytest.fixtures_tokens import *
from fastapi.testclient import TestClient
log = logging.getLogger(__name__)
client = TestClient(app)
@pytest.fixture(scope='session')
def client(config, default_chain_spec, celery_session_worker):
config.add(str(default_chain_spec), 'CHAIN_SPEC', exists_ok=True)
config.add('cic_eth_test', 'CELERY_QUEUE', exists_ok=True)
chain_spec = config.get('CHAIN_SPEC')
celery_queue = config.get('CELERY_QUEUE')
redis_host = config.get('REDIS_REDIS_HOST')
redis_port = config.get('REDIS_REDIS_PORT')
redis_db = config.get('REDIS_DB')
redis_timeout = 20
app = create_app(str(chain_spec), celery_queue, redis_host,
redis_port, redis_db, redis_timeout)
return TestClient(app)
def test_default_token():
def test_default_token(client, foo_token, default_chain_spec):
print(foo_token)
print(f"default_chain_spec: {default_chain_spec.asdict()}")
# Default Token
response = client.get('/default_token')
log.debug(f"balance response {response}")
default_token = response.json()
assert default_token == {
'address': '3FF776B6f888980DEf9d4220858803f9dC5e341e',
'decimals': 6,
'decimals': 7,
'name': 'Giftable Token',
'symbol': 'GFT',
}
def test_token():
def test_token(client):
# Default Token
response = client.get('/token?token_symbol=GFT')
response = client.get('/token?token_symbol=FOO')
log.debug(f"token response {response}")
token = response.json()
assert token == {
@ -43,9 +60,10 @@ def test_token():
}
def test_tokens():
def test_tokens(client):
# Default Token
response = client.get('/tokens', params={'token_symbols': ['GFT', 'COFE']})
response = client.get(
'/tokens', params={'token_symbols': ['GFT', 'COFE', 'FOO']})
log.debug(f"tokens response {response}")
tokens = response.json()
@ -70,7 +88,7 @@ def test_tokens():
]
def test_account():
def test_account(client):
# Default Token
response = client.get('/default_token')
log.debug(f"balance response {response}")