Compare commits

..

11 Commits

Author SHA1 Message Date
Spencer Ofwiti
1c846234a3 Remove leading '0x' from saved address. 2021-04-22 14:58:26 +03:00
Spencer Ofwiti
d0e39a6fb2 Refactor import meta phone to set account address. 2021-04-21 15:04:04 +03:00
Spencer Ofwiti
6ac04cbdc6 Merge branch 'master' into spencer/refactor-meta-library 2021-04-21 09:38:22 +03:00
Spencer Ofwiti
df8a031b5a Merge branch 'master' into spencer/refactor-meta-library
# Conflicts:
#	apps/contract-migration/scripts/package-lock.json
#	apps/contract-migration/scripts/package.json
2021-04-20 18:53:33 +03:00
Spencer Ofwiti
266edd20c8 Merge branch 'master' into spencer/refactor-meta-library 2021-04-20 08:06:30 +03:00
Spencer Ofwiti
9310f7cce1 Merge branch 'master' into spencer/refactor-meta-library 2021-04-19 14:55:26 +03:00
Spencer Ofwiti
395080340b Fix depenndency import. 2021-04-15 11:12:20 +03:00
Spencer Ofwiti
2f8531c195 Refactor scripts to use imports from crdt-meta. 2021-04-14 16:42:41 +03:00
Spencer Ofwiti
426818baa1 Bump dependency versions. 2021-04-14 16:37:55 +03:00
Spencer Ofwiti
5853aab840 Merge branch 'master' into spencer/refactor-meta-library 2021-04-14 13:59:04 +03:00
Spencer Ofwiti
edbd7ffdf5 Remove library files into crdt-meta. 2021-04-14 09:48:29 +03:00
675 changed files with 11485 additions and 27724 deletions

11
.gitignore vendored
View File

@@ -4,14 +4,3 @@ service-configs/*
__pycache__
*.pyc
*.o
gmon.out
*.egg-info
dist/
build/
**/*sqlite
**/.nyc_output
**/coverage
**/.venv
.idea
**/.vim
**/*secret.yaml

View File

@@ -1,43 +1,13 @@
include:
#- local: 'ci_templates/.cic-template.yml' #kaniko build templates
# these includes are app specific unit tests
- local: 'ci_templates/.cic-template.yml'
- local: 'apps/contract-migration/.gitlab-ci.yml'
- local: 'apps/cic-eth/.gitlab-ci.yml'
- local: 'apps/cic-ussd/.gitlab-ci.yml'
- local: 'apps/cic-notify/.gitlab-ci.yml'
- local: 'apps/cic-meta/.gitlab-ci.yml'
- local: 'apps/cic-cache/.gitlab-ci.yml'
#- local: 'apps/contract-migration/.gitlab-ci.yml'
#- local: 'apps/data-seeding/.gitlab-ci.yml'
stages:
- build
- test
- deploy
image: registry.gitlab.com/grassrootseconomics/cic-internal-integration/docker-with-compose:latest
variables:
DOCKER_BUILDKIT: "1"
COMPOSE_DOCKER_CLI_BUILD: "1"
CI_DEBUG_TRACE: "true"
before_script:
- docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY
# runs on protected branches and pushes to repo
build-push:
stage: build
tags:
- integration
#script:
# - TAG=$CI_COMMIT_REF_SLUG-$CI_COMMIT_SHORT_SHA sh ./scripts/build-push.sh
script:
- TAG=latest sh ./scripts/build-push.sh
rules:
- if: $CI_COMMIT_REF_PROTECTED == "true"
when: always
deploy-dev:
stage: deploy
trigger: grassrootseconomics/devops
when: manual
- release

View File

@@ -2,21 +2,25 @@
## Getting started
This repo uses docker-compose and docker buildkit. Set the following environment variables to get started:
## Make some keys
```
export COMPOSE_DOCKER_CLI_BUILD=1
export DOCKER_BUILDKIT=1
docker build -t bloxie . && docker run -v "$(pwd)/keys:/root/keys" --rm -it -t bloxie account new --chain /root/bloxberg.json --keys-path /root/keys
```
start services, database, redis and local ethereum node
```
docker-compose up -d
```
Run app/contract-migration to deploy contracts
### Prepare the repo
This is stuff we need to put in makefile but for now...
File mounts and permisssions need to be set
```
RUN_MASK=3 docker-compose up contract-migration
chmod -R 755 scripts/initdb apps/cic-meta/scripts/initdb
````
start cluster
```
docker-compose up
```
stop cluster
@@ -24,9 +28,9 @@ stop cluster
docker-compose down
```
stop cluster and delete data
delete data
```
docker-compose down -v --remove-orphans
docker-compose down -v
```
rebuild an images
@@ -34,7 +38,5 @@ rebuild an images
docker-compose up --build <service_name>
```
to delete the buildkit cache
```
docker builder prune --filter type=exec.cachemount
```
Deployment variables are writtend to service-configs/.env after everthing is up.

View File

@@ -0,0 +1,34 @@
# The solc image messes up the alpine environment, so we have to go all over again
FROM python:3.8.6-slim-buster
LABEL authors="Louis Holbrook <dev@holbrook.no> 0826EDA1702D1E87C6E2875121D2E7BB88C2A746"
LABEL spdx-license-identifier="GPL-3.0-or-later"
LABEL description="Base layer for buiding development images for the cic component suite"
RUN apt-get update && \
apt-get install -y git gcc g++ libpq-dev && \
apt-get install -y vim gawk jq telnet openssl iputils-ping curl wget gnupg socat bash procps make python2 postgresql-client
RUN echo installing nodejs tooling
COPY ./dev/nvm.sh /root/
# Install nvm with node and npm
# https://stackoverflow.com/questions/25899912/how-to-install-nvm-in-docker
ENV NVM_DIR /root/.nvm
ENV NODE_VERSION 15.3.0
ENV BANCOR_NODE_VERSION 10.16.0
RUN wget -qO- https://raw.githubusercontent.com/nvm-sh/nvm/v0.37.2/install.sh | bash \
&& . $NVM_DIR/nvm.sh \
&& nvm install $NODE_VERSION \
&& nvm alias default $NODE_VERSION \
&& nvm use $NODE_VERSION \
# So many ridiculously stupid issues with node in docker that take oceans of absolutely wasted time to resolve
# owner of these files is "1001" by default - wtf
&& chown -R root:root "$NVM_DIR/versions/node/v$NODE_VERSION"
ENV NODE_PATH $NVM_DIR/versions/node//v$NODE_VERSION/lib/node_modules
ENV PATH $NVM_DIR/versions/node//v$NODE_VERSION/bin:$PATH

View File

@@ -0,0 +1 @@
## this is an example base image if we wanted one for all the other apps. Its just OS level things

View File

@@ -6,4 +6,3 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=

View File

@@ -6,4 +6,3 @@ HOST=localhost
PORT=5432
ENGINE=sqlite
DRIVER=pysqlite
DEBUG=

View File

@@ -2,6 +2,4 @@
omit =
.venv/*
scripts/*
cic_cache/db/migrations/*
cic_cache/version.py
cic_cache/cli
cic_cache/db/postgres/*

View File

@@ -1,4 +0,0 @@
.git
.cache
.dot
**/doc

View File

