From 26616b72458e1849b85dca0396c4ba6ddf7467b9 Mon Sep 17 00:00:00 2001 From: William Luke Date: Wed, 5 Jan 2022 12:16:48 +0300 Subject: [PATCH] refactor --- .../cic_eth/runnable/daemons/server.py | 160 +++--------------- apps/cic-eth/cic_eth/server/app.py | 138 +++++++++++++++ apps/cic-eth/cic_eth/server/cache.py | 10 +- apps/cic-eth/cic_eth/server/celery.py | 102 ++++++----- apps/cic-eth/cic_eth/server/config.py | 24 ++- apps/cic-eth/tests/test_server.py | 36 +++- 6 files changed, 258 insertions(+), 212 deletions(-) create mode 100644 apps/cic-eth/cic_eth/server/app.py diff --git a/apps/cic-eth/cic_eth/runnable/daemons/server.py b/apps/cic-eth/cic_eth/runnable/daemons/server.py index 3aeabf36..c0eb29fc 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/server.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/server.py @@ -1,152 +1,30 @@ -import logging -import sys -from typing import List, Optional, Union - -import redis -# standard imports -from cic_eth.server import cache, celery, converters -from cic_eth.server.config import args, config -from cic_eth.server.models import (DefaultToken, Token, TokenBalance, - Transaction) -# standard imports -from fastapi import FastAPI, Query - -# define log levels -if args.vv: - logging.getLogger().setLevel(logging.DEBUG) -elif args.v: - logging.getLogger().setLevel(logging.INFO) - -log = logging.getLogger(__name__) - -# define universal redis cache access -cache.Cache.store = redis.StrictRedis(host=config.get('REDIS_HOST'), port=config.get( - 'REDIS_PORT'), db=config.get('REDIS_DB'), decode_responses=True) +import cic_eth.cli +from cic_eth.server.app import create_app +from cic_eth.server.config import get_config -app = FastAPI(debug=True, - title="Grassroots Economics", - description="CIC ETH API", - version="0.0.1", - terms_of_service="https://www.grassrootseconomics.org/pages/terms-and-conditions.html", - contact={ - "name": "Grassroots Economics", - "url": "https://www.grassrootseconomics.org", - "email": "will@grassecon.org" - }, - license_info={ - "name": "GPLv3", - }) +config = get_config() -@app.get("/transactions", response_model=List[Transaction]) -def transactions(address: str, limit: Optional[str] = 10): - return celery.call('list', address, limit=limit) +# Setup Celery App +celery_app = cic_eth.cli.CeleryApp.from_config(config) +celery_app.set_default() -@app.get("/balance", response_model=List[TokenBalance]) -def balance(token_symbol: str, address: str = Query(..., title="Address", min_length=40, max_length=42), include_pending: bool = True): - log.info(f"address: {address}") - log.info(f"token_symbol: {token_symbol}") - data = celery.call('balance', address, token_symbol, - include_pending=include_pending) - for b in data: - token = get_token(token_symbol) - b['balance_network'] = converters.from_wei( - token.decimals, int(b['balance_network'])) - b['balance_incoming'] = converters.from_wei( - token.decimals, int(b['balance_incoming'])) - b['balance_outgoing'] = converters.from_wei( - token.decimals, int(b['balance_outgoing'])) - - b.update({ - "balance_available": int(b['balance_network']) + int(b['balance_incoming']) - int(b['balance_outgoing']) - }) - return data +chain_spec = config.get('CHAIN_SPEC') +celery_queue = config.get('CELERY_QUEUE') +redis_host = config.get('REDIS_HOST') +redis_port = config.get('REDIS_PORT') +redis_db = config.get('REDIS_DB') +redis_timeout = config.get('REDIS_TIMEOUT') -@app.post("/create_account") -def create_account(password: Optional[str] = None, register: bool = True): - data = celery.call('create_account', password=password, register=register) - return data - - -# def refill_gas(start_response, query: dict): -# address = query.pop('address') -# data = celery.call('refill_gas', address) -# return data - - -# def ping(start_response, query: dict): -# data = celery.call('ping', **query) -# return data - -@app.post("/transfer") -def transfer(from_address: str, to_address: str, value: int, token_symbol: str): - token = get_token( - token_symbol) - wei_value = converters.to_wei(token.decimals, int(value)) - data = celery.call('transfer', from_address, - to_address, wei_value, token_symbol) - return data - - -@app.post("/transfer_from") -def transfer_from(from_address: str, to_address: str, value: int, token_symbol: str, spender_address: str): - token = get_token( - token_symbol) - wei_value = converters.to_wei(token.decimals, int(value)) - data = celery.call('transfer_from', from_address, to_address, - wei_value, token_symbol, spender_address) - return data - - -@app.get("/token", response_model=Token) -def token(token_symbol: str, proof: Optional[str] = None): - token = get_token(token_symbol) - if token == None: - sys.stderr.write(f"Cached Token {token_symbol} not found") - data = celery.call('token', token_symbol, proof=proof) - cache.set_token_data(token_symbol, data) - token = Token.new(data) - sys.stderr.write(f"Token {token}") - - return token - - -@app.get("/tokens", response_model=List[Token]) -def tokens(token_symbols: Optional[List[str]] = Query(None), proof: Optional[Union[str, List[str], List[List[str]]]] = None): - data = celery.call('tokens', token_symbols, - catch=len(token_symbols), proof=proof) - if data: - tokens = [] - for token in data: - print(f"Token: {token}") - tokens.append(Token.new(token)) - return tokens - return None - - -@app.get("/default_token", response_model=DefaultToken) -def default_token(): - data = cache.get_default_token() - if data is None: - data = celery.call('default_token') - if data is not None: - cache.set_default_token(data) - return data - - -def get_token(token_symbol: str): - data = cache.get_token_data(token_symbol) - log.debug(f"cached token data: {data}") - if data == None: - data = celery.call('token', token_symbol) - log.debug( - f"No token data setting token data for: {token_symbol} to {data}") - cache.set_token_data(token_symbol, data) - return Token.new(data) - +app = create_app(chain_spec, + celery_queue, + redis_host, + redis_port, + redis_db, + redis_timeout) if __name__ == "__main__": import uvicorn diff --git a/apps/cic-eth/cic_eth/server/app.py b/apps/cic-eth/cic_eth/server/app.py new file mode 100644 index 00000000..53321561 --- /dev/null +++ b/apps/cic-eth/cic_eth/server/app.py @@ -0,0 +1,138 @@ +import logging +import sys +from typing import List, Optional, Union + +from cic_eth.server import cache, converters +from cic_eth.server.cache import setup_cache +from cic_eth.server.celery import create_celery_wrapper +from cic_eth.server.models import (DefaultToken, Token, TokenBalance, + Transaction) +from fastapi import FastAPI, Query + +log = logging.getLogger(__name__) + + +def create_app(chain_spec, + celery_queue, + redis_host, + redis_port, + redis_db, + redis_timeout): + setup_cache(redis_db=redis_db, redis_host=redis_host, + redis_port=redis_port) + celery_wrapper = create_celery_wrapper(celery_queue=celery_queue, chain_spec=chain_spec, + redis_db=redis_db, redis_host=redis_host, redis_port=redis_port, redis_timeout=redis_timeout) + app = FastAPI(debug=True, + title="Grassroots Economics", + description="CIC ETH API", + version="0.0.1", + terms_of_service="https://www.grassrootseconomics.org/pages/terms-and-conditions.html", + contact={ + "name": "Grassroots Economics", + "url": "https://www.grassrootseconomics.org", + "email": "will@grassecon.org" + }, + license_info={ + "name": "GPLv3", + }) + + @app.get("/transactions", response_model=List[Transaction]) + def transactions(address: str, limit: Optional[str] = 10): + return celery_wrapper('list', address, limit=limit) + + @app.get("/balance", response_model=List[TokenBalance]) + def balance(token_symbol: str, address: str = Query(..., title="Address", min_length=40, max_length=42), include_pending: bool = True): + log.info(f"address: {address}") + log.info(f"token_symbol: {token_symbol}") + data = celery_wrapper('balance', address, token_symbol, + include_pending=include_pending) + for b in data: + token = get_token(token_symbol) + b['balance_network'] = converters.from_wei( + token.decimals, int(b['balance_network'])) + b['balance_incoming'] = converters.from_wei( + token.decimals, int(b['balance_incoming'])) + b['balance_outgoing'] = converters.from_wei( + token.decimals, int(b['balance_outgoing'])) + + b.update({ + "balance_available": int(b['balance_network']) + int(b['balance_incoming']) - int(b['balance_outgoing']) + }) + return data + + @app.post("/create_account") + def create_account(password: Optional[str] = None, register: bool = True): + data = celery_wrapper( + 'create_account', password=password, register=register) + return data + + # def refill_gas(start_response, query: dict): + # address = query.pop('address') + # data = celery_wrapper('refill_gas', address) + # return data + + # def ping(start_response, query: dict): + # data = celery_wrapper('ping', **query) + # return data + + @app.post("/transfer") + def transfer(from_address: str, to_address: str, value: int, token_symbol: str): + token = get_token( + token_symbol) + wei_value = converters.to_wei(token.decimals, int(value)) + data = celery_wrapper('transfer', from_address, + to_address, wei_value, token_symbol) + return data + + @app.post("/transfer_from") + def transfer_from(from_address: str, to_address: str, value: int, token_symbol: str, spender_address: str): + token = get_token( + token_symbol) + wei_value = converters.to_wei(token.decimals, int(value)) + data = celery_wrapper('transfer_from', from_address, to_address, + wei_value, token_symbol, spender_address) + return data + + @app.get("/token", response_model=Token) + def token(token_symbol: str, proof: Optional[str] = None): + token = get_token(token_symbol) + if token == None: + sys.stderr.write(f"Cached Token {token_symbol} not found") + data = celery_wrapper('token', token_symbol, proof=proof) + cache.set_token_data(token_symbol, data) + token = Token.new(data) + sys.stderr.write(f"Token {token}") + + return token + + @app.get("/tokens", response_model=List[Token]) + def tokens(token_symbols: Optional[List[str]] = Query(None), proof: Optional[Union[str, List[str], List[List[str]]]] = None): + data = celery_wrapper('tokens', token_symbols, + catch=len(token_symbols), proof=proof) + if data: + tokens = [] + for token in data: + print(f"Token: {token}") + tokens.append(Token.new(token)) + return tokens + return None + + @app.get("/default_token", response_model=DefaultToken) + def default_token(): + data = cache.get_default_token() + if data is None: + data = celery_wrapper('default_token') + if data is not None: + cache.set_default_token(data) + return data + + def get_token(token_symbol: str): + data = cache.get_token_data(token_symbol) + log.debug(f"cached token data: {data}") + if data == None: + data = celery_wrapper('token', token_symbol) + log.debug( + f"No token data setting token data for: {token_symbol} to {data}") + cache.set_token_data(token_symbol, data) + return Token.new(data) + return app diff --git a/apps/cic-eth/cic_eth/server/cache.py b/apps/cic-eth/cic_eth/server/cache.py index c801f2bf..2901f3f8 100644 --- a/apps/cic-eth/cic_eth/server/cache.py +++ b/apps/cic-eth/cic_eth/server/cache.py @@ -3,10 +3,10 @@ import hashlib import json import logging from typing import Optional, Union + from cic_eth.server.models import Token -# external imports from cic_types.condiments import MetadataPointer -from redis import Redis +from redis import Redis, StrictRedis logg = logging.getLogger(__file__) @@ -15,6 +15,12 @@ class Cache: store: Redis = None +def setup_cache(redis_host, redis_port, redis_db): + # Define universal redis cache access + Cache.store = StrictRedis( + host=redis_host, port=redis_port, db=redis_db, decode_responses=True) + + def get_token_data(token_symbol: str): """ :param token_symbol: diff --git a/apps/cic-eth/cic_eth/server/celery.py b/apps/cic-eth/cic_eth/server/celery.py index 74404c03..8271a372 100644 --- a/apps/cic-eth/cic_eth/server/celery.py +++ b/apps/cic-eth/cic_eth/server/celery.py @@ -3,67 +3,61 @@ import logging import sys import uuid -import cic_eth.cli import redis from cic_eth.api.api_task import Api -from cic_eth.server.config import config log = logging.getLogger(__name__) -celery_app = cic_eth.cli.CeleryApp.from_config(config) -celery_app.set_default() +def create_celery_wrapper(chain_spec, + celery_queue, + redis_host, + redis_port, + redis_db, + redis_timeout): + def call(method, *args, catch=1, **kwargs): + """ Creates a redis channel and calls `cic_eth.api` with the provided `method` and `*args`. Returns the result of the api call. Catch allows you to specify how many messages to catch before returning. + """ + log.debug(f"Using chainspec: {chain_spec}") + redis_channel = str(uuid.uuid4()) + r = redis.Redis(redis_host, redis_port, redis_db) + ps = r.pubsub() + ps.subscribe(redis_channel) + api = Api( + chain_spec, + queue=celery_queue, + callback_param='{}:{}:{}:{}'.format( + redis_host, redis_port, redis_db, redis_channel), + callback_task='cic_eth.callbacks.redis.redis', + callback_queue=celery_queue, + ) + getattr(api, method)(*args, **kwargs) + ps.get_message() + try: + data = [] + if catch == 1: + message = ps.get_message(timeout=redis_timeout) + data = json.loads(message['data'])["result"] + else: + for _i in range(catch): + message = ps.get_message( + timeout=redis_timeout) + result = json.loads(message['data'])["result"] + data.append(result) -chain_spec = config.get('CHAIN_SPEC') -celery_queue = config.get('CELERY_QUEUE') + except TimeoutError as e: + sys.stderr.write( + f"cic_eth.api.{method}({args}, {kwargs}) timed out:\n {e}") + raise e + except Exception as e: + sys.stderr.write( + f'Unable to parse Data:\n{data}\n Error:\n{e}') + raise e -redis_host = config.get('REDIS_HOST') -redis_port = config.get('REDIS_PORT') -redis_db = config.get('REDIS_DB') + log.debug( + f"cic_eth.api.{method}(args={args}, kwargs={kwargs})\n {data}") - -def call(method, *args, catch=1, **kwargs): - """ Creates a redis channel and calls `cic_eth.api` with the provided `method` and `*args`. Returns the result of the api call. Catch allows you to specify how many messages to catch before returning. - """ - log.debug(f"Using chainspec: {chain_spec}") - redis_channel = str(uuid.uuid4()) - r = redis.Redis(redis_host, redis_port, redis_db) - ps = r.pubsub() - ps.subscribe(redis_channel) - api = Api( - chain_spec, - queue=celery_queue, - callback_param='{}:{}:{}:{}'.format( - redis_host, redis_port, redis_db, redis_channel), - callback_task='cic_eth.callbacks.redis.redis', - callback_queue=celery_queue, - ) - getattr(api, method)(*args, **kwargs) - - ps.get_message() - try: - data = [] - if catch == 1: - message = ps.get_message(timeout=config.get('REDIS_TIMEOUT')) - data = json.loads(message['data'])["result"] - else: - for _i in range(catch): - message = ps.get_message(timeout=config.get('REDIS_TIMEOUT')) - result = json.loads(message['data'])["result"] - data.append(result) - - except TimeoutError as e: - sys.stderr.write( - f"cic_eth.api.{method}({args}, {kwargs}) timed out:\n {e}") - raise e - except Exception as e: - sys.stderr.write( - f'Unable to parse Data:\n{data}\n Error:\n{e}') - return None - - log.debug( - f"cic_eth.api.{method}(args={args}, kwargs={kwargs})\n {data}") - - ps.unsubscribe() - return data + ps.unsubscribe() + return data + return call diff --git a/apps/cic-eth/cic_eth/server/config.py b/apps/cic-eth/cic_eth/server/config.py index 35e60169..f10ef0a4 100644 --- a/apps/cic-eth/cic_eth/server/config.py +++ b/apps/cic-eth/cic_eth/server/config.py @@ -1,8 +1,20 @@ +import logging + import cic_eth.cli -arg_flags = cic_eth.cli.argflag_std_base -local_arg_flags = cic_eth.cli.argflag_local_taskcallback -argparser = cic_eth.cli.ArgumentParser(arg_flags) -argparser.process_local_flags(local_arg_flags) -args = argparser.parse_args([]) -config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags) + +def get_config(): + # Parse Args + arg_flags = cic_eth.cli.argflag_std_base + local_arg_flags = cic_eth.cli.argflag_local_taskcallback + argparser = cic_eth.cli.ArgumentParser(arg_flags) + argparser.process_local_flags(local_arg_flags) + args = argparser.parse_args([]) + config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags) + # Define log levels + if args.vv: + logging.getLogger().setLevel(logging.DEBUG) + elif args.v: + logging.getLogger().setLevel(logging.INFO) + + return config diff --git a/apps/cic-eth/tests/test_server.py b/apps/cic-eth/tests/test_server.py index f45bf71d..0969b380 100644 --- a/apps/cic-eth/tests/test_server.py +++ b/apps/cic-eth/tests/test_server.py @@ -5,30 +5,47 @@ import logging import time import hexathon -from cic_eth.runnable.daemons.server import app +from cic_eth.server.app import create_app +from cic_eth_registry.pytest.fixtures_tokens import * from fastapi.testclient import TestClient log = logging.getLogger(__name__) -client = TestClient(app) + +@pytest.fixture(scope='session') +def client(config, default_chain_spec, celery_session_worker): + config.add(str(default_chain_spec), 'CHAIN_SPEC', exists_ok=True) + config.add('cic_eth_test', 'CELERY_QUEUE', exists_ok=True) + chain_spec = config.get('CHAIN_SPEC') + celery_queue = config.get('CELERY_QUEUE') + redis_host = config.get('REDIS_REDIS_HOST') + redis_port = config.get('REDIS_REDIS_PORT') + redis_db = config.get('REDIS_DB') + redis_timeout = 20 + + app = create_app(str(chain_spec), celery_queue, redis_host, + redis_port, redis_db, redis_timeout) + return TestClient(app) -def test_default_token(): +def test_default_token(client, foo_token, default_chain_spec): + print(foo_token) + print(f"default_chain_spec: {default_chain_spec.asdict()}") # Default Token response = client.get('/default_token') log.debug(f"balance response {response}") default_token = response.json() assert default_token == { 'address': '3FF776B6f888980DEf9d4220858803f9dC5e341e', - 'decimals': 6, + 'decimals': 7, 'name': 'Giftable Token', 'symbol': 'GFT', } -def test_token(): +def test_token(client): # Default Token - response = client.get('/token?token_symbol=GFT') + response = client.get('/token?token_symbol=FOO') log.debug(f"token response {response}") token = response.json() assert token == { @@ -43,9 +60,10 @@ def test_token(): } -def test_tokens(): +def test_tokens(client): # Default Token - response = client.get('/tokens', params={'token_symbols': ['GFT', 'COFE']}) + response = client.get( + '/tokens', params={'token_symbols': ['GFT', 'COFE', 'FOO']}) log.debug(f"tokens response {response}") tokens = response.json() @@ -70,7 +88,7 @@ def test_tokens(): ] -def test_account(): +def test_account(client): # Default Token response = client.get('/default_token') log.debug(f"balance response {response}")