@@ -1,17 +1,22 @@
build-test-cic-cache:
stage: test
tags:
- integration
variables:
APP_NAME: cic-cache
MR_IMAGE_TAG: mr-$APP_NAME-$CI_COMMIT_REF_SLUG-$CI_COMMIT_SHORT_SHA
script:
- cd apps/cic-cache
- docker build -t $MR_IMAGE_TAG -f docker/Dockerfile .
- docker run $MR_IMAGE_TAG sh docker/run_tests.sh
allow_failure: true
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/$APP_NAME/**/*
when: always
.cic_cache_variables:
variables:
APP_NAME: cic-cache
DOCKERFILE_PATH: $APP_NAME/docker/Dockerfile
.cic_cache_changes_target:
rules:
- changes:
- $CONTEXT/$APP_NAME/*
build-mr-cic-cache:
extends:
- .cic_cache_changes_target
- .py_build_merge_request
- .cic_cache_variables
build-push-cic-cache:
extends:
- .py_build_push
- .cic_cache_variables

View File

@@ -1 +0,0 @@
include *requirements.txt cic_cache/data/config/*

View File

@@ -1 +0,0 @@
# CIC-CACHE

View File

@@ -55,37 +55,15 @@ class Api:
queue=callback_queue,
)
def list(self, offset=0, limit=100, address=None, oldest=False):
def list(self, offset, limit, address=None):
s = celery.signature(
'cic_cache.tasks.tx.tx_filter',
[
offset,
limit,
0,
100,
address,
oldest,
],
queue=self.queue,
)
if self.callback_param != None:
s.link(self.callback_success).on_error(self.callback_error)
t = s.apply_async()
return t
def list_content(self, offset=0, limit=100, address=None, block_offset=None, block_limit=None, oldest=False):
s = celery.signature(
'cic_cache.tasks.tx.tx_filter_content',
[
offset,
limit,
address,
block_offset,
block_limit,
oldest,
],
queue=self.queue,
queue=None
)
if self.callback_param != None:
s.link(self.callback_success).on_error(self.callback_error)

View File

@@ -1,42 +1,30 @@
# standard imports
import logging
import datetime
# external imports
# third-party imports
import moolb
# local imports
from cic_cache.db.list import (
list_transactions_mined,
list_transactions_account_mined,
list_transactions_mined_with_data,
list_transactions_mined_with_data_index,
list_transactions_account_mined_with_data_index,
list_transactions_account_mined_with_data,
)
from cic_cache.db import list_transactions_mined
from cic_cache.db import list_transactions_account_mined
logg = logging.getLogger()
DEFAULT_FILTER_SIZE = 8192 * 8
DEFAULT_LIMIT = 100
class Cache:
class BloomCache:
def __init__(self, session):
self.session = session
class BloomCache(Cache):
@staticmethod
def __get_filter_size(n):
n = DEFAULT_FILTER_SIZE
n = 8192 * 8
logg.warning('filter size hardcoded to {}'.format(n))
return n
def load_transactions(self, offset, limit, block_offset=None, block_limit=None, oldest=False):
def load_transactions(self, offset, limit):
"""Retrieves a list of transactions from cache and creates a bloom filter pointing to blocks and transactions.
Block and transaction numbers are serialized as 32-bit big-endian numbers. The input to the second bloom filter is the concatenation of the serialized block number and transaction index.
@@ -53,7 +41,7 @@ class BloomCache(Cache):
:return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx
:rtype: tuple
"""
rows = list_transactions_mined(self.session, offset, limit, block_offset=block_offset, block_limit=block_limit, oldest=oldest)
rows = list_transactions_mined(self.session, offset, limit)
f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
@@ -62,12 +50,7 @@ class BloomCache(Cache):
for r in rows:
if highest_block == -1:
highest_block = r[0]
lowest_block = r[0]
else:
if oldest:
highest_block = r[0]
else:
lowest_block = r[0]
lowest_block = r[0]
block = r[0].to_bytes(4, byteorder='big')
tx = r[1].to_bytes(4, byteorder='big')
f_block.add(block)
@@ -76,7 +59,7 @@ class BloomCache(Cache):
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)
def load_transactions_account(self, address, offset, limit, block_offset=None, block_limit=None, oldest=False):
def load_transactions_account(self, address, offset, limit):
"""Same as load_transactions(...), but only retrieves transactions where the specified account address is sender or recipient.
:param address: Address to retrieve transactions for.
@@ -88,7 +71,7 @@ class BloomCache(Cache):
:return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx
:rtype: tuple
"""
rows = list_transactions_account_mined(self.session, address, offset, limit, block_offset=block_offset, block_limit=block_limit, oldest=oldest)
rows = list_transactions_account_mined(self.session, address, offset, limit)
f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
@@ -97,74 +80,10 @@ class BloomCache(Cache):
for r in rows:
if highest_block == -1:
highest_block = r[0]
lowest_block = r[0]
else:
if oldest:
highest_block = r[0]
else:
lowest_block = r[0]
lowest_block = r[0]
block = r[0].to_bytes(4, byteorder='big')
tx = r[1].to_bytes(4, byteorder='big')
f_block.add(block)
f_blocktx.add(block + tx)
logg.debug('added block {} tx {} lo {} hi {}'.format(r[0], r[1], lowest_block, highest_block))
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)
class DataCache(Cache):
def load_transactions_with_data(self, offset, limit, block_offset=None, block_limit=None, oldest=False):
if limit == 0:
limit = DEFAULT_LIMIT
rows = list_transactions_mined_with_data(self.session, offset, limit, block_offset, block_limit, oldest=oldest)
return self.__process_rows(rows, oldest)
def load_transactions_account_with_data(self, address, offset, limit, block_offset=None, block_limit=None, oldest=False):
if limit == 0:
limit = DEFAULT_LIMIT
rows = list_transactions_account_mined_with_data(self.session, address, offset, limit, block_offset, block_limit, oldest=oldest)
return self.__process_rows(rows, oldest)
def __process_rows(self, rows, oldest):
tx_cache = []
highest_block = -1;
lowest_block = -1;
date_is_str = None # stick this in startup
for r in rows:
if highest_block == -1:
highest_block = r['block_number']
lowest_block = r['block_number']
else:
if oldest:
highest_block = r['block_number']
else:
lowest_block = r['block_number']
tx_type = 'unknown'
if r['value'] != None:
tx_type = '{}.{}'.format(r['domain'], r['value'])
if date_is_str == None:
date_is_str = type(r['date_block']).__name__ == 'str'
o = {
'block_number': r['block_number'],
'tx_hash': r['tx_hash'],
'date_block': r['date_block'],
'sender': r['sender'],
'recipient': r['recipient'],
'from_value': int(r['from_value']),
'to_value': int(r['to_value']),
'source_token': r['source_token'],
'destination_token': r['destination_token'],
'success': r['success'],
'tx_type': tx_type,
}
if date_is_str:
o['date_block'] = datetime.datetime.fromisoformat(r['date_block'])
tx_cache.append(o)
return (lowest_block, highest_block, tx_cache)

View File

@@ -1,15 +0,0 @@
# local imports
from .base import *
from .chain import (
EthChainInterface,
chain_interface,
)
from .rpc import RPC
from .arg import ArgumentParser
from .config import Config
from .celery import CeleryApp
from .registry import (
connect_registry,
connect_token_registry,
connect_declarator,
)

View File

@@ -1,20 +0,0 @@
# external imports
from chainlib.eth.cli import ArgumentParser as BaseArgumentParser
# local imports
from .base import (
CICFlag,
Flag,
)
class ArgumentParser(BaseArgumentParser):
def process_local_flags(self, local_arg_flags):
if local_arg_flags & CICFlag.CELERY:
self.add_argument('-q', '--celery-queue', dest='celery_queue', type=str, default='cic-cache', help='Task queue')
if local_arg_flags & CICFlag.SYNCER:
self.add_argument('--offset', type=int, default=0, help='Start block height for initial history sync')
self.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
if local_arg_flags & CICFlag.CHAIN:
self.add_argument('-r', '--registry-address', type=str, dest='registry_address', help='CIC registry contract address')

View File

@@ -1,31 +0,0 @@
# standard imports
import enum
# external imports
from chainlib.eth.cli import (
argflag_std_read,
argflag_std_write,
argflag_std_base,
Flag,
)
class CICFlag(enum.IntEnum):
# celery - nibble 1
CELERY = 1
# redis - nibble 2
# REDIS = 16
# REDIS_CALLBACK = 32
# chain - nibble 3
CHAIN = 256
# sync - nibble 4
SYNCER = 4096
argflag_local_task = CICFlag.CELERY
#argflag_local_taskcallback = argflag_local_task | CICFlag.REDIS | CICFlag.REDIS_CALLBACK
argflag_local_chain = CICFlag.CHAIN
argflag_local_sync = CICFlag.SYNCER | CICFlag.CHAIN

View File

@@ -1,24 +0,0 @@
# standard imports
import logging
# external imports
import celery
logg = logging.getLogger(__name__)
class CeleryApp:
@classmethod
def from_config(cls, config):
backend_url = config.get('CELERY_RESULT_URL')
broker_url = config.get('CELERY_BROKER_URL')
celery_app = None
if backend_url != None:
celery_app = celery.Celery(broker=broker_url, backend=backend_url)
logg.info('creating celery app on {} with backend on {}'.format(broker_url, backend_url))
else:
celery_app = celery.Celery(broker=broker_url)
logg.info('creating celery app without results backend on {}'.format(broker_url))
return celery_app

View File

@@ -1,21 +0,0 @@
# external imports
from chainlib.eth.block import (
block_by_number,
Block,
)
from chainlib.eth.tx import (
receipt,
Tx,
)
from chainlib.interface import ChainInterface
class EthChainInterface(ChainInterface):
def __init__(self):
self._tx_receipt = receipt
self._block_by_number = block_by_number
self._block_from_src = Block.from_src
self._src_normalize = Tx.src_normalize
chain_interface = EthChainInterface()

View File

@@ -1,63 +0,0 @@
# standard imports
import os
import logging
# external imports
from chainlib.eth.cli import (
Config as BaseConfig,
Flag,
)
# local imports
from .base import CICFlag
script_dir = os.path.dirname(os.path.realpath(__file__))
logg = logging.getLogger(__name__)
class Config(BaseConfig):
local_base_config_dir = os.path.join(script_dir, '..', 'data', 'config')
@classmethod
def from_args(cls, args, arg_flags, local_arg_flags, extra_args={}, default_config_dir=None, base_config_dir=None, default_fee_limit=None):
expanded_base_config_dir = [cls.local_base_config_dir]
if base_config_dir != None:
if isinstance(base_config_dir, str):
base_config_dir = [base_config_dir]
for d in base_config_dir:
expanded_base_config_dir.append(d)
config = BaseConfig.from_args(args, arg_flags, extra_args=extra_args, default_config_dir=default_config_dir, base_config_dir=expanded_base_config_dir, load_callback=None)
local_args_override = {}
# if local_arg_flags & CICFlag.REDIS:
# local_args_override['REDIS_HOST'] = getattr(args, 'redis_host')
# local_args_override['REDIS_PORT'] = getattr(args, 'redis_port')
# local_args_override['REDIS_DB'] = getattr(args, 'redis_db')
# local_args_override['REDIS_TIMEOUT'] = getattr(args, 'redis_timeout')
if local_arg_flags & CICFlag.CHAIN:
local_args_override['CIC_REGISTRY_ADDRESS'] = getattr(args, 'registry_address')
if local_arg_flags & CICFlag.CELERY:
local_args_override['CELERY_QUEUE'] = getattr(args, 'celery_queue')
if local_arg_flags & CICFlag.SYNCER:
local_args_override['SYNCER_OFFSET'] = getattr(args, 'offset')
local_args_override['SYNCER_NO_HISTORY'] = getattr(args, 'no_history')
config.dict_override(local_args_override, 'local cli args')
# if local_arg_flags & CICFlag.REDIS_CALLBACK:
# config.add(getattr(args, 'redis_host_callback'), '_REDIS_HOST_CALLBACK')
# config.add(getattr(args, 'redis_port_callback'), '_REDIS_PORT_CALLBACK')
if local_arg_flags & CICFlag.CELERY:
config.add(config.true('CELERY_DEBUG'), 'CELERY_DEBUG', exists_ok=True)
logg.debug('config loaded:\n{}'.format(config))
return config

View File

@@ -1,33 +0,0 @@
# standard imports
import logging
# external imports
from cic_eth_registry import CICRegistry
from cic_eth_registry.lookup.declarator import AddressDeclaratorLookup
from cic_eth_registry.lookup.tokenindex import TokenIndexLookup
from chainlib.eth.constant import ZERO_ADDRESS
logg = logging.getLogger()
def connect_token_registry(self, conn, chain_spec, sender_address=ZERO_ADDRESS):
registry = CICRegistry(chain_spec, conn)
token_registry_address = registry.by_name('TokenRegistry', sender_address=sender_address)
logg.debug('using token registry address {}'.format(token_registry_address))
lookup = TokenIndexLookup(chain_spec, token_registry_address)
CICRegistry.add_lookup(lookup)
def connect_declarator(self, conn, chain_spec, trusted_addresses, sender_address=ZERO_ADDRESS):
registry = CICRegistry(chain_spec, conn)
declarator_address = registry.by_name('AddressDeclarator', sender_address=sender_address)
logg.debug('using declarator address {}'.format(declarator_address))
lookup = AddressDeclaratorLookup(chain_spec, declarator_address, trusted_addresses)
CICRegistry.add_lookup(lookup)
def connect_registry(conn, chain_spec, registry_address, sender_address=ZERO_ADDRESS):
CICRegistry.address = registry_address
registry = CICRegistry(chain_spec, conn)
registry_address = registry.by_name('ContractRegistry', sender_address=sender_address)
return registry

View File

@@ -1,43 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.connection import (
RPCConnection,
ConnType,
)
from chainlib.eth.connection import EthUnixSignerConnection
from chainlib.chain import ChainSpec
logg = logging.getLogger(__name__)
class RPC:
def __init__(self, chain_spec, rpc_provider, signer_provider=None):
self.chain_spec = chain_spec
self.rpc_provider = rpc_provider
self.signer_provider = signer_provider
def get_default(self):
return RPCConnection.connect(self.chain_spec, 'default')
@staticmethod
def from_config(config):
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
RPCConnection.register_location(config.get('RPC_PROVIDER'), chain_spec, 'default')
if config.get('SIGNER_PROVIDER'):
RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, tag='signer')
RPCConnection.register_location(config.get('SIGNER_PROVIDER'), chain_spec, 'signer')
rpc = RPC(chain_spec, config.get('RPC_PROVIDER'), signer_provider=config.get('SIGNER_PROVIDER'))
logg.info('set up rpc: {}'.format(rpc))
return rpc
def __str__(self):
return 'RPC factory, chain {}, rpc {}, signer {}'.format(self.chain_spec, self.rpc_provider, self.signer_provider)

View File

@@ -1,5 +0,0 @@
[celery]
broker_url = redis://localhost:6379
result_url =
queue = cic-cache
debug = 0

View File

@@ -1,4 +0,0 @@
[cic]
registry_address =
trust_address =
health_modules = cic_eth.check.db,cic_eth.check.redis,cic_eth.check.signer,cic_eth.check.gas

View File

@@ -1,10 +0,0 @@
[database]
engine =
driver =
host =
port =
name = cic-cache
user =
password =
debug = 0
pool_size = 0

View File

@@ -1,2 +0,0 @@
[signer]
provider =

View File

@@ -1,4 +0,0 @@
[syncer]
loop_interval = 1
offset = 0
no_history = 0

View File

@@ -2,14 +2,9 @@
import logging
# local imports
from .list import (
list_transactions_mined,
list_transactions_account_mined,
add_transaction,
tag_transaction,
add_tag,
)
from .list import list_transactions_mined
from .list import list_transactions_account_mined
from .list import add_transaction
logg = logging.getLogger()

View File

@@ -2,9 +2,8 @@
import logging
import datetime
# external imports
# third-party imports
from cic_cache.db.models.base import SessionBase
from sqlalchemy import text
logg = logging.getLogger()
@@ -13,9 +12,6 @@ def list_transactions_mined(
session,
offset,
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
@@ -26,154 +22,7 @@ def list_transactions_mined(
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} and block_number <= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, order_by, order_by, limit, offset)
else:
s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, order_by, order_by, limit, offset)
else:
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(order_by, order_by, limit, offset)
r = session.execute(s)
return r
def list_transactions_mined_with_data(
session,
offset,
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
:param block_offset: First block to include in search
:type block_offset: int
:param block_limit: Last block to include in search
:type block_limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(order_by, order_by, limit, offset)
r = session.execute(s)
return r
def list_transactions_mined_with_data_index(
session,
offset,
end,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
:param offset: Offset in data set to return transactions from
:type offset: int
:param limit: Max number of transactions to retrieve
:type limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} and block_number <= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, order_by, order_by, offset, end)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, order_by, order_by, offset, end)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(order_by, order_by, offset, end)
r = session.execute(s)
return r
def list_transactions_account_mined_with_data_index(
session,
address,
offset,
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit, filtered by address
:param offset: Offset in data set to return transactions from
:type offset: int
:param limit: Max number of transactions to retrieve
:type limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(address, address, order_by, order_by, limit, offset)
r = session.execute(s)
return r
def list_transactions_account_mined_with_data(
session,
address,
offset,
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
:param block_offset: First block to include in search
:type block_offset: int
:param block_limit: Last block to include in search
:type block_limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(address, address, order_by, order_by, limit, offset)
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(limit, offset)
r = session.execute(s)
return r
@@ -183,9 +32,6 @@ def list_transactions_account_mined(
address,
offset,
limit,
block_offset,
block_limit,
oldest=False,
):
"""Same as list_transactions_mined(...), but only retrieves transaction where the specified account address is sender or recipient.
@@ -198,27 +44,13 @@ def list_transactions_account_mined(
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT block_number, tx_index FROM tx WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(address, address, order_by, order_by, limit, offset)
s = "SELECT block_number, tx_index FROM tx WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(address, address, limit, offset)
r = session.execute(s)
return r
def add_transaction(
session,
tx_hash,
session, tx_hash,
block_number,
tx_index,
sender,
@@ -230,33 +62,6 @@ def add_transaction(
success,
timestamp,
):
"""Adds a single transaction to the cache persistent storage. Sensible interpretation of all fields is the responsibility of the caller.
:param session: Persistent storage session object
:type session: SQLAlchemy session
:param tx_hash: Transaction hash
:type tx_hash: str, 0x-hex
:param block_number: Block number
:type block_number: int
:param tx_index: Transaction index in block
:type tx_index: int
:param sender: Ethereum address of effective sender
:type sender: str, 0x-hex
:param receiver: Ethereum address of effective recipient
:type receiver: str, 0x-hex
:param source_token: Ethereum address of token used by sender
:type source_token: str, 0x-hex
:param destination_token: Ethereum address of token received by recipient
:type destination_token: str, 0x-hex
:param from_value: Source token value spent in transaction
:type from_value: int
:param to_value: Destination token value received in transaction
:type to_value: int
:param success: True if code execution on network was successful
:type success: bool
:param date_block: Block timestamp
:type date_block: datetime
"""
date_block = datetime.datetime.fromtimestamp(timestamp)
s = "INSERT INTO tx (tx_hash, block_number, tx_index, sender, recipient, source_token, destination_token, from_value, to_value, success, date_block) VALUES ('{}', {}, {}, '{}', '{}', '{}', '{}', {}, {}, {}, '{}')".format(
tx_hash,
@@ -272,74 +77,3 @@ def add_transaction(
date_block,
)
session.execute(s)
def tag_transaction(
session,
tx_hash,
name,
domain=None,
):
"""Tag a single transaction with a single tag.
Tag must already exist in storage.
:param session: Persistent storage session object
:type session: SQLAlchemy session
:param tx_hash: Transaction hash
:type tx_hash: str, 0x-hex
:param name: Tag value
:type name: str
:param domain: Tag domain
:type domain: str
:raises ValueError: Unknown tag or transaction hash
"""
s = text("SELECT id from tx where tx_hash = :a")
r = session.execute(s, {'a': tx_hash}).fetchall()
tx_id = r[0].values()[0]
if tx_id == None:
raise ValueError('unknown tx hash {}'.format(tx_hash))
#s = text("SELECT id from tag where value = :a and domain = :b")
if domain == None:
s = text("SELECT id from tag where value = :a")
else:
s = text("SELECT id from tag where value = :a and domain = :b")
r = session.execute(s, {'a': name, 'b': domain}).fetchall()
tag_id = r[0].values()[0]
logg.debug('type {} {}'.format(type(tag_id), type(tx_id)))
if tag_id == None:
raise ValueError('unknown tag name {} domain {}'.format(name, domain))
s = text("INSERT INTO tag_tx_link (tag_id, tx_id) VALUES (:a, :b)")
r = session.execute(s, {'a': int(tag_id), 'b': int(tx_id)})
def add_tag(
session,
name,
domain=None,
):
"""Add a single tag to storage.
:param session: Persistent storage session object
:type session: SQLAlchemy session
:param name: Tag value
:type name: str
:param domain: Tag domain
:type domain: str
:raises sqlalchemy.exc.IntegrityError: Tag already exists
"""
s = None
if domain == None:
s = text("INSERT INTO tag (value) VALUES (:b)")
else:
s = text("INSERT INTO tag (domain, value) VALUES (:a, :b)")
session.execute(s, {'a': domain, 'b': name})

View File

@@ -7,7 +7,7 @@ Create Date: 2021-04-01 08:10:29.156243
"""
from alembic import op
import sqlalchemy as sa
from chainsyncer.db.migrations.default.export import (
from chainsyncer.db.migrations.sqlalchemy import (
chainsyncer_upgrade,
chainsyncer_downgrade,
)

View File

@@ -1,38 +0,0 @@
"""Transaction tags
Revision ID: aaf2bdce7d6e
Revises: 6604de4203e2
Create Date: 2021-05-01 09:20:20.775082
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'aaf2bdce7d6e'
down_revision = '6604de4203e2'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tag',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('domain', sa.String(), nullable=True),
sa.Column('value', sa.String(), nullable=False),
)
op.create_index('idx_tag_domain_value', 'tag', ['domain', 'value'], unique=True)
op.create_table(
'tag_tx_link',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag_id', sa.Integer, sa.ForeignKey('tag.id'), nullable=False),
sa.Column('tx_id', sa.Integer, sa.ForeignKey('tx.id'), nullable=False),
)
def downgrade():
op.drop_table('tag_tx_link')
op.drop_index('idx_tag_domain_value')
op.drop_table('tag')

View File

@@ -100,4 +100,3 @@ class SessionBase(Model):
logg.debug('destroying session {}'.format(session_key))
session.commit()
session.close()
del SessionBase.localsessions[session_key]

View File

@@ -1,2 +1 @@
from .erc20 import *
from .faucet import *

View File

@@ -1,27 +1,2 @@
class TagSyncFilter:
"""Holds tag name and domain for an implementing filter.
:param name: Tag value
:type name: str
:param domain: Tag domain
:type domain: str
"""
def __init__(self, name, domain=None):
self.tag_name = name
self.tag_domain = domain
def tag(self):
"""Return tag value/domain.
:rtype: Tuple
:returns: tag value/domain.
"""
return (self.tag_name, self.tag_domain)
def __str__(self):
if self.tag_domain == None:
return self.tag_name
return '{}.{}'.format(self.tag_domain, self.tag_name)
class SyncFilter:
pass

View File

@@ -2,6 +2,7 @@
import logging
# external imports
from chainlib.eth.erc20 import ERC20
from chainlib.eth.address import (
to_checksum_address,
)
@@ -12,19 +13,17 @@ from cic_eth_registry.error import (
NotAContractError,
ContractMismatchError,
)
from eth_erc20 import ERC20
# local imports
from .base import TagSyncFilter
from .base import SyncFilter
from cic_cache import db as cic_cache_db
logg = logging.getLogger().getChild(__name__)
class ERC20TransferFilter(TagSyncFilter):
class ERC20TransferFilter(SyncFilter):
def __init__(self, chain_spec):
super(ERC20TransferFilter, self).__init__('transfer', domain='erc20')
self.chain_spec = chain_spec
@@ -47,9 +46,6 @@ class ERC20TransferFilter(TagSyncFilter):
except RequestMismatchException:
logg.debug('erc20 match but not a transfer, skipping')
return False
except ValueError:
logg.debug('erc20 match but bogus data, skipping')
return False
token_sender = tx.outputs[0]
token_recipient = transfer_data[0]
@@ -71,13 +67,7 @@ class ERC20TransferFilter(TagSyncFilter):
tx.status == Status.SUCCESS,
block.timestamp,
)
db_session.flush()
cic_cache_db.tag_transaction(
db_session,
tx.hash,
self.tag_name,
domain=self.tag_domain,
)
#db_session.flush()
db_session.commit()
return True

View File

@@ -1,73 +0,0 @@
# standard imports
import logging
# external imports
from erc20_faucet import Faucet
from chainlib.eth.address import to_checksum_address
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.status import Status
from hexathon import strip_0x
# local imports
import cic_cache.db as cic_cache_db
from .base import TagSyncFilter
#logg = logging.getLogger().getChild(__name__)
logg = logging.getLogger()
class FaucetFilter(TagSyncFilter):
def __init__(self, chain_spec, sender_address=ZERO_ADDRESS):
super(FaucetFilter, self).__init__('give_to', domain='faucet')
self.chain_spec = chain_spec
self.sender_address = sender_address
def filter(self, conn, block, tx, db_session=None):
try:
data = strip_0x(tx.payload)
except ValueError:
return False
logg.debug('data {}'.format(data))
if Faucet.method_for(data[:8]) == None:
return False
token_sender = tx.inputs[0]
token_recipient = data[64+8-40:]
logg.debug('token recipient {}'.format(token_recipient))
f = Faucet(self.chain_spec)
o = f.token(token_sender, sender_address=self.sender_address)
r = conn.do(o)
token = f.parse_token(r)
f = Faucet(self.chain_spec)
o = f.token_amount(token_sender, sender_address=self.sender_address)
r = conn.do(o)
token_value = f.parse_token_amount(r)
cic_cache_db.add_transaction(
db_session,
tx.hash,
block.number,
tx.index,
to_checksum_address(token_sender),
to_checksum_address(token_recipient),
token,
token,
token_value,
token_value,
tx.status == Status.SUCCESS,
block.timestamp,
)
db_session.flush()
cic_cache_db.tag_transaction(
db_session,
tx.hash,
self.tag_name,
domain=self.tag_domain,
)
db_session.commit()
return True

View File

@@ -1,115 +0,0 @@
# standard imports
import logging
import json
import re
import base64
# external imports
from hexathon import add_0x
# local imports
from cic_cache.cache import (
BloomCache,
DataCache,
)
logg = logging.getLogger(__name__)
#logg = logging.getLogger()
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)(/(\d+)(/(\d+))?)?/?'
re_transactions_all_data = r'/txa/(\d+)?/?(\d+)/?'
DEFAULT_LIMIT = 100
def process_transactions_account_bloom(session, env):
r = re.match(re_transactions_account_bloom, env.get('PATH_INFO'))
if not r:
return None
address = r[1]
if r[2] == None:
address = add_0x(address)
offset = 0
if r.lastindex > 2:
offset = r[4]
limit = DEFAULT_LIMIT
if r.lastindex > 4:
limit = r[6]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
def process_transactions_all_bloom(session, env):
r = re.match(re_transactions_all_bloom, env.get('PATH_INFO'))
if not r:
return None
offset = DEFAULT_LIMIT
if r.lastindex > 0:
offset = r[1]
limit = 0
if r.lastindex > 1:
limit = r[2]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
def process_transactions_all_data(session, env):
r = re.match(re_transactions_all_data, env.get('PATH_INFO'))
if not r:
return None
if env.get('HTTP_X_CIC_CACHE_MODE') != 'all':
return None
logg.debug('got data request {}'.format(env))
block_offset = r[1]
block_end = r[2]
if int(r[2]) < int(r[1]):
raise ValueError('cart before the horse, dude')
c = DataCache(session)
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(0, 0, block_offset, block_end, oldest=True) # oldest needs to be settable
for r in tx_cache:
r['date_block'] = r['date_block'].timestamp()
o = {
'low': lowest_block,
'high': highest_block,
'data': tx_cache,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)

View File

@@ -1,20 +1,18 @@
# standard imports
import os
import re
import logging
import argparse
import json
import base64
# external imports
# third-party imports
import confini
# local imports
from cic_cache import BloomCache
from cic_cache.db import dsn_from_config
from cic_cache.db.models.base import SessionBase
from cic_cache.runnable.daemons.query import (
process_transactions_account_bloom,
process_transactions_all_bloom,
process_transactions_all_data,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -46,6 +44,72 @@ logg.debug('config:\n{}'.format(config))
dsn = dsn_from_config(config)
SessionBase.connect(dsn, config.true('DATABASE_DEBUG'))
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)/?(\d+)?/?(\d+)/?'
DEFAULT_LIMIT = 100
def process_transactions_account_bloom(session, env):
r = re.match(re_transactions_account_bloom, env.get('PATH_INFO'))
if not r:
return None
address = r[1]
if r[2] == None:
address = '0x' + address
offset = DEFAULT_LIMIT
if r.lastindex > 2:
offset = r[3]
limit = 0
if r.lastindex > 3:
limit = r[4]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
def process_transactions_all_bloom(session, env):
r = re.match(re_transactions_all_bloom, env.get('PATH_INFO'))
if not r:
return None
offset = DEFAULT_LIMIT
if r.lastindex > 0:
offset = r[1]
limit = 0
if r.lastindex > 1:
limit = r[2]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
# uwsgi application
def application(env, start_response):
@@ -55,16 +119,10 @@ def application(env, start_response):
session = SessionBase.create_session()
for handler in [
process_transactions_all_data,
process_transactions_all_bloom,
process_transactions_account_bloom,
]:
r = None
try:
r = handler(session, env)
except ValueError as e:
start_response('400 {}'.format(str(e)))
return []
r = handler(session, env)
if r != None:
(mime_type, content) = r
break

View File

@@ -7,8 +7,14 @@ import argparse
import sys
import re
# external imports
import sqlalchemy
# third-party imports
import confini
import celery
import rlp
import cic_base.config
import cic_base.log
import cic_base.argparse
import cic_base.rpc
from cic_eth_registry import CICRegistry
from cic_eth_registry.error import UnknownContractError
from chainlib.chain import ChainSpec
@@ -21,63 +27,36 @@ from hexathon import (
strip_0x,
)
from chainsyncer.backend.sql import SQLBackend
from chainsyncer.driver.head import HeadSyncer
from chainsyncer.driver.history import HistorySyncer
from chainsyncer.driver import (
HeadSyncer,
HistorySyncer,
)
from chainsyncer.db.models.base import SessionBase
# local imports
import cic_cache.cli
from cic_cache.db import (
dsn_from_config,
add_tag,
)
from cic_cache.db import dsn_from_config
from cic_cache.runnable.daemons.filters import (
ERC20TransferFilter,
FaucetFilter,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
script_dir = os.path.realpath(os.path.dirname(__file__))
# process args
arg_flags = cic_cache.cli.argflag_std_read
local_arg_flags = cic_cache.cli.argflag_local_sync
argparser = cic_cache.cli.ArgumentParser(arg_flags)
argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args()
logg = cic_base.log.create()
argparser = cic_base.argparse.create(script_dir, cic_base.argparse.full_template)
#argparser = cic_base.argparse.add(argparser, add_traffic_args, 'traffic')
args = cic_base.argparse.parse(argparser, logg)
config = cic_base.config.create(args.c, args, args.env_prefix)
# process config
config = cic_cache.cli.Config.from_args(args, arg_flags, local_arg_flags)
cic_base.config.log(config)
# connect to database
dsn = dsn_from_config(config)
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
# set up rpc
rpc = cic_cache.cli.RPC.from_config(config)
conn = rpc.get_default()
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
# set up chain provisions
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
registry = None
try:
registry = cic_cache.cli.connect_registry(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
except UnknownContractError as e:
logg.exception('Registry contract connection failed for {}: {}'.format(config.get('CIC_REGISTRY_ADDRESS'), e))
sys.exit(1)
logg.info('connected contract registry {}'.format(config.get('CIC_REGISTRY_ADDRESS')))
def register_filter_tags(filters, session):
for f in filters:
tag = f.tag()
try:
add_tag(session, tag[0], domain=tag[1])
session.commit()
logg.info('added tag name "{}" domain "{}"'.format(tag[0], tag[1]))
except sqlalchemy.exc.IntegrityError:
session.rollback()
logg.debug('already have tag name "{}" domain "{}"'.format(tag[0], tag[1]))
#RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default')
cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER'))
def main():
@@ -88,29 +67,26 @@ def main():
r = rpc.do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.debug('current block height {}'.format(block_offset))
logg.debug('starting at block {}'.format(block_offset))
syncers = []
#if SQLBackend.first(chain_spec):
# backend = SQLBackend.initial(chain_spec, block_offset)
syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
initial_block_start = config.get('SYNCER_OFFSET')
initial_block_offset = block_offset
if config.get('SYNCER_NO_HISTORY'):
initial_block_start = block_offset
initial_block_offset += 1
syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start))
logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset))
logg.info('found no backends to resume')
syncer_backends.append(SQLBackend.initial(chain_spec, block_offset))
else:
for syncer_backend in syncer_backends:
logg.info('resuming sync session {}'.format(syncer_backend))
for syncer_backend in syncer_backends:
syncers.append(HistorySyncer(syncer_backend, cic_cache.cli.chain_interface))
syncers.append(HistorySyncer(syncer_backend))
syncer_backend = SQLBackend.live(chain_spec, block_offset+1)
syncers.append(HeadSyncer(syncer_backend, cic_cache.cli.chain_interface))
syncers.append(HeadSyncer(syncer_backend))
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None:
@@ -121,22 +97,11 @@ def main():
logg.info('using trusted address {}'.format(address))
erc20_transfer_filter = ERC20TransferFilter(chain_spec)
faucet_filter = FaucetFilter(chain_spec)
filters = [
erc20_transfer_filter,
faucet_filter,
]
session = SessionBase.create_session()
register_filter_tags(filters, session)
session.close()
i = 0
for syncer in syncers:
logg.debug('running syncer index {}'.format(i))
for f in filters:
syncer.add_filter(f)
syncer.add_filter(erc20_transfer_filter)
r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
sys.stderr.write("sync {} done at block {}\n".format(syncer, r))

View File

@@ -2,17 +2,14 @@
import celery
# local imports
from cic_cache.cache import (
BloomCache,
DataCache,
)
from cic_cache.cache import BloomCache
from cic_cache.db.models.base import SessionBase
celery_app = celery.current_app
@celery_app.task(bind=True)
def tx_filter(self, offset, limit, address=None, oldest=False, encoding='hex'):
def tx_filter(self, offset, limit, address=None, encoding='hex'):
queue = self.request.delivery_info.get('routing_key')
session = SessionBase.create_session()
@@ -20,9 +17,9 @@ def tx_filter(self, offset, limit, address=None, oldest=False, encoding='hex'):
c = BloomCache(session)
b = None
if address == None:
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit, oldest=oldest)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
else:
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit, oldest=oldest)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
session.close()
@@ -38,17 +35,4 @@ def tx_filter(self, offset, limit, address=None, oldest=False, encoding='hex'):
return o
@celery_app.task(bind=True)
def tx_filter_content(self, offset, limit, address=None, block_offset=None, block_limit=None, oldest=False, encoding='hex'):
session = SessionBase.create_session()
c = DataCache(session)
b = None
if address == None:
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, limit, block_offset=block_offset, block_limit=block_limit, oldest=oldest)
else:
(lowest_block, highest_block, tx_cache) = c.load_transactions_account_with_data_index(address, offset, limit, block_offset=block_offset, block_limit=block_limit)
session.close()
return (lowest_block, highest_block, tx_cache,)

View File

@@ -4,7 +4,7 @@ import semver
version = (
0,
2,
1,
0,
'alpha.2',
)

View File

@@ -0,0 +1,2 @@
[bancor]
dir =

View File

@@ -1,3 +1,4 @@
[cic]
registry_address =
chain_spec =
trust_address =

View File

@@ -6,4 +6,4 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=0
DEBUG=

View File

@@ -0,0 +1,3 @@
[bancor]
registry_address =
dir = /usr/local/share/bancor

View File

@@ -1,3 +1,4 @@
[cic]
chain_spec =
registry_address =
trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C

View File

@@ -0,0 +1,2 @@
[eth]
provider = ws://localhost:63546

View File

@@ -1,4 +1,2 @@
[syncer]
loop_interval = 1
offset = 0
no_history = 0

View File

@@ -0,0 +1,2 @@
[eth]
provider = ws://localhost:8545

View File

@@ -0,0 +1,2 @@
[syncer]
loop_interval = 5

View File

@@ -1,4 +1,2 @@
[cic]
registry_address =
chain_spec =
trust_address =

View File

@@ -6,4 +6,4 @@ HOST=localhost
PORT=5432
ENGINE=sqlite
DRIVER=pysqlite
DEBUG=1
DEBUG=

View File

@@ -1,39 +1,56 @@
# syntax = docker/dockerfile:1.2
FROM registry.gitlab.com/grassrootseconomics/cic-base-images:python-3.8.6-dev-55da5f4e as dev
# RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2b9
FROM python:3.8.6-slim-buster
COPY requirements.txt .
#RUN pip install $pip_extra_index_url_flag -r test_requirements.txt
#RUN pip install $pip_extra_index_url_flag .
#RUN pip install .[server]
#COPY --from=0 /usr/local/share/cic/solidity/ /usr/local/share/cic/solidity/
ARG EXTRA_INDEX_URL="https://pip.grassrootseconomics.net:8433"
ARG GITLAB_PYTHON_REGISTRY="https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple"
ARG EXTRA_PIP_ARGS=""
RUN --mount=type=cache,mode=0755,target=/root/.cache/pip \
pip install --index-url https://pypi.org/simple \
--extra-index-url $GITLAB_PYTHON_REGISTRY --extra-index-url $EXTRA_INDEX_URL $EXTRA_PIP_ARGS \
-r requirements.txt
WORKDIR /usr/src/cic-cache
COPY . .
ARG pip_extra_index_url_flag='--index https://pypi.org/simple --extra-index-url https://pip.grassrootseconomics.net:8433'
ARG root_requirement_file='requirements.txt'
RUN python setup.py install
#RUN apk update && \
# apk add gcc musl-dev gnupg libpq
#RUN apk add postgresql-dev
#RUN apk add linux-headers
#RUN apk add libffi-dev
RUN apt-get update && \
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps git
# Copy shared requirements from top of mono-repo
RUN echo "copying root req file ${root_requirement_file}"
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a76
COPY cic-cache/requirements.txt ./
COPY cic-cache/setup.cfg \
cic-cache/setup.py \
./
COPY cic-cache/cic_cache/ ./cic_cache/
COPY cic-cache/scripts/ ./scripts/
COPY cic-cache/test_requirements.txt ./
RUN pip install $pip_extra_index_url_flag -r test_requirements.txt
RUN pip install $pip_extra_index_url_flag .
RUN pip install .[server]
COPY cic-cache/tests/ ./tests/
#COPY db/ cic-cache/db
#RUN apk add postgresql-client
# ini files in config directory defines the configurable parameters for the application
# they can all be overridden by environment variables
# to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
COPY config/ /usr/local/etc/cic-cache/
COPY cic-cache/config/ /usr/local/etc/cic-cache/
# for db migrations
RUN git clone https://github.com/vishnubob/wait-for-it.git /usr/local/bin/wait-for-it/
COPY cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/
COPY cic-cache/cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/
COPY /docker/start_tracker.sh ./start_tracker.sh
COPY /docker/db.sh ./db.sh
RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
mkdir -p /usr/local/share/cic/solidity && \
cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi
COPY cic-cache/docker/start_tracker.sh ./start_tracker.sh
COPY cic-cache/docker/db.sh ./db.sh
RUN chmod 755 ./*.sh
# Tracker
# ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"]
# Server
# ENTRYPOINT [ "/usr/local/bin/uwsgi", "--wsgi-file", "/usr/local/lib/python3.8/site-packages/cic_cache/runnable/server.py", "--http", ":80", "--pyargv", "-vv" ]
ENTRYPOINT []

View File

@@ -1,10 +0,0 @@
#! /bin/bash
set -e
pip install --extra-index-url https://pip.grassrootseconomics.net:8433 \
--extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple \
-r test_requirements.txt
export PYTHONPATH=. && pytest -x --cov=cic_cache --cov-fail-under=90 --cov-report term-missing tests

View File

@@ -2,9 +2,4 @@
. ./db.sh
if [ $? -ne "0" ]; then
>&2 echo db migrate fail
exit 1
fi
/usr/local/bin/cic-cache-trackerd $@

View File

@@ -1,14 +1,12 @@
cic-base~=0.1.2a77
alembic==1.4.2
confini>=0.3.6rc4,<0.5.0
confini~=0.3.6rc3
uwsgi==2.0.19.1
moolb~=0.1.1b2
cic-eth-registry~=0.6.1a1
moolb~=0.1.0
cic-eth-registry~=0.5.4a16
SQLAlchemy==1.3.20
semver==2.13.0
psycopg2==2.8.6
celery==4.4.7
redis==3.5.3
chainsyncer[sql]>=0.0.6a3,<0.1.0
erc20-faucet>=0.3.2a2, <0.4.0
chainlib-eth>=0.0.9a14,<0.1.0
eth-address-index>=0.2.3a4,<0.3.0
chainsyncer[sql]~=0.0.2a2

View File

@@ -2,7 +2,6 @@
import os
import argparse
import logging
import re
import alembic
from alembic.config import Config as AlembicConfig
@@ -24,8 +23,6 @@ argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--migrations-dir', dest='migrations_dir', default=migrationsdir, type=str, help='path to alembic migrations directory')
argparser.add_argument('--reset', action='store_true', help='downgrade before upgrading')
argparser.add_argument('-f', action='store_true', help='force action')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()
@@ -56,10 +53,4 @@ ac = AlembicConfig(os.path.join(migrations_dir, 'alembic.ini'))
ac.set_main_option('sqlalchemy.url', dsn)
ac.set_main_option('script_location', migrations_dir)
if args.reset:
if not args.f:
if not re.match(r'[yY][eE]?[sS]?', input('EEK! this will DELETE the existing db. are you sure??')):
logg.error('user chickened out on requested reset, bailing')
sys.exit(1)
alembic.command.downgrade(ac, 'base')
alembic.command.upgrade(ac, 'head')

View File

@@ -23,13 +23,11 @@ licence_files =
[options]
python_requires = >= 3.6
include_package_data = True
packages =
cic_cache
cic_cache.tasks
cic_cache.db
cic_cache.db.models
cic_cache.cli
cic_cache.runnable
cic_cache.runnable.daemons
cic_cache.runnable.daemons.filters
@@ -41,4 +39,3 @@ console_scripts =
cic-cache-trackerd = cic_cache.runnable.daemons.tracker:main
cic-cache-serverd = cic_cache.runnable.daemons.server:main
cic-cache-taskerd = cic_cache.runnable.daemons.tasker:main
cic-cache-list = cic_cache.runable.list:main

View File

@@ -4,7 +4,3 @@ pytest-mock==3.3.1
pysqlite3==0.4.3
sqlparse==0.4.1
pytest-celery==0.0.0a1
eth_tester==0.5.0b3
py-evm==0.3.0a20
sarafu-faucet~=0.0.7a1
erc20-transfer-authorization>=0.3.5a1,<0.4.0

View File

@@ -1,40 +0,0 @@
# standard imports
import os
# external imports
import chainlib.cli
# local imports
import cic_cache.cli
script_dir = os.path.dirname(os.path.realpath(__file__))
config_dir = os.path.join(script_dir, '..', 'testdata', 'config')
def test_argumentparserto_config():
argparser = cic_cache.cli.ArgumentParser()
local_flags = 0xffff
argparser.process_local_flags(local_flags)
argparser.add_argument('--foo', type=str)
args = argparser.parse_args([
'-q', 'baz',
'--offset', '13',
'--no-history',
'-r','0xdeadbeef',
'-vv',
'--foo', 'bar',
])
extra_args = {
'foo': '_BARBARBAR',
}
config = cic_cache.cli.Config.from_args(args, chainlib.cli.argflag_std_base, local_flags, extra_args=extra_args, base_config_dir=config_dir)
assert config.get('_BARBARBAR') == 'bar'
assert config.get('CELERY_QUEUE') == 'baz'
assert config.get('SYNCER_NO_HISTORY') == True
assert config.get('SYNCER_OFFSET') == 13
assert config.get('CIC_REGISTRY_ADDRESS') == '0xdeadbeef'

View File

@@ -1,17 +0,0 @@
# standard imports
import tempfile
# local imports
import cic_cache.cli
def test_cli_celery():
cf = tempfile.mkdtemp()
config = {
'CELERY_RESULT_URL': 'filesystem://' + cf,
}
cic_cache.cli.CeleryApp.from_config(config)
config['CELERY_BROKER_URL'] = 'filesystem://' + cf
cic_cache.cli.CeleryApp.from_config(config)

View File

@@ -1,68 +0,0 @@
# external imports
import pytest
from chainlib.eth.gas import (
Gas,
RPCGasOracle,
)
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.block import (
block_latest,
Block,
)
from chainlib.eth.pytest.fixtures_chain import default_chain_spec
from chainlib.eth.pytest.fixtures_ethtester import *
from cic_eth_registry.pytest.fixtures_contracts import *
from hexathon import add_0x
# local imports
import cic_cache.cli
@pytest.mark.xfail()
def test_cli_rpc(
eth_rpc,
eth_signer,
default_chain_spec,
):
config = {
'CHAIN_SPEC': str(default_chain_spec),
'RPC_HTTP_PROVIDER': 'http://localhost:8545',
}
rpc = cic_cache.cli.RPC.from_config(config, default_label='foo')
conn = rpc.get_by_label('foo')
#o = block_latest()
#conn.do(o)
def test_cli_chain(
default_chain_spec,
eth_rpc,
eth_signer,
contract_roles,
):
ifc = cic_cache.cli.EthChainInterface()
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], conn=eth_rpc)
gas_oracle = RPCGasOracle(conn=eth_rpc)
c = Gas(default_chain_spec, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, signer=eth_signer)
recipient = add_0x(os.urandom(20).hex())
(tx_hash, o) = c.create(contract_roles['CONTRACT_DEPLOYER'], recipient, 1024)
r = eth_rpc.do(o)
o = ifc.tx_receipt(r)
r = eth_rpc.do(o)
assert r['status'] == 1
o = ifc.block_by_number(1)
block_src = eth_rpc.do(o)
block = ifc.block_from_src(block_src)
assert block.number == 1
with pytest.raises(KeyError):
assert block_src['gasUsed'] == 21000
assert block_src['gas_used'] == 21000
block_src = ifc.src_normalize(block_src)
assert block_src['gasUsed'] == 21000
assert block_src['gas_used'] == 21000

View File

@@ -3,14 +3,11 @@ import os
import sys
import datetime
# external imports
# third-party imports
import pytest
import moolb
# local imports
from cic_cache import db
from cic_cache import BloomCache
from cic_cache.cache import DEFAULT_FILTER_SIZE
script_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.dirname(script_dir)
@@ -64,6 +61,7 @@ def txs(
dt.timestamp(),
)
tx_number = 42
tx_hash_second = '0x' + os.urandom(32).hex()
tx_signed_second = '0x' + os.urandom(128).hex()
@@ -86,62 +84,3 @@ def txs(
session.commit()
return [
tx_hash_first,
tx_hash_second,
]
@pytest.fixture(scope='function')
def more_txs(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
):
session = init_database
tx_number = 666
tx_hash = '0x' + os.urandom(32).hex()
tx_signed = '0x' + os.urandom(128).hex()
nonce = 3
dt = datetime.datetime.utcnow()
dt += datetime.timedelta(hours=1)
db.add_transaction(
session,
tx_hash,
list_defaults['block']+2,
tx_number,
list_actors['alice'],
list_actors['diane'],
list_tokens['bar'],
list_tokens['bar'],
2048,
4096,
False,
dt.timestamp(),
)
session.commit()
return [tx_hash] + txs
@pytest.fixture(scope='function')
def tag_txs(
init_database,
txs,
):
db.add_tag(init_database, 'taag', domain='test')
init_database.commit()
db.tag_transaction(init_database, txs[1], 'taag', domain='test')
@pytest.fixture(scope='session')
def zero_filter():
return moolb.Bloom(DEFAULT_FILTER_SIZE, 3)

View File

@@ -1,3 +0,0 @@
from chainlib.eth.pytest import *
from cic_eth_registry.pytest.fixtures_tokens import *

View File

@@ -1,173 +0,0 @@
# standard imports
import os
import datetime
import logging
import json
# external imports
import pytest
from sqlalchemy import text
from chainlib.eth.tx import Tx
from chainlib.eth.block import Block
from chainlib.chain import ChainSpec
from chainlib.eth.error import RequestMismatchException
from hexathon import (
strip_0x,
add_0x,
)
# local imports
from cic_cache.db import add_tag
from cic_cache.runnable.daemons.filters.erc20 import ERC20TransferFilter
from cic_cache.runnable.daemons.filters.base import TagSyncFilter
logg = logging.getLogger()
def test_base_filter_str(
init_database,
):
f = TagSyncFilter('foo')
assert 'foo' == str(f)
f = TagSyncFilter('foo', domain='bar')
assert 'bar.foo' == str(f)
def test_erc20_filter(
eth_rpc,
foo_token,
init_database,
list_defaults,
list_actors,
tags,
):
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
fltr = ERC20TransferFilter(chain_spec)
add_tag(init_database, fltr.tag_name, domain=fltr.tag_domain)
data = 'a9059cbb'
data += strip_0x(list_actors['alice'])
data += '1000'.ljust(64, '0')
block = Block({
'hash': os.urandom(32).hex(),
'number': 42,
'timestamp': datetime.datetime.utcnow().timestamp(),
'transactions': [],
})
tx = Tx({
'to': foo_token,
'from': list_actors['bob'],
'data': data,
'value': 0,
'hash': os.urandom(32).hex(),
'nonce': 13,
'gasPrice': 10000000,
'gas': 123456,
})
block.txs.append(tx)
tx.block = block
r = fltr.filter(eth_rpc, block, tx, db_session=init_database)
assert r
s = text("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = :a AND a.value = :b")
r = init_database.execute(s, {'a': fltr.tag_domain, 'b': fltr.tag_name}).fetchone()
assert r[0] == tx.hash
def test_erc20_filter_nocontract(
eth_rpc,
foo_token,
init_database,
list_defaults,
list_actors,
tags,
):
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
fltr = ERC20TransferFilter(chain_spec)
add_tag(init_database, fltr.tag_name, domain=fltr.tag_domain)
# incomplete args
data = 'a9059cbb'
data += strip_0x(list_actors['alice'])
data += '1000'.ljust(64, '0')
block = Block({
'hash': os.urandom(32).hex(),
'number': 42,
'timestamp': datetime.datetime.utcnow().timestamp(),
'transactions': [],
})
tx = Tx({
'to': os.urandom(20).hex(),
'from': list_actors['bob'],
'data': data,
'value': 0,
'hash': os.urandom(32).hex(),
'nonce': 13,
'gasPrice': 10000000,
'gas': 123456,
})
block.txs.append(tx)
tx.block = block
assert not fltr.filter(eth_rpc, block, tx, db_session=init_database)
@pytest.mark.parametrize(
'contract_method,contract_input,expected_exception',
[
('a9059cbb', os.urandom(32).hex(), ValueError), # not enough args
('a9059cbb', os.urandom(31).hex(), ValueError), # wrong arg boundary
('a9059cbc', os.urandom(64).hex(), RequestMismatchException), # wrong method
],
)
def test_erc20_filter_bogus(
eth_rpc,
foo_token,
init_database,
list_defaults,
list_actors,
tags,
contract_method,
contract_input,
expected_exception,
):
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
fltr = ERC20TransferFilter(chain_spec)
add_tag(init_database, fltr.tag_name, domain=fltr.tag_domain)
# incomplete args
data = contract_method
data += contract_input
block = Block({
'hash': os.urandom(32).hex(),
'number': 42,
'timestamp': datetime.datetime.utcnow().timestamp(),
'transactions': [],
})
tx = Tx({
'to': foo_token,
'from': list_actors['bob'],
'data': data,
'value': 0,
'hash': os.urandom(32).hex(),
'nonce': 13,
'gasPrice': 10000000,
'gas': 123456,
})
block.txs.append(tx)
tx.block = block
assert not fltr.filter(eth_rpc, block, tx, db_session=init_database)

View File

@@ -1,71 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.chain import ChainSpec
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.block import (
block_by_hash,
Block,
)
from chainlib.eth.tx import (
receipt,
unpack,
transaction,
Tx,
)
from hexathon import strip_0x
from erc20_faucet.faucet import SingleShotFaucet
from sqlalchemy import text
# local imports
from cic_cache.db import add_tag
from cic_cache.runnable.daemons.filters.faucet import FaucetFilter
logg = logging.getLogger()
def test_filter_faucet(
eth_rpc,
eth_signer,
foo_token,
faucet_noregistry,
init_database,
list_defaults,
contract_roles,
agent_roles,
tags,
):
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
fltr = FaucetFilter(chain_spec, contract_roles['CONTRACT_DEPLOYER'])
add_tag(init_database, fltr.tag_name, domain=fltr.tag_domain)
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
c = SingleShotFaucet(chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.give_to(faucet_noregistry, agent_roles['ALICE'], agent_roles['ALICE'])
r = eth_rpc.do(o)
tx_src = unpack(bytes.fromhex(strip_0x(o['params'][0])), chain_spec)
o = receipt(r)
r = eth_rpc.do(o)
rcpt = Tx.src_normalize(r)
assert r['status'] == 1
o = block_by_hash(r['block_hash'])
r = eth_rpc.do(o)
block_object = Block(r)
tx = Tx(tx_src, block_object)
tx.apply_receipt(rcpt)
r = fltr.filter(eth_rpc, block_object, tx, init_database)
assert r
s = text("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = :a AND a.value = :b")
r = init_database.execute(s, {'a': fltr.tag_domain, 'b': fltr.tag_name}).fetchone()
assert r[0] == tx.hash

View File

@@ -2,7 +2,7 @@
import os
import logging
# external imports
# third-party imports
import pytest
import confini
@@ -13,7 +13,7 @@ logg = logging.getLogger(__file__)
@pytest.fixture(scope='session')
def load_config():
config_dir = os.path.join(root_dir, 'config/test')
config_dir = os.path.join(root_dir, '.config/test')
conf = confini.Config(config_dir, 'CICTEST')
conf.process()
logg.debug('config {}'.format(conf))

View File

@@ -3,16 +3,13 @@ import os
import logging
import re
# external imports
# third-party imports
import pytest
import sqlparse
import alembic
from alembic.config import Config as AlembicConfig
# local imports
from cic_cache.db.models.base import SessionBase
from cic_cache.db import dsn_from_config
from cic_cache.db import add_tag
logg = logging.getLogger(__file__)
@@ -29,10 +26,11 @@ def database_engine(
except FileNotFoundError:
pass
dsn = dsn_from_config(load_config)
SessionBase.connect(dsn, debug=load_config.true('DATABASE_DEBUG'))
SessionBase.connect(dsn)
return dsn
# TODO: use alembic instead to migrate db, here we have to keep separate schema than migration script in script/migrate.py
@pytest.fixture(scope='function')
def init_database(
load_config,
@@ -40,23 +38,52 @@ def init_database(
):
rootdir = os.path.dirname(os.path.dirname(__file__))
dbdir = os.path.join(rootdir, 'cic_cache', 'db')
migrationsdir = os.path.join(dbdir, 'migrations', load_config.get('DATABASE_ENGINE'))
if not os.path.isdir(migrationsdir):
migrationsdir = os.path.join(dbdir, 'migrations', 'default')
logg.info('using migrations directory {}'.format(migrationsdir))
schemadir = os.path.join(rootdir, 'db', load_config.get('DATABASE_DRIVER'))
if load_config.get('DATABASE_ENGINE') == 'sqlite':
rconn = SessionBase.engine.raw_connection()
f = open(os.path.join(schemadir, 'db.sql'))
s = f.read()
f.close()
rconn.executescript(s)
else:
rconn = SessionBase.engine.raw_connection()
rcursor = rconn.cursor()
#rcursor.execute('DROP FUNCTION IF EXISTS public.transaction_list')
#rcursor.execute('DROP FUNCTION IF EXISTS public.balances')
f = open(os.path.join(schemadir, 'db.sql'))
s = f.read()
f.close()
r = re.compile(r'^[A-Z]', re.MULTILINE)
for l in sqlparse.parse(s):
strl = str(l)
# we need to check for empty query lines, as sqlparse doesn't do that on its own (and psycopg complains when it gets them)
if not re.search(r, strl):
logg.warning('skipping parsed query line {}'.format(strl))
continue
rcursor.execute(strl)
rconn.commit()
rcursor.execute('SET search_path TO public')
# this doesn't work when run separately, no idea why
# functions have been manually added to original schema from cic-eth
# f = open(os.path.join(schemadir, 'proc_transaction_list.sql'))
# s = f.read()
# f.close()
# rcursor.execute(s)
#
# f = open(os.path.join(schemadir, 'proc_balances.sql'))
# s = f.read()
# f.close()
# rcursor.execute(s)
rcursor.close()
session = SessionBase.create_session()
ac = AlembicConfig(os.path.join(migrationsdir, 'alembic.ini'))
ac.set_main_option('sqlalchemy.url', database_engine)
ac.set_main_option('script_location', migrationsdir)
alembic.command.downgrade(ac, 'base')
alembic.command.upgrade(ac, 'head')
session.commit()
yield session
session.commit()
session.close()
@@ -89,14 +116,3 @@ def list_defaults(
return {
'block': 420000,
}
@pytest.fixture(scope='function')
def tags(
init_database,
):
add_tag(init_database, 'foo')
add_tag(init_database, 'baz', domain='bar')
add_tag(init_database, 'xyzzy', domain='bar')
init_database.commit()

View File

@@ -1,31 +0,0 @@
# standard imports
import json
# external imports
import pytest
# local imports
from cic_cache.runnable.daemons.query import process_transactions_all_data
def test_api_all_data(
init_database,
txs,
):
env = {
'PATH_INFO': '/txa/410000/420000',
'HTTP_X_CIC_CACHE_MODE': 'all',
}
j = process_transactions_all_data(init_database, env)
o = json.loads(j[1])
assert len(o['data']) == 2
env = {
'PATH_INFO': '/txa/420000/410000',
'HTTP_X_CIC_CACHE_MODE': 'all',
}
with pytest.raises(ValueError):
j = process_transactions_all_data(init_database, env)

View File

@@ -4,13 +4,11 @@ import datetime
import logging
import json
# external imports
# third-party imports
import pytest
# local imports
from cic_cache import db
from cic_cache import BloomCache
from cic_cache.cache import DataCache
logg = logging.getLogger()
@@ -19,6 +17,7 @@ def test_cache(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
):
@@ -34,219 +33,3 @@ def test_cache(
assert b[0] == list_defaults['block'] - 1
def test_cache_data(
init_database,
txs,
tag_txs,
):
session = init_database
c = DataCache(session)
b = c.load_transactions_with_data(0, 3) #410000, 420000) #, 100, block_offset=410000, block_limit=420000, oldest=True)
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == txs[0]
assert b[2][0]['tx_type'] == 'unknown'
assert b[2][1]['tx_type'] == 'test.taag'
def test_cache_ranges(
init_database,
list_defaults,
list_actors,
list_tokens,
more_txs,
):
session = init_database
oldest = list_defaults['block'] - 1
mid = list_defaults['block']
newest = list_defaults['block'] + 2
c = BloomCache(session)
b = c.load_transactions(0, 100)
assert b[0] == oldest
assert b[1] == newest
b = c.load_transactions(1, 2)
assert b[0] == oldest
assert b[1] == mid
b = c.load_transactions(0, 2)
assert b[0] == mid
assert b[1] == newest
b = c.load_transactions(0, 1)
assert b[0] == newest
assert b[1] == newest
b = c.load_transactions(0, 100, oldest=True)
assert b[0] == oldest
assert b[1] == newest
b = c.load_transactions(0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
b = c.load_transactions(0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == oldest
assert b[1] == mid
b = c.load_transactions(0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'], oldest=True)
assert b[0] == oldest
assert b[1] == mid
# now check when supplying account
b = c.load_transactions_account(list_actors['alice'], 0, 100)
assert b[0] == oldest
assert b[1] == newest
b = c.load_transactions_account(list_actors['bob'], 0, 100)
assert b[0] == mid
assert b[1] == mid
b = c.load_transactions_account(list_actors['diane'], 0, 100)
assert b[0] == oldest
assert b[1] == newest
# add block filter to the mix
b = c.load_transactions_account(list_actors['alice'], 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
b = c.load_transactions_account(list_actors['alice'], 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
b = c.load_transactions_account(list_actors['bob'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == mid
assert b[1] == mid
b = c.load_transactions_account(list_actors['diane'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == oldest
assert b[1] == oldest
def test_cache_ranges_data(
init_database,
list_defaults,
list_actors,
list_tokens,
more_txs,
):
session = init_database
oldest = list_defaults['block'] - 1
mid = list_defaults['block']
newest = list_defaults['block'] + 2
c = DataCache(session)
b = c.load_transactions_with_data(0, 100)
assert b[0] == oldest
assert b[1] == newest
assert len(b[2]) == 3
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][2]['tx_hash'] == more_txs[2]
b = c.load_transactions_with_data(1, 2)
assert b[0] == oldest
assert b[1] == mid
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[1]
assert b[2][1]['tx_hash'] == more_txs[2]
b = c.load_transactions_with_data(0, 2)
assert b[0] == mid
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
b = c.load_transactions_with_data(0, 1)
assert b[0] == newest
assert b[1] == newest
assert len(b[2]) == 1
assert b[2][0]['tx_hash'] == more_txs[0]
b = c.load_transactions_with_data(0, 100, oldest=True)
assert b[0] == oldest
assert b[1] == newest
assert len(b[2]) == 3
assert b[2][0]['tx_hash'] == more_txs[2]
assert b[2][1]['tx_hash'] == more_txs[1]
assert b[2][2]['tx_hash'] == more_txs[0]
b = c.load_transactions_with_data(0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
b = c.load_transactions_with_data(0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == oldest
assert b[1] == mid
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[1]
assert b[2][1]['tx_hash'] == more_txs[2]
b = c.load_transactions_with_data(0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'], oldest=True)
assert b[0] == oldest
assert b[1] == mid
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[2]
assert b[2][1]['tx_hash'] == more_txs[1]
# now check when supplying account
b = c.load_transactions_account_with_data(list_actors['alice'], 0, 100)
assert b[0] == oldest
assert b[1] == newest
assert len(b[2]) == 3
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
assert b[2][2]['tx_hash'] == more_txs[2]
b = c.load_transactions_account_with_data(list_actors['bob'], 0, 100)
assert b[0] == mid
assert b[1] == mid
assert len(b[2]) == 1
assert b[2][0]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['diane'], 0, 100)
assert b[0] == oldest
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[2]
# add block filter to the mix
b = c.load_transactions_account_with_data(list_actors['alice'], 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['alice'], 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['bob'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == mid
assert b[1] == mid
assert len(b[2]) == 1
assert b[2][0]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['diane'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == oldest
assert b[1] == oldest
assert len(b[2]) == 1
assert b[2][0]['tx_hash'] == more_txs[2]

View File

@@ -1,230 +0,0 @@
# standard imports
import logging
import json
import base64
import copy
import re
# external imports
import pytest
from hexathon import strip_0x
# local imports
from cic_cache.runnable.daemons.query import *
logg = logging.getLogger()
@pytest.mark.parametrize(
'query_path_prefix, query_role, query_address_index, query_offset, query_offset_index, query_limit, query_limit_index, match_re',
[
('/tx/user/', 'alice', 0, None, 3, None, 5, re_transactions_account_bloom),
('/tx/user/', 'alice', 0, 42, 3, None, 5, re_transactions_account_bloom),
('/tx/user/', 'alice', 0, 42, 3, 13, 5, re_transactions_account_bloom),
('/tx/', None, 0, None, 3, None, 5, re_transactions_all_bloom),
('/tx/', None, 0, 42, 3, None, 5, re_transactions_all_bloom),
('/tx/', None, 0, 42, 3, 13, 5, re_transactions_all_bloom),
('/txa/', None, 0, None, 3, None, 5, re_transactions_all_data),
('/txa/', None, 0, 42, 3, None, 5, re_transactions_all_data),
('/txa/', None, 0, 42, 3, 13, 5, re_transactions_all_data),
],
)
def test_query_regex(
list_actors,
query_path_prefix,
query_role,
query_address_index,
query_offset,
query_offset_index,
query_limit,
query_limit_index,
match_re,
):
paths = []
path = query_path_prefix
query_address = None
if query_role != None:
query_address = strip_0x(list_actors[query_role])
paths.append(path + '0x' + query_address)
paths.append(path + query_address)
if query_offset != None:
if query_limit != None:
for i in range(len(paths)-1):
paths[i] += '/{}/{}'.format(query_offset, query_limit)
else:
for i in range(len(paths)-1):
paths[i] += '/' + str(query_offset)
for i in range(len(paths)):
paths.append(paths[i] + '/')
for p in paths:
logg.debug('testing path {} against {}'.format(p, match_re))
m = re.match(match_re, p)
l = len(m.groups())
logg.debug('laast index match {} groups {}'.format(m.lastindex, l))
for i in range(l+1):
logg.debug('group {} {}'.format(i, m[i]))
if m.lastindex >= query_offset_index:
assert query_offset == int(m[query_offset_index + 1])
if m.lastindex >= query_limit_index:
assert query_limit == int(m[query_limit_index + 1])
if query_address_index != None:
match_address = strip_0x(m[query_address_index + 1])
assert query_address == match_address
@pytest.mark.parametrize(
'role_name, query_offset, query_limit, query_match',
[
('alice', None, None, [(420000, 13), (419999, 42)]),
('alice', None, 1, [(420000, 13)]),
('alice', 1, None, [(419999, 42)]), # 420000 == list_defaults['block']
('alice', 2, None, []), # 420000 == list_defaults['block']
],
)
def test_query_process_txs_account(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
zero_filter,
role_name,
query_offset,
query_limit,
query_match,
):
actor = None
try:
actor = list_actors[role_name]
except KeyError:
actor = os.urandom(20).hex()
path_info = '/tx/user/0x' + strip_0x(actor)
if query_offset != None:
path_info += '/' + str(query_offset)
if query_limit != None:
if query_offset == None:
path_info += '/0'
path_info += '/' + str(query_limit)
env = {
'PATH_INFO': path_info,
}
logg.debug('using path {}'.format(path_info))
r = process_transactions_account_bloom(init_database, env)
assert r != None
o = json.loads(r[1])
block_filter_data = base64.b64decode(o['block_filter'].encode('utf-8'))
zero_filter_data = zero_filter.to_bytes()
if len(query_match) == 0:
assert block_filter_data == zero_filter_data
return
assert block_filter_data != zero_filter_data
block_filter = copy.copy(zero_filter)
block_filter.merge(block_filter_data)
block_filter_data = block_filter.to_bytes()
assert block_filter_data != zero_filter_data
for (block, tx) in query_match:
block = block.to_bytes(4, byteorder='big')
assert block_filter.check(block)
@pytest.mark.parametrize(
'query_offset, query_limit, query_match',
[
(None, 2, [(420000, 13), (419999, 42)]),
(0, 1, [(420000, 13)]),
(1, 1, [(419999, 42)]),
(2, 0, []),
],
)
def test_query_process_txs_bloom(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
zero_filter,
query_offset,
query_limit,
query_match,
):
path_info = '/tx'
if query_offset != None:
path_info += '/' + str(query_offset)
if query_limit != None:
if query_offset == None:
path_info += '/0'
path_info += '/' + str(query_limit)
env = {
'PATH_INFO': path_info,
}
logg.debug('using path {}'.format(path_info))
r = process_transactions_all_bloom(init_database, env)
assert r != None
o = json.loads(r[1])
block_filter_data = base64.b64decode(o['block_filter'].encode('utf-8'))
zero_filter_data = zero_filter.to_bytes()
if len(query_match) == 0:
assert block_filter_data == zero_filter_data
return
assert block_filter_data != zero_filter_data
block_filter = copy.copy(zero_filter)
block_filter.merge(block_filter_data)
block_filter_data = block_filter.to_bytes()
assert block_filter_data != zero_filter_data
for (block, tx) in query_match:
block = block.to_bytes(4, byteorder='big')
assert block_filter.check(block)
@pytest.mark.parametrize(
'query_block_start, query_block_end, query_match_count',
[
(None, 42, 0),
(420000, 420001, 1),
(419999, 419999, 1), # matches are inclusive
(419999, 420000, 2),
(419999, 420001, 2),
],
)
def test_query_process_txs_data(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
zero_filter,
query_block_start,
query_block_end,
query_match_count,
):
path_info = '/txa'
if query_block_start != None:
path_info += '/' + str(query_block_start)
if query_block_end != None:
if query_block_start == None:
path_info += '/0'
path_info += '/' + str(query_block_end)
env = {
'PATH_INFO': path_info,
'HTTP_X_CIC_CACHE_MODE': 'all',
}
logg.debug('using path {}'.format(path_info))
r = process_transactions_all_data(init_database, env)
assert r != None
o = json.loads(r[1])
assert len(o['data']) == query_match_count

View File

@@ -1,37 +0,0 @@
import os
import datetime
import logging
import json
# external imports
import pytest
# local imports
from cic_cache.db import tag_transaction
logg = logging.getLogger()
def test_cache(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
tags,
):
tag_transaction(init_database, txs[0], 'foo')
tag_transaction(init_database, txs[0], 'baz', domain='bar')
tag_transaction(init_database, txs[1], 'xyzzy', domain='bar')
r = init_database.execute("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.value = 'foo'").fetchall()
assert r[0][0] == txs[0]
r = init_database.execute("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = 'bar' AND a.value = 'baz'").fetchall()
assert r[0][0] == txs[0]
r = init_database.execute("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = 'bar' AND a.value = 'xyzzy'").fetchall()
assert r[0][0] == txs[1]

View File

@@ -1,2 +0,0 @@
[foo]
bar_baz = xyzzy

View File

@@ -1 +0,0 @@
include *requirements.txt

View File

@@ -1,53 +0,0 @@
# standard imports
import logging
# external imports
import celery
from erc20_demurrage_token.demurrage import DemurrageCalculator
from chainlib.connection import RPCConnection
from chainlib.chain import ChainSpec
from chainlib.eth.constant import ZERO_ADDRESS
from cic_eth_registry import CICRegistry
logg = logging.getLogger(__name__)
celery_app = celery.current_app
class NoopCalculator:
def amount_since(self, amount, timestamp):
logg.debug('noopcalculator amount {} timestamp {}'.format(amount, timestamp))
return amount
class DemurrageCalculationTask(celery.Task):
demurrage_token_calcs = {}
@classmethod
def register_token(cls, rpc, chain_spec, token_symbol, sender_address=ZERO_ADDRESS):
registry = CICRegistry(chain_spec, rpc)
token_address = registry.by_name(token_symbol, sender_address=sender_address)
try:
c = DemurrageCalculator.from_contract(rpc, chain_spec, token_address, sender_address=sender_address)
logg.info('found demurrage calculator for ERC20 {} @ {}'.format(token_symbol, token_address))
except:
logg.warning('Token {} at address {} does not appear to be a demurrage contract. Calls to balance adjust for this token will always return the same amount'.format(token_symbol, token_address))
c = NoopCalculator()
cls.demurrage_token_calcs[token_symbol] = c
@celery_app.task(bind=True, base=DemurrageCalculationTask)
def get_adjusted_balance(self, token_symbol, amount, timestamp):
c = self.demurrage_token_calcs[token_symbol]
return c.amount_since(amount, timestamp)
def aux_setup(rpc, config, sender_address=ZERO_ADDRESS):
chain_spec_str = config.get('CHAIN_SPEC')
chain_spec = ChainSpec.from_chain_str(chain_spec_str)
token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL')
DemurrageCalculationTask.register_token(rpc, chain_spec, token_symbol, sender_address=sender_address)

View File

@@ -1,30 +0,0 @@
# standard imports
import logging
# external imports
import celery
from cic_eth.api.base import ApiBase
app = celery.current_app
logg = logging.getLogger(__name__)
class Api(ApiBase):
def get_adjusted_balance(self, token_symbol, balance, timestamp):
s = celery.signature(
'cic_eth_aux.erc20_demurrage_token.get_adjusted_balance',
[
token_symbol,
balance,
timestamp,
],
queue=None,
)
if self.callback_param != None:
s.link(self.callback_success)
s.link.on_error(self.callback_error)
t = s.apply_async(queue=self.queue)
return t

View File

@@ -1,5 +0,0 @@
celery==4.4.7
erc20-demurrage-token~=0.0.5a3
cic-eth-registry~=0.6.1a5
chainlib~=0.0.9rc3
cic_eth~=0.12.4a9

View File

@@ -1,30 +0,0 @@
[metadata]
name = cic-eth-aux-erc20-demurrage-token
version = 0.0.2a6
description = cic-eth tasks supporting erc20 demurrage token
author = Louis Holbrook
author_email = dev@holbrook.no
url = https://gitlab.com/ccicnet/erc20-demurrage-token
keywords =
ethereum
blockchain
cryptocurrency
erc20
classifiers =
Programming Language :: Python :: 3
Operating System :: OS Independent
Development Status :: 3 - Alpha
Environment :: No Input/Output (Daemon)
Intended Audience :: Developers
License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
Topic :: Internet
#Topic :: Blockchain :: EVM
license = GPL3
licence_files =
LICENSE
[options]
include_package_data = True
python_requires = >= 3.6
packages =
cic_eth_aux.erc20_demurrage_token

View File

@@ -1,25 +0,0 @@
from setuptools import setup
requirements = []
f = open('requirements.txt', 'r')
while True:
l = f.readline()
if l == '':
break
requirements.append(l.rstrip())
f.close()
test_requirements = []
f = open('test_requirements.txt', 'r')
while True:
l = f.readline()
if l == '':
break
test_requirements.append(l.rstrip())
f.close()
setup(
install_requires=requirements,
tests_require=test_requirements,
)

View File

@@ -1,11 +0,0 @@
pytest==6.0.1
pytest-celery==0.0.0a1
pytest-mock==3.3.1
pytest-cov==2.10.1
eth-tester==0.5.0b3
py-evm==0.3.0a20
SQLAlchemy==1.3.20
liveness~=0.0.1a7
eth-accounts-index==0.1.1a1
eth-contract-registry==0.5.8a1
eth-address-index==0.2.1a1

View File

@@ -1,88 +0,0 @@
# external imports
import celery
from chainlib.eth.pytest.fixtures_chain import *
from chainlib.eth.pytest.fixtures_ethtester import *
from cic_eth_registry.pytest.fixtures_contracts import *
from cic_eth_registry.pytest.fixtures_tokens import *
from erc20_demurrage_token.unittest.base import TestTokenDeploy
from erc20_demurrage_token.token import DemurrageToken
from eth_token_index.index import TokenUniqueSymbolIndex
from eth_address_declarator.declarator import AddressDeclarator
# cic-eth imports
from cic_eth.pytest.fixtures_celery import *
from cic_eth.pytest.fixtures_token import *
from cic_eth.pytest.fixtures_config import *
@pytest.fixture(scope='function')
def demurrage_token(
default_chain_spec,
eth_rpc,
token_registry,
contract_roles,
eth_signer,
):
d = TestTokenDeploy(eth_rpc, token_symbol='BAR', token_name='Bar Token')
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], conn=eth_rpc)
c = DemurrageToken(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
token_address = d.deploy(eth_rpc, contract_roles['CONTRACT_DEPLOYER'], c, 'SingleNocap')
logg.debug('demurrage token contract "BAR" deployed to {}'.format(token_address))
return token_address
@pytest.fixture(scope='function')
def demurrage_token_symbol(
default_chain_spec,
eth_rpc,
demurrage_token,
contract_roles,
):
c = DemurrageToken(default_chain_spec)
o = c.symbol(demurrage_token, sender_address=contract_roles['CONTRACT_DEPLOYER'])
r = eth_rpc.do(o)
return c.parse_symbol(r)
@pytest.fixture(scope='function')
def demurrage_token_declaration(
foo_token_declaration,
):
return foo_token_declaration
@pytest.fixture(scope='function')
def register_demurrage_token(
default_chain_spec,
token_registry,
eth_rpc,
eth_signer,
register_lookups,
contract_roles,
demurrage_token_declaration,
demurrage_token,
address_declarator,
):
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], eth_rpc)
c = TokenUniqueSymbolIndex(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.register(token_registry, contract_roles['CONTRACT_DEPLOYER'], demurrage_token)
eth_rpc.do(o)
o = receipt(tx_hash_hex)
r = eth_rpc.do(o)
assert r['status'] == 1
nonce_oracle = RPCNonceOracle(contract_roles['TRUSTED_DECLARATOR'], eth_rpc)
c = AddressDeclarator(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.add_declaration(address_declarator, contract_roles['TRUSTED_DECLARATOR'], demurrage_token, demurrage_token_declaration)
eth_rpc.do(o)
o = receipt(tx_hash_hex)
r = eth_rpc.do(o)
assert r['status'] == 1
return token_registry

View File

@@ -1,69 +0,0 @@
# standard imports
import logging
import copy
import datetime
# external imports
import celery
# cic-eth imports
from cic_eth_aux.erc20_demurrage_token import (
DemurrageCalculationTask,
aux_setup,
)
from cic_eth_aux.erc20_demurrage_token.api import Api as AuxApi
logg = logging.getLogger()
def test_demurrage_calulate_task(
default_chain_spec,
eth_rpc,
cic_registry,
celery_session_worker,
register_demurrage_token,
demurrage_token_symbol,
contract_roles,
load_config,
):
config = copy.copy(load_config)
config.add(str(default_chain_spec), 'CIC_CHAIN_SPEC', exists_ok=True)
config.add(demurrage_token_symbol, 'CIC_DEFAULT_TOKEN_SYMBOL', exists_ok=True)
aux_setup(eth_rpc, load_config, sender_address=contract_roles['CONTRACT_DEPLOYER'])
since = datetime.datetime.utcnow() - datetime.timedelta(minutes=1)
s = celery.signature(
'cic_eth_aux.erc20_demurrage_token.get_adjusted_balance',
[
demurrage_token_symbol,
1000,
since.timestamp(),
],
queue=None,
)
t = s.apply_async()
r = t.get_leaf()
assert t.successful()
assert r == 980
def test_demurrage_calculate_api(
default_chain_spec,
eth_rpc,
cic_registry,
celery_session_worker,
register_demurrage_token,
demurrage_token_symbol,
contract_roles,
load_config,
):
api = AuxApi(str(default_chain_spec), queue=None)
since = datetime.datetime.utcnow() - datetime.timedelta(minutes=1)
t = api.get_adjusted_balance(demurrage_token_symbol, 1000, since.timestamp())
r = t.get_leaf()
assert t.successful()
assert r == 980

View File

@@ -5,6 +5,3 @@ omit =
cic_eth/db/migrations/*
cic_eth/sync/head.py
cic_eth/sync/mempool.py
cic_eth/queue/state.py
cic_eth/cli
*redis*.py

View File

@@ -1,6 +0,0 @@
.git
.cache
.dot
**/doc
**/.venv
**/venv

View File

@@ -1,16 +1,22 @@
build-test-cic-eth:
stage: test
tags:
- integration
variables:
APP_NAME: cic-eth
MR_IMAGE_TAG: mr-$APP_NAME-$CI_COMMIT_REF_SLUG-$CI_COMMIT_SHORT_SHA
script:
- cd apps/cic-eth
- docker build -t $MR_IMAGE_TAG -f docker/Dockerfile .
- docker run $MR_IMAGE_TAG sh docker/run_tests.sh
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/$APP_NAME/**/*
when: always
.cic_eth_variables:
variables:
APP_NAME: cic-eth
DOCKERFILE_PATH: $APP_NAME/docker/Dockerfile
.cic_eth_changes_target:
rules:
- changes:
- $CONTEXT/$APP_NAME/*
build-mr-cic-eth:
extends:
- .cic_eth_changes_target
- .py_build_merge_request
- .cic_eth_variables
build-push-cic-eth:
extends:
- .py_build_push
- .cic_eth_variables

View File

@@ -1,2 +0,0 @@
include *requirements.txt config/test/* cic_eth/data/config/*

View File

@@ -1,6 +0,0 @@
SQLAlchemy==1.3.20
cic-eth-registry>=0.6.1a5,<0.7.0
hexathon~=0.0.1a8
chainqueue>=0.0.4a6,<0.1.0
eth-erc20>=0.1.2a2,<0.2.0
chainlib-eth>=0.0.10a2,<0.1.0

View File

@@ -2,14 +2,10 @@
import datetime
import logging
# external imports
# third-party imports
import celery
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
from hexathon import (
add_0x,
strip_0x,
uniform as hex_uniform,
)
# local imports
from cic_eth.db.enum import LockEnum
@@ -19,17 +15,12 @@ from cic_eth.task import (
CriticalSQLAlchemyTask,
)
from cic_eth.error import LockedError
from cic_eth.encode import (
tx_normalize,
ZERO_ADDRESS_NORMAL,
)
celery_app = celery.current_app
logg = logging.getLogger()
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, flags=LockEnum.ALL, tx_hash=None):
def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.ALL, tx_hash=None):
"""Task wrapper to set arbitrary locks
:param chain_str: Chain spec string representation
@@ -41,17 +32,14 @@ def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, flags=Lock
:returns: New lock state for address
:rtype: number
"""
address = tx_normalize.wallet_address(address)
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, flags, address=address, tx_hash=tx_hash)
logg.debug('Locked {} for {}, flag now {}'.format(flags, address, r))
return chained_input
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, flags=LockEnum.ALL):
def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.ALL):
"""Task wrapper to reset arbitrary locks
:param chain_str: Chain spec string representation
@@ -63,17 +51,14 @@ def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, flags=Lo
:returns: New lock state for address
:rtype: number
"""
address = tx_normalize.wallet_address(address)
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, flags, address=address)
logg.debug('Unlocked {} for {}, flag now {}'.format(flags, address, r))
return chained_input
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, tx_hash=None):
def lock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=None):
"""Task wrapper to set send lock
:param chain_str: Chain spec string representation
@@ -83,7 +68,6 @@ def lock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, tx_ha
:returns: New lock state for address
:rtype: number
"""
address = tx_normalize.wallet_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, LockEnum.SEND, address=address, tx_hash=tx_hash)
logg.debug('Send locked for {}, flag now {}'.format(address, r))
@@ -91,7 +75,7 @@ def lock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, tx_ha
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL):
def unlock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
"""Task wrapper to reset send lock
:param chain_str: Chain spec string representation
@@ -101,7 +85,6 @@ def unlock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL):
:returns: New lock state for address
:rtype: number
"""
address = tx_normalize.wallet_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, LockEnum.SEND, address=address)
logg.debug('Send unlocked for {}, flag now {}'.format(address, r))
@@ -109,7 +92,7 @@ def unlock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL):
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, tx_hash=None):
def lock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=None):
"""Task wrapper to set queue direct lock
:param chain_str: Chain spec string representation
@@ -119,7 +102,6 @@ def lock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, tx_h
:returns: New lock state for address
:rtype: number
"""
address = tx_normalize.wallet_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, LockEnum.QUEUE, address=address, tx_hash=tx_hash)
logg.debug('Queue direct locked for {}, flag now {}'.format(address, r))
@@ -127,7 +109,7 @@ def lock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, tx_h
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL):
def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
"""Task wrapper to reset queue direct lock
:param chain_str: Chain spec string representation
@@ -137,7 +119,6 @@ def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL):
:returns: New lock state for address
:rtype: number
"""
address = tx_normalize.wallet_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, LockEnum.QUEUE, address=address)
logg.debug('Queue direct unlocked for {}, flag now {}'.format(address, r))
@@ -146,13 +127,9 @@ def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL):
@celery_app.task(base=CriticalSQLAlchemyTask)
def check_lock(chained_input, chain_spec_dict, lock_flags, address=None):
if address != None:
address = tx_normalize.wallet_address(address)
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
session = SessionBase.create_session()
r = Lock.check(chain_str, lock_flags, address=ZERO_ADDRESS_NORMAL, session=session)
r = Lock.check(chain_str, lock_flags, address=ZERO_ADDRESS, session=session)
if address != None:
r |= Lock.check(chain_str, lock_flags, address=address, session=session)
if r > 0:
@@ -162,9 +139,3 @@ def check_lock(chained_input, chain_spec_dict, lock_flags, address=None):
session.flush()
session.close()
return chained_input
@celery_app.task()
def shutdown(message):
logg.critical('shutdown called: {}'.format(message))
celery_app.control.shutdown() #broadcast('shutdown')

View File

@@ -4,22 +4,11 @@ import logging
# external imports
import celery
from chainlib.chain import ChainSpec
from chainlib.connection import RPCConnection
from chainlib.eth.tx import (
unpack,
TxFactory,
)
from chainlib.eth.gas import OverrideGasOracle
from chainqueue.sql.query import get_tx
from chainqueue.sql.state import set_cancel
from chainlib.eth.tx import unpack
from chainqueue.query import get_tx
from chainqueue.state import set_cancel
from chainqueue.db.models.otx import Otx
from chainqueue.db.models.tx import TxCache
from hexathon import (
strip_0x,
add_0x,
uniform as hex_uniform,
)
from potaahto.symbols import snake_and_camel
# local imports
from cic_eth.db.models.base import SessionBase
@@ -32,15 +21,13 @@ from cic_eth.admin.ctrl import (
)
from cic_eth.queue.tx import queue_create
from cic_eth.eth.gas import create_check_gas_task
from cic_eth.task import BaseTask
from cic_eth.encode import tx_normalize
celery_app = celery.current_app
logg = logging.getLogger()
@celery_app.task(bind=True, base=BaseTask)
def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
@celery_app.task(bind=True)
def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1):
"""Shift all transactions with nonces higher than the offset by the provided position delta.
Transactions who are replaced by transactions that move nonces will be marked as OVERRIDDEN.
@@ -51,109 +38,89 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
:type tx_hash_orig_hex: str, 0x-hex
:param delta: Amount
"""
chain_spec = ChainSpec.from_dict(chainspec_dict)
rpc = RPCConnection.connect(chain_spec, 'default')
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
queue = None
try:
queue = self.request.delivery_info.get('routing_key')
except AttributeError:
pass
session = BaseTask.session_func()
tx_brief = get_tx(chain_spec, tx_hash_orig_hex, session=session)
tx_raw = bytes.fromhex(strip_0x(tx_brief['signed_tx']))
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_brief = get_tx(tx_hash_orig_hex)
tx_raw = bytes.fromhex(strip_0x(tx_brief['signed_tx'][2:]))
tx = unpack(tx_raw, chain_spec)
nonce = tx_brief['nonce']
address = tx['from']
logg.debug('shifting nonce {} position(s) for address {}, offset {}, hash {}'.format(delta, address, nonce, tx['hash']))
logg.debug('shifting nonce {} position(s) for address {}, offset {}'.format(delta, address, nonce))
lock_queue(None, chain_spec.asdict(), address=address)
lock_send(None, chain_spec.asdict(), address=address)
lock_queue(None, chain_str, address)
lock_send(None, chain_str, address)
set_cancel(chain_spec, strip_0x(tx['hash']), manual=True, session=session)
query_address = tx_normalize.wallet_address(address)
session = SessionBase.create_session()
q = session.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.sender==query_address)
q = q.filter(TxCache.sender==address)
q = q.filter(Otx.nonce>=nonce+delta)
q = q.order_by(Otx.nonce.asc())
otxs = q.all()
tx_hashes = []
txs = []
gas_total = 0
for otx in otxs:
tx_raw = bytes.fromhex(strip_0x(otx.signed_tx))
tx_new = unpack(tx_raw, chain_spec)
tx_new = snake_and_camel(tx_new)
tx_previous_hash_hex = tx_new['hash']
tx_previous_nonce = tx_new['nonce']
tx_new['gas_price'] += 1
tx_new['gasPrice'] = tx_new['gas_price']
tx_new['nonce'] -= delta
gas_total += tx_new['gas_price'] * tx_new['gas']
logg.debug('tx_new {}'.format(tx_new))
logg.debug('gas running total {}'.format(gas_total))
del(tx_new['hash'])
del(tx_new['hash_unsigned'])
del(tx_new['hashUnsigned'])
tx_new['nonce'] -= delta
gas_oracle = OverrideGasOracle(limit=tx_new['gas'], price=tx_new['gas_price'] + 1) # TODO: it should be possible to merely set this price here and if missing in the existing struct then fill it in (chainlib.eth.tx)
c = TxFactory(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx_new)
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx_new, chain_str)
logg.debug('tx {} -> {} nonce {} -> {}'.format(tx_previous_hash_hex, tx_hash_hex, tx_previous_nonce, tx_new['nonce']))
otx = Otx(
tx_new['nonce'],
tx_hash_hex,
tx_signed_raw_hex,
)
nonce=tx_new['nonce'],
address=tx_new['from'],
tx_hash=tx_hash_hex,
signed_tx=tx_signed_raw_hex,
)
session.add(otx)
session.commit()
# TODO: cancel all first, then replace. Otherwise we risk two non-locked states for two different nonces.
set_cancel(chain_spec, strip_0x(tx_previous_hash_hex), manual=True, session=session)
set_cancel(tx_previous_hash_hex, True)
TxCache.clone(tx_previous_hash_hex, tx_hash_hex, session=session)
TxCache.clone(tx_previous_hash_hex, tx_hash_hex)
tx_hashes.append(tx_hash_hex)
txs.append(tx_signed_raw_hex)
session.commit()
session.close()
s = create_check_gas_task(
s = create_check_gas_and_send_task(
txs,
chain_spec,
#tx_new['from'],
address,
#gas=tx_new['gas'],
gas=gas_total,
tx_hashes_hex=tx_hashes,
queue=queue,
chain_str,
tx_new['from'],
tx_new['gas'],
tx_hashes,
queue,
)
s_unlock_send = celery.signature(
'cic_eth.admin.ctrl.unlock_send',
[
chain_spec.asdict(),
address,
#tx_new['from'],
chain_str,
tx_new['from'],
],
queue=queue,
)
s_unlock_direct = celery.signature(
'cic_eth.admin.ctrl.unlock_queue',
[
chain_spec.asdict(),
address,
#tx_new['from'],
chain_str,
tx_new['from'],
],
queue=queue,
)

View File

@@ -1,2 +0,0 @@
# local imports
from cic_eth.eth.erc20 import default_token

View File

@@ -5,3 +5,4 @@
"""
from .api_task import Api
from .api_admin import AdminApi

View File

@@ -8,7 +8,6 @@ from chainlib.eth.constant import (
ZERO_ADDRESS,
)
from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token
from cic_eth_registry.error import UnknownContractError
from chainlib.eth.address import to_checksum_address
from chainlib.eth.contract import code
@@ -21,7 +20,6 @@ from chainlib.hash import keccak256_hex_to_hex
from hexathon import (
strip_0x,
add_0x,
uniform as hex_uniform,
)
from chainlib.eth.gas import balance
from chainqueue.db.enum import (
@@ -32,14 +30,13 @@ from chainqueue.db.enum import (
status_str,
)
from chainqueue.error import TxStateChangeError
from eth_erc20 import ERC20
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.nonce import Nonce
from cic_eth.error import InitializationError
from cic_eth.queue.query import get_tx_local
from cic_eth.queue.query import get_tx
app = celery.current_app
@@ -63,29 +60,6 @@ class AdminApi:
self.call_address = call_address
def proxy_do(self, chain_spec, o):
s_proxy = celery.signature(
'cic_eth.task.rpc_proxy',
[
chain_spec.asdict(),
o,
'default',
],
queue=self.queue
)
return s_proxy.apply_async()
def registry(self):
s_registry = celery.signature(
'cic_eth.task.registry',
[],
queue=self.queue
)
return s_registry.apply_async()
def unlock(self, chain_spec, address, flags=None):
s_unlock = celery.signature(
'cic_eth.admin.ctrl.unlock',
@@ -172,6 +146,7 @@ class AdminApi:
# TODO: This check should most likely be in resend task itself
tx_dict = s_get_tx_cache.apply_async().get()
#if tx_dict['status'] in [StatusEnum.REVERTED, StatusEnum.SUCCESS, StatusEnum.CANCELLED, StatusEnum.OBSOLETED]:
if not is_alive(getattr(StatusEnum, tx_dict['status']).value):
raise TxStateChangeError('Cannot resend mined or obsoleted transaction'.format(txold_hash_hex))
@@ -191,7 +166,6 @@ class AdminApi:
s_manual = celery.signature(
'cic_eth.queue.state.set_manual',
[
chain_spec.asdict(),
tx_hash_hex,
],
queue=self.queue,
@@ -210,9 +184,8 @@ class AdminApi:
s.link(s_gas)
return s_manual.apply_async()
def check_nonce(self, chain_spec, address):
def check_nonce(self, address):
s = celery.signature(
'cic_eth.queue.query.get_account_tx',
[
@@ -233,12 +206,13 @@ class AdminApi:
s_get_tx = celery.signature(
'cic_eth.queue.query.get_tx',
[
chain_spec.asdict(),
chain_spec.asdict(),
k,
],
queue=self.queue,
)
tx = s_get_tx.apply_async().get()
#tx = get_tx(k)
logg.debug('checking nonce {} (previous {})'.format(tx['nonce'], last_nonce))
nonce_otx = tx['nonce']
if not is_alive(tx['status']) and tx['status'] & local_fail > 0:
@@ -246,14 +220,15 @@ class AdminApi:
blocking_tx = k
blocking_nonce = nonce_otx
elif nonce_otx - last_nonce > 1:
logg.debug('tx {}'.format(tx))
tx_obj = unpack(bytes.fromhex(strip_0x(tx['signed_tx'])), chain_spec)
logg.error('nonce gap; {} followed {} for account {}'.format(nonce_otx, last_nonce, tx_obj['from']))
logg.error('nonce gap; {} followed {} for account {}'.format(nonce_otx, last_nonce, tx['from']))
blocking_tx = k
blocking_nonce = nonce_otx
break
last_nonce = nonce_otx
#nonce_cache = Nonce.get(address)
#nonce_w3 = self.w3.eth.getTransactionCount(address, 'pending')
return {
'nonce': {
#'network': nonce_cache,
@@ -262,13 +237,12 @@ class AdminApi:
'blocking': blocking_nonce,
},
'tx': {
'blocking': add_0x(blocking_tx),
'blocking': blocking_tx,
}
}
}
# TODO: is risky since it does not validate that there is actually a nonce problem?
def fix_nonce(self, chain_spec, address, nonce):
def fix_nonce(self, address, nonce, chain_spec):
s = celery.signature(
'cic_eth.queue.query.get_account_tx',
[
@@ -282,17 +256,15 @@ class AdminApi:
txs = s.apply_async().get()
tx_hash_hex = None
session = SessionBase.create_session()
for k in txs.keys():
tx_dict = get_tx_local(chain_spec, k, session=session)
tx_dict = get_tx(k)
if tx_dict['nonce'] == nonce:
tx_hash_hex = k
session.close()
s_nonce = celery.signature(
'cic_eth.admin.nonce.shift_nonce',
[
chain_spec.asdict(),
self.rpc.chain_spec.asdict(),
tx_hash_hex,
],
queue=self.queue
@@ -300,6 +272,20 @@ class AdminApi:
return s_nonce.apply_async()
# # TODO: this is a stub, complete all checks
# def ready(self):
# """Checks whether all required initializations have been performed.
#
# :raises cic_eth.error.InitializationError: At least one setting pre-requisite has not been met.
# :raises KeyError: An address provided for initialization is not known by the keystore.
# """
# addr = AccountRole.get_address('ETH_GAS_PROVIDER_ADDRESS')
# if addr == ZERO_ADDRESS:
# raise InitializationError('missing account ETH_GAS_PROVIDER_ADDRESS')
#
# self.w3.eth.sign(addr, text='666f6f')
def account(self, chain_spec, address, include_sender=True, include_recipient=True, renderer=None, w=sys.stdout):
"""Lists locally originated transactions for the given Ethereum address.
@@ -308,8 +294,6 @@ class AdminApi:
:param address: Ethereum address to return transactions for
:type address: str, 0x-hex
"""
address = add_0x(hex_uniform(strip_0x(address)))
last_nonce = -1
s = celery.signature(
'cic_eth.queue.query.get_account_tx',
@@ -364,7 +348,6 @@ class AdminApi:
# TODO: Add exception upon non-existent tx aswell as invalid tx data to docstring
# TODO: This method is WAY too long
def tx(self, chain_spec, tx_hash=None, tx_raw=None, registry=None, renderer=None, w=sys.stdout):
"""Output local and network details about a given transaction with local origin.
@@ -387,6 +370,7 @@ class AdminApi:
if tx_raw != None:
tx_hash = add_0x(keccak256_hex_to_hex(tx_raw))
#tx_hash = self.w3.keccak(hexstr=tx_raw).hex()
s = celery.signature(
'cic_eth.queue.query.get_tx_cache',
@@ -399,87 +383,41 @@ class AdminApi:
t = s.apply_async()
tx = t.get()
source_token = None
if tx['source_token'] != ZERO_ADDRESS:
source_token_declaration = None
if registry != None:
try:
source_token_declaration = registry.by_address(tx['source_token'], sender_address=self.call_address)
except UnknownContractError:
logg.warning('unknown source token contract {} (direct)'.format(tx['source_token']))
else:
s = celery.signature(
'cic_eth.task.registry_address_lookup',
[
chain_spec.asdict(),
tx['source_token'],
],
queue=self.queue
)
t = s.apply_async()
source_token_declaration = t.get()
if source_token_declaration != None:
logg.warning('found declarator record for source token {} but not checking validity'.format(tx['source_token']))
source_token = ERC20Token(chain_spec, self.rpc, tx['source_token'])
logg.debug('source token set tup {}'.format(source_token))
try:
source_token = registry.by_address(tx['source_token'])
#source_token = CICRegistry.get_address(chain_spec, tx['source_token']).contract
except UnknownContractError:
#source_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token'])
#source_token = CICRegistry.add_token(chain_spec, source_token_contract)
logg.warning('unknown source token contract {}'.format(tx['source_token']))
destination_token = None
if tx['destination_token'] != ZERO_ADDRESS:
destination_token_declaration = None
if registry != None:
try:
destination_token_declaration = registry.by_address(tx['destination_token'], sender_address=self.call_address)
except UnknownContractError:
logg.warning('unknown destination token contract {}'.format(tx['destination_token']))
else:
s = celery.signature(
'cic_eth.task.registry_address_lookup',
[
chain_spec.asdict(),
tx['destination_token'],
],
queue=self.queue
)
t = s.apply_async()
destination_token_declaration = t.get()
if destination_token_declaration != None:
logg.warning('found declarator record for destination token {} but not checking validity'.format(tx['destination_token']))
destination_token = ERC20Token(chain_spec, self.rpc, tx['destination_token'])
if tx['source_token'] != ZERO_ADDRESS:
try:
#destination_token = CICRegistry.get_address(chain_spec, tx['destination_token'])
destination_token = registry.by_address(tx['destination_token'])
except UnknownContractError:
#destination_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token'])
#destination_token = CICRegistry.add_token(chain_spec, destination_token_contract)
logg.warning('unknown destination token contract {}'.format(tx['destination_token']))
tx['sender_description'] = 'Custodial account'
tx['recipient_description'] = 'Custodial account'
o = code(tx['sender'])
t = self.proxy_do(chain_spec, o)
r = t.get()
r = self.rpc.do(o)
if len(strip_0x(r, allow_empty=True)) > 0:
if registry != None:
try:
sender_contract = registry.by_address(tx['sender'], sender_address=self.call_address)
tx['sender_description'] = 'Contract at {}'.format(tx['sender'])
except UnknownContractError:
tx['sender_description'] = 'Unknown contract'
except KeyError as e:
tx['sender_description'] = 'Unknown contract'
else:
s = celery.signature(
'cic_eth.task.registry_address_lookup',
[
chain_spec.asdict(),
tx['sender'],
],
queue=self.queue
)
t = s.apply_async()
tx['sender_description'] = t.get()
if tx['sender_description'] == None:
tx['sender_description'] = 'Unknown contract'
try:
#sender_contract = CICRegistry.get_address(chain_spec, tx['sender'])
sender_contract = registry.by_address(tx['sender'], sender_address=self.call_address)
tx['sender_description'] = 'Contract at {}'.format(tx['sender']) #sender_contract)
except UnknownContractError:
tx['sender_description'] = 'Unknown contract'
except KeyError as e:
tx['sender_description'] = 'Unknown contract'
else:
s = celery.signature(
'cic_eth.eth.account.have',
@@ -508,31 +446,16 @@ class AdminApi:
tx['sender_description'] = role
o = code(tx['recipient'])
t = self.proxy_do(chain_spec, o)
r = t.get()
r = self.rpc.do(o)
if len(strip_0x(r, allow_empty=True)) > 0:
if registry != None:
try:
recipient_contract = registry.by_address(tx['recipient'])
tx['recipient_description'] = 'Contract at {}'.format(tx['recipient'])
except UnknownContractError as e:
tx['recipient_description'] = 'Unknown contract'
except KeyError as e:
tx['recipient_description'] = 'Unknown contract'
else:
s = celery.signature(
'cic_eth.task.registry_address_lookup',
[
chain_spec.asdict(),
tx['recipient'],
],
queue=self.queue
)
t = s.apply_async()
tx['recipient_description'] = t.get()
if tx['recipient_description'] == None:
tx['recipient_description'] = 'Unknown contract'
try:
#recipient_contract = CICRegistry.by_address(tx['recipient'])
recipient_contract = registry.by_address(tx['recipient'])
tx['recipient_description'] = 'Contract at {}'.format(tx['recipient']) #recipient_contract)
except UnknownContractError as e:
tx['recipient_description'] = 'Unknown contract'
except KeyError as e:
tx['recipient_description'] = 'Unknown contract'
else:
s = celery.signature(
'cic_eth.eth.account.have',
@@ -560,19 +483,13 @@ class AdminApi:
if role != None:
tx['recipient_description'] = role
erc20_c = ERC20(chain_spec)
if source_token != None:
tx['source_token_symbol'] = source_token.symbol
o = erc20_c.balance_of(tx['source_token'], tx['sender'], sender_address=self.call_address)
r = self.rpc.do(o)
tx['sender_token_balance'] = erc20_c.parse_balance(r)
tx['source_token_symbol'] = source_token.symbol()
tx['sender_token_balance'] = source_token.function('balanceOf')(tx['sender']).call()
if destination_token != None:
tx['destination_token_symbol'] = destination_token.symbol
o = erc20_c.balance_of(tx['destination_token'], tx['recipient'], sender_address=self.call_address)
r = self.rpc.do(o)
tx['recipient_token_balance'] = erc20_c.parse_balance(r)
#tx['recipient_token_balance'] = destination_token.function('balanceOf')(tx['recipient']).call()
tx['destination_token_symbol'] = destination_token.symbol()
tx['recipient_token_balance'] = source_token.function('balanceOf')(tx['recipient']).call()
# TODO: this can mean either not subitted or culled, need to check other txs with same nonce to determine which
tx['network_status'] = 'Not in node'
@@ -580,8 +497,7 @@ class AdminApi:
r = None
try:
o = transaction(tx_hash)
t = self.proxy_do(chain_spec, o)
r = t.get()
r = self.rpc.do(o)
if r != None:
tx['network_status'] = 'Mempool'
except Exception as e:
@@ -590,8 +506,7 @@ class AdminApi:
if r != None:
try:
o = receipt(tx_hash)
t = self.proxy_do(chain_spec, o)
r = t.get()
r = self.rpc.do(o)
logg.debug('h {} o {}'.format(tx_hash, o))
if int(strip_0x(r['status'])) == 1:
tx['network_status'] = 'Confirmed'
@@ -606,13 +521,11 @@ class AdminApi:
pass
o = balance(tx['sender'])
t = self.proxy_do(chain_spec, o)
r = t.get()
r = self.rpc.do(o)
tx['sender_gas_balance'] = r
o = balance(tx['recipient'])
t = self.proxy_do(chain_spec, o)
r = t.get()
r = self.rpc.do(o)
tx['recipient_gas_balance'] = r
tx_unpacked = unpack(bytes.fromhex(strip_0x(tx['signed_tx'])), chain_spec)

View File

@@ -8,309 +8,83 @@ import logging
# external imports
import celery
from cic_eth_registry import CICRegistry
from chainlib.chain import ChainSpec
from hexathon import strip_0x
# local imports
from cic_eth.api.base import ApiBase
from cic_eth.enum import LockEnum
from cic_eth.db.enum import LockEnum
app = celery.current_app
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
logg = logging.getLogger(__name__)
class Api(ApiBase):
class Api:
"""Creates task chains to perform well-known CIC operations.
@staticmethod
def to_v_list(v, n):
"""Translate an arbitrary number of string and/or list arguments to a list of list of string arguments
Each method that sends tasks returns details about the root task. The root task uuid can be provided in the callback, to enable to caller to correlate the result with individual calls. It can also be used to independently poll the completion of a task chain.
:param v: Arguments
:type v: str or list
:param n: Number of elements to generate arguments for
:type n: int
:rtype: list
:returns: list of assembled arguments
"""
if isinstance(v, str):
vv = v
v = []
for i in range(n):
v.append([vv])
elif not isinstance(v, list):
raise ValueError('argument must be single string, or list or strings or lists')
else:
if len(v) != n:
raise ValueError('v argument count must match integer n')
for i in range(n):
if isinstance(v[i], str):
v[i] = [v[i]]
elif not isinstance(v, list):
raise ValueError('proof argument must be single string, or list or strings or lists')
:param callback_param: Static value to pass to callback
:type callback_param: str
:param callback_task: Callback task that executes callback_param call. (Must be included by the celery worker)
:type callback_task: string
:param queue: Name of worker queue to submit tasks to
:type queue: str
"""
def __init__(self, chain_str, queue='cic-eth', callback_param=None, callback_task='cic_eth.callbacks.noop.noop', callback_queue=None):
self.chain_str = chain_str
self.chain_spec = ChainSpec.from_chain_str(chain_str)
self.callback_param = callback_param
self.callback_task = callback_task
self.queue = queue
logg.debug('api using queue {}'.format(self.queue))
self.callback_success = None
self.callback_error = None
if callback_queue == None:
callback_queue=self.queue
return v
def default_token(self):
"""Retrieves the default fallback token of the custodial network.
:returns: uuid of root task
:rtype: celery.Task
"""
s_token = celery.signature(
'cic_eth.eth.erc20.default_token',
[],
queue=self.queue,
)
if self.callback_param != None:
s_token.link(self.callback_success)
return s_token.apply_async()
def token(self, token_symbol, proof=None):
"""Single-token alias for tokens method.
See tokens method for details.
:param token_symbol: Token symbol to look up
:type token_symbol: str
:param proof: Proofs to add to signature verification for the token
:type proof: str or list
:returns: uuid of root task
:rtype: celery.Task
"""
if not isinstance(token_symbol, str):
raise ValueError('token symbol must be string')
return self.tokens([token_symbol], proof=proof)
def tokens(self, token_symbols, proof=None):
"""Perform a token data lookup from the token index. The token index will enforce unique associations between token symbol and contract address.
Token symbols are always strings, and should be specified using uppercase letters.
If the proof argument is included, the network will be queried for trusted signatures on the given proof(s). There must exist at least one trusted signature for every given proof for every token. Trusted signatures for the custodial system are provided at service startup.
The proof argument may be specified in a number of ways:
- as None, in which case proof checks are skipped (although there may still be builtin proof checks being performed)
- as a single string, where the same proof is used for each token lookup
- as an array of strings, where the respective proof is used for the respective token. number of proofs must match the number of tokens.
- as an array of lists, where the respective proofs in each list is used for the respective token. number of lists of proofs must match the number of tokens.
The success callback provided at the Api object instantiation will receive individual calls for each token that passes the proof checks. Each token that does not pass is passed to the Api error callback.
This method is not intended to be used synchronously. Do so at your peril.
:param token_symbols: Token symbol strings to look up
:type token_symbol: list
:param proof: Proof(s) to verify tokens against
:type proof: None, str or list
:returns: uuid of root task
:rtype: celery.Task
"""
if not isinstance(token_symbols, list):
raise ValueError('token symbols argument must be list')
if proof == None:
logg.debug('looking up tokens without external proof check: {}'.format(','.join(token_symbols)))
proof = ''
logg.debug('proof is {}'.format(proof))
l = len(token_symbols)
if len(proof) == 0:
l = 0
proof = Api.to_v_list(proof, l)
chain_spec_dict = self.chain_spec.asdict()
s_token_resolve = celery.signature(
'cic_eth.eth.erc20.resolve_tokens_by_symbol',
[
token_symbols,
chain_spec_dict,
],
queue=self.queue,
)
s_token_info = celery.signature(
'cic_eth.eth.erc20.token_info',
[
chain_spec_dict,
proof,
],
queue=self.queue,
)
s_token_verify = celery.signature(
'cic_eth.eth.erc20.verify_token_info',
if callback_param != None:
self.callback_success = celery.signature(
callback_task,
[
chain_spec_dict,
self.callback_success,
self.callback_error,
callback_param,
0,
],
queue=self.queue,
queue=callback_queue,
)
s_token_info.link(s_token_verify)
s_token_resolve.link(s_token_info)
return s_token_resolve.apply_async()
self.callback_error = celery.signature(
callback_task,
[
callback_param,
1,
],
queue=callback_queue,
)
# def convert_transfer(self, from_address, to_address, target_return, minimum_return, from_token_symbol, to_token_symbol):
# """Executes a chain of celery tasks that performs conversion between two ERC20 tokens, and transfers to a specified receipient after convert has completed.
#
# :param from_address: Ethereum address of sender
# :type from_address: str, 0x-hex
# :param to_address: Ethereum address of receipient
# :type to_address: str, 0x-hex
# :param target_return: Estimated return from conversion
# :type target_return: int
# :param minimum_return: The least value of destination token return to allow
# :type minimum_return: int
# :param from_token_symbol: ERC20 token symbol of token being converted
# :type from_token_symbol: str
# :param to_token_symbol: ERC20 token symbol of token to receive
# :type to_token_symbol: str
# :returns: uuid of root task
# :rtype: celery.Task
# """
# raise NotImplementedError('out of service until new DEX migration is done')
# s_check = celery.signature(
# 'cic_eth.admin.ctrl.check_lock',
# [
# [from_token_symbol, to_token_symbol],
# self.chain_spec.asdict(),
# LockEnum.QUEUE,
# from_address,
# ],
# queue=self.queue,
# )
# s_nonce = celery.signature(
# 'cic_eth.eth.nonce.reserve_nonce',
# [
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_tokens = celery.signature(
# 'cic_eth.eth.erc20.resolve_tokens_by_symbol',
# [
# self.chain_str,
# ],
# queue=self.queue,
# )
# s_convert = celery.signature(
# 'cic_eth.eth.bancor.convert_with_default_reserve',
# [
# from_address,
# target_return,
# minimum_return,
# to_address,
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_nonce.link(s_tokens)
# s_check.link(s_nonce)
# if self.callback_param != None:
# s_convert.link(self.callback_success)
# s_tokens.link(s_convert).on_error(self.callback_error)
# else:
# s_tokens.link(s_convert)
#
# t = s_check.apply_async(queue=self.queue)
# return t
#
#
# def convert(self, from_address, target_return, minimum_return, from_token_symbol, to_token_symbol):
# """Executes a chain of celery tasks that performs conversion between two ERC20 tokens.
#
# :param from_address: Ethereum address of sender
# :type from_address: str, 0x-hex
# :param target_return: Estimated return from conversion
# :type target_return: int
# :param minimum_return: The least value of destination token return to allow
# :type minimum_return: int
# :param from_token_symbol: ERC20 token symbol of token being converted
# :type from_token_symbol: str
# :param to_token_symbol: ERC20 token symbol of token to receive
# :type to_token_symbol: str
# :returns: uuid of root task
# :rtype: celery.Task
# """
# raise NotImplementedError('out of service until new DEX migration is done')
# s_check = celery.signature(
# 'cic_eth.admin.ctrl.check_lock',
# [
# [from_token_symbol, to_token_symbol],
# self.chain_spec.asdict(),
# LockEnum.QUEUE,
# from_address,
# ],
# queue=self.queue,
# )
# s_nonce = celery.signature(
# 'cic_eth.eth.nonce.reserve_nonce',
# [
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_tokens = celery.signature(
# 'cic_eth.eth.erc20.resolve_tokens_by_symbol',
# [
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_convert = celery.signature(
# 'cic_eth.eth.bancor.convert_with_default_reserve',
# [
# from_address,
# target_return,
# minimum_return,
# from_address,
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_nonce.link(s_tokens)
# s_check.link(s_nonce)
# if self.callback_param != None:
# s_convert.link(self.callback_success)
# s_tokens.link(s_convert).on_error(self.callback_error)
# else:
# s_tokens.link(s_convert)
#
# t = s_check.apply_async(queue=self.queue)
# return t
def transfer_from(self, from_address, to_address, value, token_symbol, spender_address):
"""Executes a chain of celery tasks that performs a transfer of ERC20 tokens by one address on behalf of another address to a third party.
def convert_transfer(self, from_address, to_address, target_return, minimum_return, from_token_symbol, to_token_symbol):
"""Executes a chain of celery tasks that performs conversion between two ERC20 tokens, and transfers to a specified receipient after convert has completed.
:param from_address: Ethereum address of sender
:type from_address: str, 0x-hex
:param to_address: Ethereum address of recipient
:param to_address: Ethereum address of receipient
:type to_address: str, 0x-hex
:param value: Estimated return from conversion
:type value: int
:param token_symbol: ERC20 token symbol of token to send
:type token_symbol: str
:param spender_address: Ethereum address of recipient
:type spender_address: str, 0x-hex
:param target_return: Estimated return from conversion
:type target_return: int
:param minimum_return: The least value of destination token return to allow
:type minimum_return: int
:param from_token_symbol: ERC20 token symbol of token being converted
:type from_token_symbol: str
:param to_token_symbol: ERC20 token symbol of token to receive
:type to_token_symbol: str
:returns: uuid of root task
:rtype: celery.Task
"""
raise NotImplementedError('out of service until new DEX migration is done')
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
[token_symbol],
[from_token_symbol, to_token_symbol],
self.chain_spec.asdict(),
LockEnum.QUEUE,
from_address,
@@ -321,7 +95,70 @@ class Api(ApiBase):
'cic_eth.eth.nonce.reserve_nonce',
[
self.chain_spec.asdict(),
],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.erc20.resolve_tokens_by_symbol',
[
self.chain_str,
],
queue=self.queue,
)
s_convert = celery.signature(
'cic_eth.eth.bancor.convert_with_default_reserve',
[
from_address,
target_return,
minimum_return,
to_address,
self.chain_spec.asdict(),
],
queue=self.queue,
)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error)
else:
s_tokens.link(s_convert)
t = s_check.apply_async(queue=self.queue)
return t
def convert(self, from_address, target_return, minimum_return, from_token_symbol, to_token_symbol):
"""Executes a chain of celery tasks that performs conversion between two ERC20 tokens.
:param from_address: Ethereum address of sender
:type from_address: str, 0x-hex
:param target_return: Estimated return from conversion
:type target_return: int
:param minimum_return: The least value of destination token return to allow
:type minimum_return: int
:param from_token_symbol: ERC20 token symbol of token being converted
:type from_token_symbol: str
:param to_token_symbol: ERC20 token symbol of token to receive
:type to_token_symbol: str
:returns: uuid of root task
:rtype: celery.Task
"""
raise NotImplementedError('out of service until new DEX migration is done')
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
[from_token_symbol, to_token_symbol],
self.chain_spec.asdict(),
LockEnum.QUEUE,
from_address,
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
[
self.chain_spec.asdict(),
],
queue=self.queue,
)
@@ -332,41 +169,29 @@ class Api(ApiBase):
],
queue=self.queue,
)
s_allow = celery.signature(
'cic_eth.eth.erc20.check_allowance',
s_convert = celery.signature(
'cic_eth.eth.bancor.convert_with_default_reserve',
[
from_address,
value,
target_return,
minimum_return,
from_address,
self.chain_spec.asdict(),
spender_address,
],
queue=self.queue,
)
s_transfer = celery.signature(
'cic_eth.eth.erc20.transfer_from',
[
from_address,
to_address,
value,
self.chain_spec.asdict(),
spender_address,
],
queue=self.queue,
)
s_tokens.link(s_allow)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_transfer.link(self.callback_success)
s_allow.link(s_transfer).on_error(self.callback_error)
s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error)
else:
s_allow.link(s_transfer)
s_tokens.link(s_convert)
t = s_check.apply_async(queue=self.queue)
return t
def transfer(self, from_address, to_address, value, token_symbol):
"""Executes a chain of celery tasks that performs a transfer of ERC20 tokens from one address to another.
@@ -381,8 +206,6 @@ class Api(ApiBase):
:returns: uuid of root task
:rtype: celery.Task
"""
#from_address = strip_0x(from_address)
#to_address = strip_0x(to_address)
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
@@ -649,9 +472,9 @@ class Api(ApiBase):
s_external_get = celery.signature(
external_task,
[
address,
offset,
limit,
address,
],
queue=external_queue,
)

View File

@@ -1,52 +0,0 @@
# standard imports
import logging
# external imports
import celery
from chainlib.chain import ChainSpec
logg = logging.getLogger(__name__)
class ApiBase:
"""Creates task chains to perform well-known CIC operations.
Each method that sends tasks returns details about the root task. The root task uuid can be provided in the callback, to enable to caller to correlate the result with individual calls. It can also be used to independently poll the completion of a task chain.
:param callback_param: Static value to pass to callback
:type callback_param: str
:param callback_task: Callback task that executes callback_param call. (Must be included by the celery worker)
:type callback_task: string
:param queue: Name of worker queue to submit tasks to
:type queue: str
"""
def __init__(self, chain_str, queue='cic-eth', callback_param=None, callback_task='cic_eth.callbacks.noop.noop', callback_queue=None):
self.chain_str = chain_str
self.chain_spec = ChainSpec.from_chain_str(chain_str)
self.callback_param = callback_param
self.callback_task = callback_task
self.queue = queue
logg.debug('api using queue {}'.format(self.queue))
self.callback_success = None
self.callback_error = None
if callback_queue == None:
callback_queue=self.queue
if callback_param != None:
self.callback_success = celery.signature(
callback_task,
[
callback_param,
0,
],
queue=callback_queue,
)
self.callback_error = celery.signature(
callback_task,
[
callback_param,
1,
],
queue=callback_queue,
)

View File

@@ -1,10 +1,7 @@
import logging
import celery
celery_app = celery.current_app
#logg = celery_app.log.get_default_logger()
logg = logging.getLogger()
logg = celery_app.log.get_default_logger()
@celery_app.task(bind=True)

View File

@@ -1,8 +0,0 @@
from cic_eth.db.models.base import SessionBase
def health(*args, **kwargs):
session = SessionBase.create_session()
session.execute('SELECT count(*) from alembic_version')
session.close()
return True

View File

@@ -1,48 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.connection import RPCConnection
from chainlib.chain import ChainSpec
from chainlib.eth.gas import balance
# local imports
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.base import SessionBase
from cic_eth.db.enum import LockEnum
from cic_eth.error import LockedError
from cic_eth.admin.ctrl import check_lock
logg = logging.getLogger().getChild(__name__)
def health(*args, **kwargs):
session = SessionBase.create_session()
config = kwargs['config']
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
logg.debug('check gas balance of gas gifter for chain {}'.format(chain_spec))
try:
check_lock(None, None, LockEnum.INIT)
except LockedError:
logg.warning('INIT lock is set, skipping GAS GIFTER balance check.')
return True
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
session.close()
rpc = RPCConnection.connect(chain_spec, 'default')
o = balance(gas_provider)
r = rpc.do(o)
try:
r = int(r, 16)
except TypeError:
r = int(r)
gas_min = int(config.get('ETH_GAS_GIFTER_MINIMUM_BALANCE'))
if r < gas_min:
logg.error('EEK! gas gifter has balance {}, below minimum {}'.format(r, gas_min))
return False
return True

View File

@@ -1,18 +0,0 @@
# external imports
import redis
import os
def health(*args, **kwargs):
r = redis.Redis(
host=kwargs['config'].get('REDIS_HOST'),
port=kwargs['config'].get('REDIS_PORT'),
db=kwargs['config'].get('REDIS_DB'),
)
try:
r.set(kwargs['unit'], os.getpid())
except redis.connection.ConnectionError:
return False
except redis.connection.ResponseError:
return False
return True

Some files were not shown because too many files have changed in this diff Show More