Compare commits

..

24 Commits

Author SHA1 Message Date
nolash
195fd6ea08 Remove commented code 2021-07-07 09:30:14 +02:00
a123122ed9 Update apps/cic-eth/.gitlab-ci.yml 2021-07-05 21:20:15 +00:00
4c420bdcfd Update ci_templates/.cic-template.yml, apps/cic-eth/.gitlab-ci.yml files 2021-07-05 21:00:41 +00:00
6601a3cdb6 Update apps/cic-eth/.gitlab-ci.yml 2021-07-05 20:49:09 +00:00
1b948776ba Update ci_templates/.cic-template.yml 2021-07-05 20:35:06 +00:00
b67d1ac26a Update ci_templates/.cic-template.yml 2021-07-05 20:31:29 +00:00
e68f10fac5 Update apps/cic-eth/docker/Dockerfile 2021-07-05 20:22:39 +00:00
6a86238cb6 Update apps/cic-eth/docker/Dockerfile 2021-07-05 20:19:12 +00:00
5fcef25bdd Update apps/cic-eth/docker/Dockerfile 2021-07-05 20:17:56 +00:00
3e526ba1ac Update ci_templates/.cic-template.yml, apps/cic-eth/.gitlab-ci.yml files 2021-07-05 20:15:07 +00:00
52239e2e82 Update ci_templates/.cic-template.yml, apps/cic-eth/.gitlab-ci.yml files 2021-07-05 20:12:04 +00:00
a96508dac4 Update ci_templates/.cic-template.yml 2021-07-05 20:08:08 +00:00
868a0e836c Update apps/cic-eth/.gitlab-ci.yml, apps/cic-eth/docker/Dockerfile, ci_templates/.cic-template.yml files 2021-07-05 19:59:27 +00:00
dadab0a20b Update apps/cic-eth/docker/Dockerfile 2021-07-05 19:26:23 +00:00
nolash
a4b00545d3 Merge remote-tracking branch 'origin/master' into lash/free-api 2021-07-05 19:28:40 +02:00
nolash
08aef8804b Merge remote-tracking branch 'origin/master' into lash/free-api 2021-07-05 13:03:54 +02:00
nolash
8fef8c98db Correct the admin import path correction 2021-07-05 12:27:52 +02:00
nolash
54b857d286 Merge remote-tracking branch 'origin/master' into lash/free-api 2021-07-05 12:25:05 +02:00
nolash
b917070155 Correct admin api import path in tests 2021-07-05 12:22:53 +02:00
nolash
ae3a8de2d4 Add redis to test reqs 2021-07-05 12:02:37 +02:00
nolash
ceaafeb513 Add eth-erc20 dep to tests reqs 2021-07-05 11:32:28 +02:00
nolash
7cd2fc4622 Add eth-erc20 dep 2021-07-05 11:32:00 +02:00
nolash
2144bec1ef Bump cic-eth version in data seeding, ussd 2021-07-01 21:30:08 +02:00
nolash
4897007bf9 Split out tools, admin and services setup 2021-07-01 21:16:37 +02:00
1137 changed files with 13855 additions and 197682 deletions

3
.gitignore vendored
View File

@@ -13,6 +13,3 @@ build/
**/coverage
**/.venv
.idea
**/.vim
**/*secret.yaml
.env

View File

@@ -1,43 +1,14 @@
include:
#- local: 'ci_templates/.cic-template.yml' #kaniko build templates
# these includes are app specific unit tests
- local: 'ci_templates/.cic-template.yml'
- local: 'apps/contract-migration/.gitlab-ci.yml'
- local: 'apps/cic-eth/.gitlab-ci.yml'
- local: 'apps/cic-ussd/.gitlab-ci.yml'
- local: 'apps/cic-notify/.gitlab-ci.yml'
- local: 'apps/cic-meta/.gitlab-ci.yml'
- local: 'apps/cic-cache/.gitlab-ci.yml'
#- local: 'apps/contract-migration/.gitlab-ci.yml'
#- local: 'apps/data-seeding/.gitlab-ci.yml'
- local: 'apps/data-seeding/.gitlab-ci.yml'
stages:
- build
- test
- deploy
image: registry.gitlab.com/grassrootseconomics/cic-internal-integration/docker-with-compose:latest
variables:
DOCKER_BUILDKIT: "1"
COMPOSE_DOCKER_CLI_BUILD: "1"
CI_DEBUG_TRACE: "true"
before_script:
- docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY
# runs on protected branches and pushes to repo
build-push:
stage: build
tags:
- integration
#script:
# - TAG=$CI_COMMIT_REF_SLUG-$CI_COMMIT_SHORT_SHA sh ./scripts/build-push.sh
script:
- TAG=latest sh ./scripts/build-push.sh
rules:
- if: $CI_COMMIT_REF_PROTECTED == "true"
when: always
deploy-dev:
stage: deploy
trigger: grassrootseconomics/devops
when: manual
- release

View File

@@ -2,25 +2,25 @@
## Getting started
### Preperation
Copy the .env_sample file to .env and populate with appropriate env vars
This repo uses docker-compose and docker buildkit. Set the following environment variables to get started:
## Make some keys
```
export COMPOSE_DOCKER_CLI_BUILD=1
export DOCKER_BUILDKIT=1
docker build -t bloxie . && docker run -v "$(pwd)/keys:/root/keys" --rm -it -t bloxie account new --chain /root/bloxberg.json --keys-path /root/keys
```
start services, database, redis and local ethereum node
```
docker-compose up -d
```
Run app/contract-migration to deploy contracts
### Prepare the repo
This is stuff we need to put in makefile but for now...
File mounts and permisssions need to be set
```
RUN_MASK=3 docker-compose up contract-migration
chmod -R 755 scripts/initdb apps/cic-meta/scripts/initdb
````
start cluster
```
docker-compose up
```
stop cluster
@@ -28,9 +28,9 @@ stop cluster
docker-compose down
```
stop cluster and delete data
delete data
```
docker-compose down -v --remove-orphans
docker-compose down -v
```
rebuild an images
@@ -38,7 +38,5 @@ rebuild an images
docker-compose up --build <service_name>
```
to delete the buildkit cache
```
docker builder prune --filter type=exec.cachemount
```
Deployment variables are writtend to service-configs/.env after everthing is up.

View File

@@ -0,0 +1,34 @@
# The solc image messes up the alpine environment, so we have to go all over again
FROM python:3.8.6-slim-buster
LABEL authors="Louis Holbrook <dev@holbrook.no> 0826EDA1702D1E87C6E2875121D2E7BB88C2A746"
LABEL spdx-license-identifier="GPL-3.0-or-later"
LABEL description="Base layer for buiding development images for the cic component suite"
RUN apt-get update && \
apt-get install -y git gcc g++ libpq-dev && \
apt-get install -y vim gawk jq telnet openssl iputils-ping curl wget gnupg socat bash procps make python2 postgresql-client
RUN echo installing nodejs tooling
COPY ./dev/nvm.sh /root/
# Install nvm with node and npm
# https://stackoverflow.com/questions/25899912/how-to-install-nvm-in-docker
ENV NVM_DIR /root/.nvm
ENV NODE_VERSION 15.3.0
ENV BANCOR_NODE_VERSION 10.16.0
RUN wget -qO- https://raw.githubusercontent.com/nvm-sh/nvm/v0.37.2/install.sh | bash \
&& . $NVM_DIR/nvm.sh \
&& nvm install $NODE_VERSION \
&& nvm alias default $NODE_VERSION \
&& nvm use $NODE_VERSION \
# So many ridiculously stupid issues with node in docker that take oceans of absolutely wasted time to resolve
# owner of these files is "1001" by default - wtf
&& chown -R root:root "$NVM_DIR/versions/node/v$NODE_VERSION"
ENV NODE_PATH $NVM_DIR/versions/node//v$NODE_VERSION/lib/node_modules
ENV PATH $NVM_DIR/versions/node//v$NODE_VERSION/bin:$PATH

View File

@@ -0,0 +1 @@
## this is an example base image if we wanted one for all the other apps. Its just OS level things

View File

@@ -2,6 +2,4 @@
omit =
.venv/*
scripts/*
cic_cache/db/migrations/*
cic_cache/version.py
cic_cache/cli
cic_cache/db/postgres/*

View File

@@ -1,4 +0,0 @@
.git
.cache
.dot
**/doc

View File

@@ -1,17 +1,22 @@
build-test-cic-cache:
stage: test
tags:
- integration
variables:
APP_NAME: cic-cache
MR_IMAGE_TAG: mr-$APP_NAME-$CI_COMMIT_REF_SLUG-$CI_COMMIT_SHORT_SHA
script:
- cd apps/cic-cache
- docker build -t $MR_IMAGE_TAG -f docker/Dockerfile .
- docker run $MR_IMAGE_TAG sh docker/run_tests.sh
allow_failure: true
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/$APP_NAME/**/*
when: always
.cic_cache_variables:
variables:
APP_NAME: cic-cache
DOCKERFILE_PATH: $APP_NAME/docker/Dockerfile
.cic_cache_changes_target:
rules:
- changes:
- $CONTEXT/$APP_NAME/*
build-mr-cic-cache:
extends:
- .cic_cache_changes_target
- .py_build_merge_request
- .cic_cache_variables
build-push-cic-cache:
extends:
- .py_build_push
- .cic_cache_variables

View File

@@ -1 +0,0 @@
include *requirements.txt cic_cache/data/config/*

View File

@@ -1 +0,0 @@
# CIC-CACHE

View File

@@ -55,37 +55,15 @@ class Api:
queue=callback_queue,
)
def list(self, offset=0, limit=100, address=None, oldest=False):
def list(self, offset, limit, address=None):
s = celery.signature(
'cic_cache.tasks.tx.tx_filter',
[
offset,
limit,
0,
100,
address,
oldest,
],
queue=self.queue,
)
if self.callback_param != None:
s.link(self.callback_success).on_error(self.callback_error)
t = s.apply_async()
return t
def list_content(self, offset=0, limit=100, address=None, block_offset=None, block_limit=None, oldest=False):
s = celery.signature(
'cic_cache.tasks.tx.tx_filter_content',
[
offset,
limit,
address,
block_offset,
block_limit,
oldest,
],
queue=self.queue,
queue=None
)
if self.callback_param != None:
s.link(self.callback_success).on_error(self.callback_error)

View File

@@ -10,17 +10,11 @@ from cic_cache.db.list import (
list_transactions_mined,
list_transactions_account_mined,
list_transactions_mined_with_data,
list_transactions_mined_with_data_index,
list_transactions_account_mined_with_data_index,
list_transactions_account_mined_with_data,
)
logg = logging.getLogger()
DEFAULT_FILTER_SIZE = 8192 * 8
DEFAULT_LIMIT = 100
class Cache:
def __init__(self, session):
@@ -31,12 +25,12 @@ class BloomCache(Cache):
@staticmethod
def __get_filter_size(n):
n = DEFAULT_FILTER_SIZE
n = 8192 * 8
logg.warning('filter size hardcoded to {}'.format(n))
return n
def load_transactions(self, offset, limit, block_offset=None, block_limit=None, oldest=False):
def load_transactions(self, offset, limit):
"""Retrieves a list of transactions from cache and creates a bloom filter pointing to blocks and transactions.
Block and transaction numbers are serialized as 32-bit big-endian numbers. The input to the second bloom filter is the concatenation of the serialized block number and transaction index.
@@ -53,7 +47,7 @@ class BloomCache(Cache):
:return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx
:rtype: tuple
"""
rows = list_transactions_mined(self.session, offset, limit, block_offset=block_offset, block_limit=block_limit, oldest=oldest)
rows = list_transactions_mined(self.session, offset, limit)
f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
@@ -62,12 +56,7 @@ class BloomCache(Cache):
for r in rows:
if highest_block == -1:
highest_block = r[0]
lowest_block = r[0]
else:
if oldest:
highest_block = r[0]
else:
lowest_block = r[0]
lowest_block = r[0]
block = r[0].to_bytes(4, byteorder='big')
tx = r[1].to_bytes(4, byteorder='big')
f_block.add(block)
@@ -76,7 +65,7 @@ class BloomCache(Cache):
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)
def load_transactions_account(self, address, offset, limit, block_offset=None, block_limit=None, oldest=False):
def load_transactions_account(self, address, offset, limit):
"""Same as load_transactions(...), but only retrieves transactions where the specified account address is sender or recipient.
:param address: Address to retrieve transactions for.
@@ -88,7 +77,7 @@ class BloomCache(Cache):
:return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx
:rtype: tuple
"""
rows = list_transactions_account_mined(self.session, address, offset, limit, block_offset=block_offset, block_limit=block_limit, oldest=oldest)
rows = list_transactions_account_mined(self.session, address, offset, limit)
f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
@@ -97,12 +86,7 @@ class BloomCache(Cache):
for r in rows:
if highest_block == -1:
highest_block = r[0]
lowest_block = r[0]
else:
if oldest:
highest_block = r[0]
else:
lowest_block = r[0]
lowest_block = r[0]
block = r[0].to_bytes(4, byteorder='big')
tx = r[1].to_bytes(4, byteorder='big')
f_block.add(block)
@@ -113,21 +97,8 @@ class BloomCache(Cache):
class DataCache(Cache):
def load_transactions_with_data(self, offset, limit, block_offset=None, block_limit=None, oldest=False):
if limit == 0:
limit = DEFAULT_LIMIT
rows = list_transactions_mined_with_data(self.session, offset, limit, block_offset, block_limit, oldest=oldest)
return self.__process_rows(rows, oldest)
def load_transactions_account_with_data(self, address, offset, limit, block_offset=None, block_limit=None, oldest=False):
if limit == 0:
limit = DEFAULT_LIMIT
rows = list_transactions_account_mined_with_data(self.session, address, offset, limit, block_offset, block_limit, oldest=oldest)
return self.__process_rows(rows, oldest)
def __process_rows(self, rows, oldest):
def load_transactions_with_data(self, offset, end):
rows = list_transactions_mined_with_data(self.session, offset, end)
tx_cache = []
highest_block = -1;
lowest_block = -1;
@@ -135,12 +106,7 @@ class DataCache(Cache):
for r in rows:
if highest_block == -1:
highest_block = r['block_number']
lowest_block = r['block_number']
else:
if oldest:
highest_block = r['block_number']
else:
lowest_block = r['block_number']
lowest_block = r['block_number']
tx_type = 'unknown'
if r['value'] != None:

View File

@@ -1,15 +0,0 @@
# local imports
from .base import *
from .chain import (
EthChainInterface,
chain_interface,
)
from .rpc import RPC
from .arg import ArgumentParser
from .config import Config
from .celery import CeleryApp
from .registry import (
connect_registry,
connect_token_registry,
connect_declarator,
)

View File

@@ -1,20 +0,0 @@
# external imports
from chainlib.eth.cli import ArgumentParser as BaseArgumentParser
# local imports
from .base import (
CICFlag,
Flag,
)
class ArgumentParser(BaseArgumentParser):
def process_local_flags(self, local_arg_flags):
if local_arg_flags & CICFlag.CELERY:
self.add_argument('-q', '--celery-queue', dest='celery_queue', type=str, default='cic-cache', help='Task queue')
if local_arg_flags & CICFlag.SYNCER:
self.add_argument('--offset', type=int, default=0, help='Start block height for initial history sync')
self.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
if local_arg_flags & CICFlag.CHAIN:
self.add_argument('-r', '--registry-address', type=str, dest='registry_address', help='CIC registry contract address')

View File

@@ -1,31 +0,0 @@
# standard imports
import enum
# external imports
from chainlib.eth.cli import (
argflag_std_read,
argflag_std_write,
argflag_std_base,
Flag,
)
class CICFlag(enum.IntEnum):
# celery - nibble 1
CELERY = 1
# redis - nibble 2
# REDIS = 16
# REDIS_CALLBACK = 32
# chain - nibble 3
CHAIN = 256
# sync - nibble 4
SYNCER = 4096
argflag_local_task = CICFlag.CELERY
#argflag_local_taskcallback = argflag_local_task | CICFlag.REDIS | CICFlag.REDIS_CALLBACK
argflag_local_chain = CICFlag.CHAIN
argflag_local_sync = CICFlag.SYNCER | CICFlag.CHAIN

View File

@@ -1,24 +0,0 @@
# standard imports
import logging
# external imports
import celery
logg = logging.getLogger(__name__)
class CeleryApp:
@classmethod
def from_config(cls, config):
backend_url = config.get('CELERY_RESULT_URL')
broker_url = config.get('CELERY_BROKER_URL')
celery_app = None
if backend_url != None:
celery_app = celery.Celery(broker=broker_url, backend=backend_url)
logg.info('creating celery app on {} with backend on {}'.format(broker_url, backend_url))
else:
celery_app = celery.Celery(broker=broker_url)
logg.info('creating celery app without results backend on {}'.format(broker_url))
return celery_app

View File

@@ -1,21 +0,0 @@
# external imports
from chainlib.eth.block import (
block_by_number,
Block,
)
from chainlib.eth.tx import (
receipt,
Tx,
)
from chainlib.interface import ChainInterface
class EthChainInterface(ChainInterface):
def __init__(self):
self._tx_receipt = receipt
self._block_by_number = block_by_number
self._block_from_src = Block.from_src
self._src_normalize = Tx.src_normalize
chain_interface = EthChainInterface()

View File

@@ -1,63 +0,0 @@
# standard imports
import os
import logging
# external imports
from chainlib.eth.cli import (
Config as BaseConfig,
Flag,
)
# local imports
from .base import CICFlag
script_dir = os.path.dirname(os.path.realpath(__file__))
logg = logging.getLogger(__name__)
class Config(BaseConfig):
local_base_config_dir = os.path.join(script_dir, '..', 'data', 'config')
@classmethod
def from_args(cls, args, arg_flags, local_arg_flags, extra_args={}, default_config_dir=None, base_config_dir=None, default_fee_limit=None):
expanded_base_config_dir = [cls.local_base_config_dir]
if base_config_dir != None:
if isinstance(base_config_dir, str):
base_config_dir = [base_config_dir]
for d in base_config_dir:
expanded_base_config_dir.append(d)
config = BaseConfig.from_args(args, arg_flags, extra_args=extra_args, default_config_dir=default_config_dir, base_config_dir=expanded_base_config_dir, load_callback=None)
local_args_override = {}
# if local_arg_flags & CICFlag.REDIS:
# local_args_override['REDIS_HOST'] = getattr(args, 'redis_host')
# local_args_override['REDIS_PORT'] = getattr(args, 'redis_port')
# local_args_override['REDIS_DB'] = getattr(args, 'redis_db')
# local_args_override['REDIS_TIMEOUT'] = getattr(args, 'redis_timeout')
if local_arg_flags & CICFlag.CHAIN:
local_args_override['CIC_REGISTRY_ADDRESS'] = getattr(args, 'registry_address')
if local_arg_flags & CICFlag.CELERY:
local_args_override['CELERY_QUEUE'] = getattr(args, 'celery_queue')
if local_arg_flags & CICFlag.SYNCER:
local_args_override['SYNCER_OFFSET'] = getattr(args, 'offset')
local_args_override['SYNCER_NO_HISTORY'] = getattr(args, 'no_history')
config.dict_override(local_args_override, 'local cli args')
# if local_arg_flags & CICFlag.REDIS_CALLBACK:
# config.add(getattr(args, 'redis_host_callback'), '_REDIS_HOST_CALLBACK')
# config.add(getattr(args, 'redis_port_callback'), '_REDIS_PORT_CALLBACK')
if local_arg_flags & CICFlag.CELERY:
config.add(config.true('CELERY_DEBUG'), 'CELERY_DEBUG', exists_ok=True)
logg.debug('config loaded:\n{}'.format(config))
return config

View File

@@ -1,33 +0,0 @@
# standard imports
import logging
# external imports
from cic_eth_registry import CICRegistry
from cic_eth_registry.lookup.declarator import AddressDeclaratorLookup
from cic_eth_registry.lookup.tokenindex import TokenIndexLookup
from chainlib.eth.constant import ZERO_ADDRESS
logg = logging.getLogger()
def connect_token_registry(self, conn, chain_spec, sender_address=ZERO_ADDRESS):
registry = CICRegistry(chain_spec, conn)
token_registry_address = registry.by_name('TokenRegistry', sender_address=sender_address)
logg.debug('using token registry address {}'.format(token_registry_address))
lookup = TokenIndexLookup(chain_spec, token_registry_address)
CICRegistry.add_lookup(lookup)
def connect_declarator(self, conn, chain_spec, trusted_addresses, sender_address=ZERO_ADDRESS):
registry = CICRegistry(chain_spec, conn)
declarator_address = registry.by_name('AddressDeclarator', sender_address=sender_address)
logg.debug('using declarator address {}'.format(declarator_address))
lookup = AddressDeclaratorLookup(chain_spec, declarator_address, trusted_addresses)
CICRegistry.add_lookup(lookup)
def connect_registry(conn, chain_spec, registry_address, sender_address=ZERO_ADDRESS):
CICRegistry.address = registry_address
registry = CICRegistry(chain_spec, conn)
registry_address = registry.by_name('ContractRegistry', sender_address=sender_address)
return registry

View File

@@ -1,43 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.connection import (
RPCConnection,
ConnType,
)
from chainlib.eth.connection import EthUnixSignerConnection
from chainlib.chain import ChainSpec
logg = logging.getLogger(__name__)
class RPC:
def __init__(self, chain_spec, rpc_provider, signer_provider=None):
self.chain_spec = chain_spec
self.rpc_provider = rpc_provider
self.signer_provider = signer_provider
def get_default(self):
return RPCConnection.connect(self.chain_spec, 'default')
@staticmethod
def from_config(config):
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
RPCConnection.register_location(config.get('RPC_PROVIDER'), chain_spec, 'default')
if config.get('SIGNER_PROVIDER'):
RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, tag='signer')
RPCConnection.register_location(config.get('SIGNER_PROVIDER'), chain_spec, 'signer')
rpc = RPC(chain_spec, config.get('RPC_PROVIDER'), signer_provider=config.get('SIGNER_PROVIDER'))
logg.info('set up rpc: {}'.format(rpc))
return rpc
def __str__(self):
return 'RPC factory, chain {}, rpc {}, signer {}'.format(self.chain_spec, self.rpc_provider, self.signer_provider)

View File

@@ -1,5 +0,0 @@
[celery]
broker_url = redis://localhost:6379
result_url =
queue = cic-cache
debug = 0

View File

@@ -1,4 +0,0 @@
[cic]
registry_address =
trust_address =
health_modules = cic_eth.check.db,cic_eth.check.redis,cic_eth.check.signer,cic_eth.check.gas

View File

@@ -1,10 +0,0 @@
[database]
engine =
driver =
host =
port =
name = cic-cache
user =
password =
debug = 0
pool_size = 0

View File

@@ -1,2 +0,0 @@
[signer]
provider =

View File

@@ -1,4 +0,0 @@
[syncer]
loop_interval = 1
offset = 0
no_history = 0

View File

@@ -13,9 +13,6 @@ def list_transactions_mined(
session,
offset,
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
@@ -26,62 +23,15 @@ def list_transactions_mined(
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} and block_number <= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, order_by, order_by, limit, offset)
else:
s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, order_by, order_by, limit, offset)
else:
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(order_by, order_by, limit, offset)
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(limit, offset)
r = session.execute(s)
return r
def list_transactions_mined_with_data(
session,
offset,
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
:param block_offset: First block to include in search
:type block_offset: int
:param block_limit: Last block to include in search
:type block_limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(order_by, order_by, limit, offset)
r = session.execute(s)
return r
def list_transactions_mined_with_data_index(
session,
offset,
end,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
@@ -92,87 +42,7 @@ def list_transactions_mined_with_data_index(
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} and block_number <= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, order_by, order_by, offset, end)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, order_by, order_by, offset, end)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(order_by, order_by, offset, end)
r = session.execute(s)
return r
def list_transactions_account_mined_with_data_index(
session,
address,
offset,
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit, filtered by address
:param offset: Offset in data set to return transactions from
:type offset: int
:param limit: Max number of transactions to retrieve
:type limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(address, address, order_by, order_by, limit, offset)
r = session.execute(s)
return r
def list_transactions_account_mined_with_data(
session,
address,
offset,
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
:param block_offset: First block to include in search
:type block_offset: int
:param block_limit: Last block to include in search
:type block_limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(address, address, order_by, order_by, limit, offset)
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number ASC, tx_index ASC".format(offset, end)
r = session.execute(s)
return r
@@ -183,9 +53,6 @@ def list_transactions_account_mined(
address,
offset,
limit,
block_offset,
block_limit,
oldest=False,
):
"""Same as list_transactions_mined(...), but only retrieves transaction where the specified account address is sender or recipient.
@@ -198,20 +65,7 @@ def list_transactions_account_mined(
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT block_number, tx_index FROM tx WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(address, address, order_by, order_by, limit, offset)
s = "SELECT block_number, tx_index FROM tx WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(address, address, limit, offset)
r = session.execute(s)
return r

View File

@@ -7,7 +7,7 @@ Create Date: 2021-04-01 08:10:29.156243
"""
from alembic import op
import sqlalchemy as sa
from chainsyncer.db.migrations.default.export import (
from chainsyncer.db.migrations.sqlalchemy import (
chainsyncer_upgrade,
chainsyncer_downgrade,
)

View File

@@ -100,4 +100,3 @@ class SessionBase(Model):
logg.debug('destroying session {}'.format(session_key))
session.commit()
session.close()
del SessionBase.localsessions[session_key]

View File

@@ -4,9 +4,6 @@ import json
import re
import base64
# external imports
from hexathon import add_0x
# local imports
from cic_cache.cache import (
BloomCache,
@@ -14,11 +11,10 @@ from cic_cache.cache import (
)
logg = logging.getLogger(__name__)
#logg = logging.getLogger()
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)(/(\d+)(/(\d+))?)?/?'
re_transactions_all_data = r'/txa/(\d+)?/?(\d+)/?'
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)/?(\d+)?/?(\d+)/?'
re_transactions_all_data = r'/txa/(\d+)/(\d+)/?'
DEFAULT_LIMIT = 100
@@ -30,13 +26,13 @@ def process_transactions_account_bloom(session, env):
address = r[1]
if r[2] == None:
address = add_0x(address)
offset = 0
address = '0x' + address
offset = DEFAULT_LIMIT
if r.lastindex > 2:
offset = r[4]
limit = DEFAULT_LIMIT
if r.lastindex > 4:
limit = r[6]
offset = r[3]
limit = 0
if r.lastindex > 3:
limit = r[4]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
@@ -91,14 +87,13 @@ def process_transactions_all_data(session, env):
if env.get('HTTP_X_CIC_CACHE_MODE') != 'all':
return None
logg.debug('got data request {}'.format(env))
block_offset = r[1]
block_end = r[2]
offset = r[1]
end = r[2]
if int(r[2]) < int(r[1]):
raise ValueError('cart before the horse, dude')
c = DataCache(session)
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(0, 0, block_offset, block_end, oldest=True) # oldest needs to be settable
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, end)
for r in tx_cache:
r['date_block'] = r['date_block'].timestamp()

View File

@@ -8,7 +8,15 @@ import sys
import re
# external imports
import confini
import celery
import sqlalchemy
import rlp
import cic_base.config
import cic_base.log
import cic_base.argparse
import cic_base.rpc
from cic_base.eth.syncer import chain_interface
from cic_eth_registry import CICRegistry
from cic_eth_registry.error import UnknownContractError
from chainlib.chain import ChainSpec
@@ -26,7 +34,6 @@ from chainsyncer.driver.history import HistorySyncer
from chainsyncer.db.models.base import SessionBase
# local imports
import cic_cache.cli
from cic_cache.db import (
dsn_from_config,
add_tag,
@@ -36,36 +43,32 @@ from cic_cache.runnable.daemons.filters import (
FaucetFilter,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
script_dir = os.path.realpath(os.path.dirname(__file__))
# process args
arg_flags = cic_cache.cli.argflag_std_read
local_arg_flags = cic_cache.cli.argflag_local_sync
argparser = cic_cache.cli.ArgumentParser(arg_flags)
argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args()
def add_block_args(argparser):
argparser.add_argument('--history-start', type=int, default=0, dest='history_start', help='Start block height for initial history sync')
argparser.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
return argparser
# process config
config = cic_cache.cli.Config.from_args(args, arg_flags, local_arg_flags)
# connect to database
logg = cic_base.log.create()
argparser = cic_base.argparse.create(script_dir, cic_base.argparse.full_template)
argparser = cic_base.argparse.add(argparser, add_block_args, 'block')
args = cic_base.argparse.parse(argparser, logg)
config = cic_base.config.create(args.c, args, args.env_prefix)
config.add(args.history_start, 'SYNCER_HISTORY_START', True)
config.add(args.no_history, '_NO_HISTORY', True)
cic_base.config.log(config)
dsn = dsn_from_config(config)
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
# set up rpc
rpc = cic_cache.cli.RPC.from_config(config)
conn = rpc.get_default()
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
# set up chain provisions
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
registry = None
try:
registry = cic_cache.cli.connect_registry(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
except UnknownContractError as e:
logg.exception('Registry contract connection failed for {}: {}'.format(config.get('CIC_REGISTRY_ADDRESS'), e))
sys.exit(1)
logg.info('connected contract registry {}'.format(config.get('CIC_REGISTRY_ADDRESS')))
cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER'))
def register_filter_tags(filters, session):
@@ -92,12 +95,14 @@ def main():
syncers = []
#if SQLBackend.first(chain_spec):
# backend = SQLBackend.initial(chain_spec, block_offset)
syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
initial_block_start = config.get('SYNCER_OFFSET')
initial_block_start = config.get('SYNCER_HISTORY_START')
initial_block_offset = block_offset
if config.get('SYNCER_NO_HISTORY'):
if config.get('_NO_HISTORY'):
initial_block_start = block_offset
initial_block_offset += 1
syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start))
@@ -107,10 +112,10 @@ def main():
logg.info('resuming sync session {}'.format(syncer_backend))
for syncer_backend in syncer_backends:
syncers.append(HistorySyncer(syncer_backend, cic_cache.cli.chain_interface))
syncers.append(HistorySyncer(syncer_backend, chain_interface))
syncer_backend = SQLBackend.live(chain_spec, block_offset+1)
syncers.append(HeadSyncer(syncer_backend, cic_cache.cli.chain_interface))
syncers.append(HeadSyncer(syncer_backend, chain_interface))
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None:

View File

@@ -2,17 +2,14 @@
import celery
# local imports
from cic_cache.cache import (
BloomCache,
DataCache,
)
from cic_cache.cache import BloomCache
from cic_cache.db.models.base import SessionBase
celery_app = celery.current_app
@celery_app.task(bind=True)
def tx_filter(self, offset, limit, address=None, oldest=False, encoding='hex'):
def tx_filter(self, offset, limit, address=None, encoding='hex'):
queue = self.request.delivery_info.get('routing_key')
session = SessionBase.create_session()
@@ -20,9 +17,9 @@ def tx_filter(self, offset, limit, address=None, oldest=False, encoding='hex'):
c = BloomCache(session)
b = None
if address == None:
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit, oldest=oldest)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
else:
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit, oldest=oldest)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
session.close()
@@ -38,17 +35,4 @@ def tx_filter(self, offset, limit, address=None, oldest=False, encoding='hex'):
return o
@celery_app.task(bind=True)
def tx_filter_content(self, offset, limit, address=None, block_offset=None, block_limit=None, oldest=False, encoding='hex'):
session = SessionBase.create_session()
c = DataCache(session)
b = None
if address == None:
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, limit, block_offset=block_offset, block_limit=block_limit, oldest=oldest)
else:
(lowest_block, highest_block, tx_cache) = c.load_transactions_account_with_data_index(address, offset, limit, block_offset=block_offset, block_limit=block_limit)
session.close()
return (lowest_block, highest_block, tx_cache,)

View File

@@ -4,7 +4,7 @@ import semver
version = (
0,
2,
1,
0,
'alpha.2',
)

View File

@@ -0,0 +1,2 @@
[bancor]
dir =

View File

@@ -1,3 +1,4 @@
[cic]
registry_address =
chain_spec =
trust_address =

View File

@@ -0,0 +1,3 @@
[bancor]
registry_address =
dir = /usr/local/share/bancor

View File

@@ -1,3 +1,4 @@
[cic]
chain_spec =
registry_address =
trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C

View File

@@ -1,4 +1,3 @@
[syncer]
loop_interval = 1
offset = 0
no_history = 0
history_start = 0

View File

@@ -0,0 +1,2 @@
[eth]
provider = ws://localhost:8545

View File

@@ -0,0 +1,3 @@
[syncer]
loop_interval = 5
history_start = 0

View File

@@ -1,39 +1,52 @@
# syntax = docker/dockerfile:1.2
FROM registry.gitlab.com/grassrootseconomics/cic-base-images:python-3.8.6-dev-55da5f4e as dev
# RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2b9
FROM python:3.8.6-slim-buster
COPY requirements.txt .
#RUN pip install $pip_extra_index_url_flag -r test_requirements.txt
#RUN pip install $pip_extra_index_url_flag .
#RUN pip install .[server]
#COPY --from=0 /usr/local/share/cic/solidity/ /usr/local/share/cic/solidity/
ARG EXTRA_INDEX_URL="https://pip.grassrootseconomics.net:8433"
ARG GITLAB_PYTHON_REGISTRY="https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple"
ARG EXTRA_PIP_ARGS=""
RUN --mount=type=cache,mode=0755,target=/root/.cache/pip \
pip install --index-url https://pypi.org/simple \
--extra-index-url $GITLAB_PYTHON_REGISTRY --extra-index-url $EXTRA_INDEX_URL $EXTRA_PIP_ARGS \
-r requirements.txt
WORKDIR /usr/src/cic-cache
COPY . .
ARG pip_extra_index_url_flag='--index https://pypi.org/simple --extra-index-url https://pip.grassrootseconomics.net:8433'
ARG root_requirement_file='requirements.txt'
RUN python setup.py install
#RUN apk update && \
# apk add gcc musl-dev gnupg libpq
#RUN apk add postgresql-dev
#RUN apk add linux-headers
#RUN apk add libffi-dev
RUN apt-get update && \
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps git
# Copy shared requirements from top of mono-repo
RUN echo "copying root req file ${root_requirement_file}"
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2b9
COPY cic-cache/requirements.txt ./
COPY cic-cache/setup.cfg \
cic-cache/setup.py \
./
COPY cic-cache/cic_cache/ ./cic_cache/
COPY cic-cache/scripts/ ./scripts/
COPY cic-cache/test_requirements.txt ./
RUN pip install $pip_extra_index_url_flag -r test_requirements.txt
RUN pip install $pip_extra_index_url_flag .
RUN pip install .[server]
COPY cic-cache/tests/ ./tests/
#COPY db/ cic-cache/db
#RUN apk add postgresql-client
# ini files in config directory defines the configurable parameters for the application
# they can all be overridden by environment variables
# to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
COPY config/ /usr/local/etc/cic-cache/
COPY cic-cache/config/ /usr/local/etc/cic-cache/
# for db migrations
RUN git clone https://github.com/vishnubob/wait-for-it.git /usr/local/bin/wait-for-it/
COPY cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/
COPY cic-cache/cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/
COPY /docker/start_tracker.sh ./start_tracker.sh
COPY /docker/db.sh ./db.sh
COPY cic-cache/docker/start_tracker.sh ./start_tracker.sh
COPY cic-cache/docker/db.sh ./db.sh
RUN chmod 755 ./*.sh
# Tracker
# ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"]
# Server
# ENTRYPOINT [ "/usr/local/bin/uwsgi", "--wsgi-file", "/usr/local/lib/python3.8/site-packages/cic_cache/runnable/server.py", "--http", ":80", "--pyargv", "-vv" ]
ENTRYPOINT []

View File

@@ -1,10 +0,0 @@
#! /bin/bash
set -e
pip install --extra-index-url https://pip.grassrootseconomics.net:8433 \
--extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple \
-r test_requirements.txt
export PYTHONPATH=. && pytest -x --cov=cic_cache --cov-fail-under=90 --cov-report term-missing tests

View File

@@ -1,14 +1,13 @@
cic-base==0.1.3a3+build.984b5cff
alembic==1.4.2
confini>=0.3.6rc4,<0.5.0
confini~=0.3.6rc3
uwsgi==2.0.19.1
moolb~=0.1.1b2
cic-eth-registry~=0.6.1a1
moolb~=0.1.0
cic-eth-registry~=0.5.6a1
SQLAlchemy==1.3.20
semver==2.13.0
psycopg2==2.8.6
celery==4.4.7
redis==3.5.3
chainsyncer[sql]>=0.0.6a3,<0.1.0
erc20-faucet>=0.3.2a2, <0.4.0
chainlib-eth>=0.0.9a14,<0.1.0
eth-address-index>=0.2.3a4,<0.3.0
chainsyncer[sql]~=0.0.3a3
erc20-faucet~=0.2.2a1

View File

@@ -23,13 +23,11 @@ licence_files =
[options]
python_requires = >= 3.6
include_package_data = True
packages =
cic_cache
cic_cache.tasks
cic_cache.db
cic_cache.db.models
cic_cache.cli
cic_cache.runnable
cic_cache.runnable.daemons
cic_cache.runnable.daemons.filters
@@ -41,4 +39,3 @@ console_scripts =
cic-cache-trackerd = cic_cache.runnable.daemons.tracker:main
cic-cache-serverd = cic_cache.runnable.daemons.server:main
cic-cache-taskerd = cic_cache.runnable.daemons.tasker:main
cic-cache-list = cic_cache.runable.list:main

View File

@@ -6,5 +6,5 @@ sqlparse==0.4.1
pytest-celery==0.0.0a1
eth_tester==0.5.0b3
py-evm==0.3.0a20
sarafu-faucet~=0.0.7a1
erc20-transfer-authorization>=0.3.5a1,<0.4.0
cic_base[full]==0.1.3a3+build.984b5cff
sarafu-faucet~=0.0.4a1

View File

@@ -1,40 +0,0 @@
# standard imports
import os
# external imports
import chainlib.cli
# local imports
import cic_cache.cli
script_dir = os.path.dirname(os.path.realpath(__file__))
config_dir = os.path.join(script_dir, '..', 'testdata', 'config')
def test_argumentparserto_config():
argparser = cic_cache.cli.ArgumentParser()
local_flags = 0xffff
argparser.process_local_flags(local_flags)
argparser.add_argument('--foo', type=str)
args = argparser.parse_args([
'-q', 'baz',
'--offset', '13',
'--no-history',
'-r','0xdeadbeef',
'-vv',
'--foo', 'bar',
])
extra_args = {
'foo': '_BARBARBAR',
}
config = cic_cache.cli.Config.from_args(args, chainlib.cli.argflag_std_base, local_flags, extra_args=extra_args, base_config_dir=config_dir)
assert config.get('_BARBARBAR') == 'bar'
assert config.get('CELERY_QUEUE') == 'baz'
assert config.get('SYNCER_NO_HISTORY') == True
assert config.get('SYNCER_OFFSET') == 13
assert config.get('CIC_REGISTRY_ADDRESS') == '0xdeadbeef'

View File

@@ -1,17 +0,0 @@
# standard imports
import tempfile
# local imports
import cic_cache.cli
def test_cli_celery():
cf = tempfile.mkdtemp()
config = {
'CELERY_RESULT_URL': 'filesystem://' + cf,
}
cic_cache.cli.CeleryApp.from_config(config)
config['CELERY_BROKER_URL'] = 'filesystem://' + cf
cic_cache.cli.CeleryApp.from_config(config)

View File

@@ -1,68 +0,0 @@
# external imports
import pytest
from chainlib.eth.gas import (
Gas,
RPCGasOracle,
)
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.block import (
block_latest,
Block,
)
from chainlib.eth.pytest.fixtures_chain import default_chain_spec
from chainlib.eth.pytest.fixtures_ethtester import *
from cic_eth_registry.pytest.fixtures_contracts import *
from hexathon import add_0x
# local imports
import cic_cache.cli
@pytest.mark.xfail()
def test_cli_rpc(
eth_rpc,
eth_signer,
default_chain_spec,
):
config = {
'CHAIN_SPEC': str(default_chain_spec),
'RPC_HTTP_PROVIDER': 'http://localhost:8545',
}
rpc = cic_cache.cli.RPC.from_config(config, default_label='foo')
conn = rpc.get_by_label('foo')
#o = block_latest()
#conn.do(o)
def test_cli_chain(
default_chain_spec,
eth_rpc,
eth_signer,
contract_roles,
):
ifc = cic_cache.cli.EthChainInterface()
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], conn=eth_rpc)
gas_oracle = RPCGasOracle(conn=eth_rpc)
c = Gas(default_chain_spec, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, signer=eth_signer)
recipient = add_0x(os.urandom(20).hex())
(tx_hash, o) = c.create(contract_roles['CONTRACT_DEPLOYER'], recipient, 1024)
r = eth_rpc.do(o)
o = ifc.tx_receipt(r)
r = eth_rpc.do(o)
assert r['status'] == 1
o = ifc.block_by_number(1)
block_src = eth_rpc.do(o)
block = ifc.block_from_src(block_src)
assert block.number == 1
with pytest.raises(KeyError):
assert block_src['gasUsed'] == 21000
assert block_src['gas_used'] == 21000
block_src = ifc.src_normalize(block_src)
assert block_src['gasUsed'] == 21000
assert block_src['gas_used'] == 21000

View File

@@ -5,12 +5,9 @@ import datetime
# external imports
import pytest
import moolb
# local imports
from cic_cache import db
from cic_cache import BloomCache
from cic_cache.cache import DEFAULT_FILTER_SIZE
script_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.dirname(script_dir)
@@ -64,6 +61,7 @@ def txs(
dt.timestamp(),
)
tx_number = 42
tx_hash_second = '0x' + os.urandom(32).hex()
tx_signed_second = '0x' + os.urandom(128).hex()
@@ -92,44 +90,6 @@ def txs(
]
@pytest.fixture(scope='function')
def more_txs(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
):
session = init_database
tx_number = 666
tx_hash = '0x' + os.urandom(32).hex()
tx_signed = '0x' + os.urandom(128).hex()
nonce = 3
dt = datetime.datetime.utcnow()
dt += datetime.timedelta(hours=1)
db.add_transaction(
session,
tx_hash,
list_defaults['block']+2,
tx_number,
list_actors['alice'],
list_actors['diane'],
list_tokens['bar'],
list_tokens['bar'],
2048,
4096,
False,
dt.timestamp(),
)
session.commit()
return [tx_hash] + txs
@pytest.fixture(scope='function')
def tag_txs(
init_database,
@@ -141,7 +101,3 @@ def tag_txs(
db.tag_transaction(init_database, txs[1], 'taag', domain='test')
@pytest.fixture(scope='session')
def zero_filter():
return moolb.Bloom(DEFAULT_FILTER_SIZE, 3)

View File

@@ -10,7 +10,6 @@ from sqlalchemy import text
from chainlib.eth.tx import Tx
from chainlib.eth.block import Block
from chainlib.chain import ChainSpec
from chainlib.eth.error import RequestMismatchException
from hexathon import (
strip_0x,
add_0x,
@@ -19,21 +18,10 @@ from hexathon import (
# local imports
from cic_cache.db import add_tag
from cic_cache.runnable.daemons.filters.erc20 import ERC20TransferFilter
from cic_cache.runnable.daemons.filters.base import TagSyncFilter
logg = logging.getLogger()
def test_base_filter_str(
init_database,
):
f = TagSyncFilter('foo')
assert 'foo' == str(f)
f = TagSyncFilter('foo', domain='bar')
assert 'bar.foo' == str(f)
def test_erc20_filter(
eth_rpc,
foo_token,
@@ -79,95 +67,3 @@ def test_erc20_filter(
s = text("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = :a AND a.value = :b")
r = init_database.execute(s, {'a': fltr.tag_domain, 'b': fltr.tag_name}).fetchone()
assert r[0] == tx.hash
def test_erc20_filter_nocontract(
eth_rpc,
foo_token,
init_database,
list_defaults,
list_actors,
tags,
):
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
fltr = ERC20TransferFilter(chain_spec)
add_tag(init_database, fltr.tag_name, domain=fltr.tag_domain)
# incomplete args
data = 'a9059cbb'
data += strip_0x(list_actors['alice'])
data += '1000'.ljust(64, '0')
block = Block({
'hash': os.urandom(32).hex(),
'number': 42,
'timestamp': datetime.datetime.utcnow().timestamp(),
'transactions': [],
})
tx = Tx({
'to': os.urandom(20).hex(),
'from': list_actors['bob'],
'data': data,
'value': 0,
'hash': os.urandom(32).hex(),
'nonce': 13,
'gasPrice': 10000000,
'gas': 123456,
})
block.txs.append(tx)
tx.block = block
assert not fltr.filter(eth_rpc, block, tx, db_session=init_database)
@pytest.mark.parametrize(
'contract_method,contract_input,expected_exception',
[
('a9059cbb', os.urandom(32).hex(), ValueError), # not enough args
('a9059cbb', os.urandom(31).hex(), ValueError), # wrong arg boundary
('a9059cbc', os.urandom(64).hex(), RequestMismatchException), # wrong method
],
)
def test_erc20_filter_bogus(
eth_rpc,
foo_token,
init_database,
list_defaults,
list_actors,
tags,
contract_method,
contract_input,
expected_exception,
):
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
fltr = ERC20TransferFilter(chain_spec)
add_tag(init_database, fltr.tag_name, domain=fltr.tag_domain)
# incomplete args
data = contract_method
data += contract_input
block = Block({
'hash': os.urandom(32).hex(),
'number': 42,
'timestamp': datetime.datetime.utcnow().timestamp(),
'transactions': [],
})
tx = Tx({
'to': foo_token,
'from': list_actors['bob'],
'data': data,
'value': 0,
'hash': os.urandom(32).hex(),
'nonce': 13,
'gasPrice': 10000000,
'gas': 123456,
})
block.txs.append(tx)
tx.block = block
assert not fltr.filter(eth_rpc, block, tx, db_session=init_database)

View File

@@ -8,7 +8,6 @@ import json
import pytest
# local imports
from cic_cache import db
from cic_cache import BloomCache
from cic_cache.cache import DataCache
@@ -19,6 +18,7 @@ def test_cache(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
):
@@ -37,6 +37,9 @@ def test_cache(
def test_cache_data(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
tag_txs,
):
@@ -44,209 +47,10 @@ def test_cache_data(
session = init_database
c = DataCache(session)
b = c.load_transactions_with_data(0, 3) #410000, 420000) #, 100, block_offset=410000, block_limit=420000, oldest=True)
b = c.load_transactions_with_data(410000, 420000)
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == txs[0]
assert b[2][0]['tx_type'] == 'unknown'
assert b[2][1]['tx_type'] == 'test.taag'
def test_cache_ranges(
init_database,
list_defaults,
list_actors,
list_tokens,
more_txs,
):
session = init_database
oldest = list_defaults['block'] - 1
mid = list_defaults['block']
newest = list_defaults['block'] + 2
c = BloomCache(session)
b = c.load_transactions(0, 100)
assert b[0] == oldest
assert b[1] == newest
b = c.load_transactions(1, 2)
assert b[0] == oldest
assert b[1] == mid
b = c.load_transactions(0, 2)
assert b[0] == mid
assert b[1] == newest
b = c.load_transactions(0, 1)
assert b[0] == newest
assert b[1] == newest
b = c.load_transactions(0, 100, oldest=True)
assert b[0] == oldest
assert b[1] == newest
b = c.load_transactions(0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
b = c.load_transactions(0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == oldest
assert b[1] == mid
b = c.load_transactions(0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'], oldest=True)
assert b[0] == oldest
assert b[1] == mid
# now check when supplying account
b = c.load_transactions_account(list_actors['alice'], 0, 100)
assert b[0] == oldest
assert b[1] == newest
b = c.load_transactions_account(list_actors['bob'], 0, 100)
assert b[0] == mid
assert b[1] == mid
b = c.load_transactions_account(list_actors['diane'], 0, 100)
assert b[0] == oldest
assert b[1] == newest
# add block filter to the mix
b = c.load_transactions_account(list_actors['alice'], 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
assert b[2][0]['tx_hash'] == txs[1]
assert b[2][1]['tx_type'] == 'unknown'
assert b[2][0]['tx_type'] == 'test.taag'
b = c.load_transactions_account(list_actors['alice'], 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
b = c.load_transactions_account(list_actors['bob'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == mid
assert b[1] == mid
b = c.load_transactions_account(list_actors['diane'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == oldest
assert b[1] == oldest
def test_cache_ranges_data(
init_database,
list_defaults,
list_actors,
list_tokens,
more_txs,
):
session = init_database
oldest = list_defaults['block'] - 1
mid = list_defaults['block']
newest = list_defaults['block'] + 2
c = DataCache(session)
b = c.load_transactions_with_data(0, 100)
assert b[0] == oldest
assert b[1] == newest
assert len(b[2]) == 3
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][2]['tx_hash'] == more_txs[2]
b = c.load_transactions_with_data(1, 2)
assert b[0] == oldest
assert b[1] == mid
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[1]
assert b[2][1]['tx_hash'] == more_txs[2]
b = c.load_transactions_with_data(0, 2)
assert b[0] == mid
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
b = c.load_transactions_with_data(0, 1)
assert b[0] == newest
assert b[1] == newest
assert len(b[2]) == 1
assert b[2][0]['tx_hash'] == more_txs[0]
b = c.load_transactions_with_data(0, 100, oldest=True)
assert b[0] == oldest
assert b[1] == newest
assert len(b[2]) == 3
assert b[2][0]['tx_hash'] == more_txs[2]
assert b[2][1]['tx_hash'] == more_txs[1]
assert b[2][2]['tx_hash'] == more_txs[0]
b = c.load_transactions_with_data(0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
b = c.load_transactions_with_data(0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == oldest
assert b[1] == mid
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[1]
assert b[2][1]['tx_hash'] == more_txs[2]
b = c.load_transactions_with_data(0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'], oldest=True)
assert b[0] == oldest
assert b[1] == mid
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[2]
assert b[2][1]['tx_hash'] == more_txs[1]
# now check when supplying account
b = c.load_transactions_account_with_data(list_actors['alice'], 0, 100)
assert b[0] == oldest
assert b[1] == newest
assert len(b[2]) == 3
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
assert b[2][2]['tx_hash'] == more_txs[2]
b = c.load_transactions_account_with_data(list_actors['bob'], 0, 100)
assert b[0] == mid
assert b[1] == mid
assert len(b[2]) == 1
assert b[2][0]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['diane'], 0, 100)
assert b[0] == oldest
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[2]
# add block filter to the mix
b = c.load_transactions_account_with_data(list_actors['alice'], 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['alice'], 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['bob'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == mid
assert b[1] == mid
assert len(b[2]) == 1
assert b[2][0]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['diane'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == oldest
assert b[1] == oldest
assert len(b[2]) == 1
assert b[2][0]['tx_hash'] == more_txs[2]

View File

@@ -1,230 +0,0 @@
# standard imports
import logging
import json
import base64
import copy
import re
# external imports
import pytest
from hexathon import strip_0x
# local imports
from cic_cache.runnable.daemons.query import *
logg = logging.getLogger()
@pytest.mark.parametrize(
'query_path_prefix, query_role, query_address_index, query_offset, query_offset_index, query_limit, query_limit_index, match_re',
[
('/tx/user/', 'alice', 0, None, 3, None, 5, re_transactions_account_bloom),
('/tx/user/', 'alice', 0, 42, 3, None, 5, re_transactions_account_bloom),
('/tx/user/', 'alice', 0, 42, 3, 13, 5, re_transactions_account_bloom),
('/tx/', None, 0, None, 3, None, 5, re_transactions_all_bloom),
('/tx/', None, 0, 42, 3, None, 5, re_transactions_all_bloom),
('/tx/', None, 0, 42, 3, 13, 5, re_transactions_all_bloom),
('/txa/', None, 0, None, 3, None, 5, re_transactions_all_data),
('/txa/', None, 0, 42, 3, None, 5, re_transactions_all_data),
('/txa/', None, 0, 42, 3, 13, 5, re_transactions_all_data),
],
)
def test_query_regex(
list_actors,
query_path_prefix,
query_role,
query_address_index,
query_offset,
query_offset_index,
query_limit,
query_limit_index,
match_re,
):
paths = []
path = query_path_prefix
query_address = None
if query_role != None:
query_address = strip_0x(list_actors[query_role])
paths.append(path + '0x' + query_address)
paths.append(path + query_address)
if query_offset != None:
if query_limit != None:
for i in range(len(paths)-1):
paths[i] += '/{}/{}'.format(query_offset, query_limit)
else:
for i in range(len(paths)-1):
paths[i] += '/' + str(query_offset)
for i in range(len(paths)):
paths.append(paths[i] + '/')
for p in paths:
logg.debug('testing path {} against {}'.format(p, match_re))
m = re.match(match_re, p)
l = len(m.groups())
logg.debug('laast index match {} groups {}'.format(m.lastindex, l))
for i in range(l+1):
logg.debug('group {} {}'.format(i, m[i]))
if m.lastindex >= query_offset_index:
assert query_offset == int(m[query_offset_index + 1])
if m.lastindex >= query_limit_index:
assert query_limit == int(m[query_limit_index + 1])
if query_address_index != None:
match_address = strip_0x(m[query_address_index + 1])
assert query_address == match_address
@pytest.mark.parametrize(
'role_name, query_offset, query_limit, query_match',
[
('alice', None, None, [(420000, 13), (419999, 42)]),
('alice', None, 1, [(420000, 13)]),
('alice', 1, None, [(419999, 42)]), # 420000 == list_defaults['block']
('alice', 2, None, []), # 420000 == list_defaults['block']
],
)
def test_query_process_txs_account(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
zero_filter,
role_name,
query_offset,
query_limit,
query_match,
):
actor = None
try:
actor = list_actors[role_name]
except KeyError:
actor = os.urandom(20).hex()
path_info = '/tx/user/0x' + strip_0x(actor)
if query_offset != None:
path_info += '/' + str(query_offset)
if query_limit != None:
if query_offset == None:
path_info += '/0'
path_info += '/' + str(query_limit)
env = {
'PATH_INFO': path_info,
}
logg.debug('using path {}'.format(path_info))
r = process_transactions_account_bloom(init_database, env)
assert r != None
o = json.loads(r[1])
block_filter_data = base64.b64decode(o['block_filter'].encode('utf-8'))
zero_filter_data = zero_filter.to_bytes()
if len(query_match) == 0:
assert block_filter_data == zero_filter_data
return
assert block_filter_data != zero_filter_data
block_filter = copy.copy(zero_filter)
block_filter.merge(block_filter_data)
block_filter_data = block_filter.to_bytes()
assert block_filter_data != zero_filter_data
for (block, tx) in query_match:
block = block.to_bytes(4, byteorder='big')
assert block_filter.check(block)
@pytest.mark.parametrize(
'query_offset, query_limit, query_match',
[
(None, 2, [(420000, 13), (419999, 42)]),
(0, 1, [(420000, 13)]),
(1, 1, [(419999, 42)]),
(2, 0, []),
],
)
def test_query_process_txs_bloom(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
zero_filter,
query_offset,
query_limit,
query_match,
):
path_info = '/tx'
if query_offset != None:
path_info += '/' + str(query_offset)
if query_limit != None:
if query_offset == None:
path_info += '/0'
path_info += '/' + str(query_limit)
env = {
'PATH_INFO': path_info,
}
logg.debug('using path {}'.format(path_info))
r = process_transactions_all_bloom(init_database, env)
assert r != None
o = json.loads(r[1])
block_filter_data = base64.b64decode(o['block_filter'].encode('utf-8'))
zero_filter_data = zero_filter.to_bytes()
if len(query_match) == 0:
assert block_filter_data == zero_filter_data
return
assert block_filter_data != zero_filter_data
block_filter = copy.copy(zero_filter)
block_filter.merge(block_filter_data)
block_filter_data = block_filter.to_bytes()
assert block_filter_data != zero_filter_data
for (block, tx) in query_match:
block = block.to_bytes(4, byteorder='big')
assert block_filter.check(block)
@pytest.mark.parametrize(
'query_block_start, query_block_end, query_match_count',
[
(None, 42, 0),
(420000, 420001, 1),
(419999, 419999, 1), # matches are inclusive
(419999, 420000, 2),
(419999, 420001, 2),
],
)
def test_query_process_txs_data(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
zero_filter,
query_block_start,
query_block_end,
query_match_count,
):
path_info = '/txa'
if query_block_start != None:
path_info += '/' + str(query_block_start)
if query_block_end != None:
if query_block_start == None:
path_info += '/0'
path_info += '/' + str(query_block_end)
env = {
'PATH_INFO': path_info,
'HTTP_X_CIC_CACHE_MODE': 'all',
}
logg.debug('using path {}'.format(path_info))
r = process_transactions_all_data(init_database, env)
assert r != None
o = json.loads(r[1])
assert len(o['data']) == query_match_count

View File

@@ -1,2 +0,0 @@
[foo]
bar_baz = xyzzy

View File

@@ -1 +0,0 @@
include *requirements.txt

View File

@@ -1,53 +0,0 @@
# standard imports
import logging
# external imports
import celery
from erc20_demurrage_token.demurrage import DemurrageCalculator
from chainlib.connection import RPCConnection
from chainlib.chain import ChainSpec
from chainlib.eth.constant import ZERO_ADDRESS
from cic_eth_registry import CICRegistry
logg = logging.getLogger(__name__)
celery_app = celery.current_app
class NoopCalculator:
def amount_since(self, amount, timestamp):
logg.debug('noopcalculator amount {} timestamp {}'.format(amount, timestamp))
return amount
class DemurrageCalculationTask(celery.Task):
demurrage_token_calcs = {}
@classmethod
def register_token(cls, rpc, chain_spec, token_symbol, sender_address=ZERO_ADDRESS):
registry = CICRegistry(chain_spec, rpc)
token_address = registry.by_name(token_symbol, sender_address=sender_address)
try:
c = DemurrageCalculator.from_contract(rpc, chain_spec, token_address, sender_address=sender_address)
logg.info('found demurrage calculator for ERC20 {} @ {}'.format(token_symbol, token_address))
except:
logg.warning('Token {} at address {} does not appear to be a demurrage contract. Calls to balance adjust for this token will always return the same amount'.format(token_symbol, token_address))
c = NoopCalculator()
cls.demurrage_token_calcs[token_symbol] = c
@celery_app.task(bind=True, base=DemurrageCalculationTask)
def get_adjusted_balance(self, token_symbol, amount, timestamp):
c = self.demurrage_token_calcs[token_symbol]
return c.amount_since(amount, timestamp)
def aux_setup(rpc, config, sender_address=ZERO_ADDRESS):
chain_spec_str = config.get('CHAIN_SPEC')
chain_spec = ChainSpec.from_chain_str(chain_spec_str)
token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL')
DemurrageCalculationTask.register_token(rpc, chain_spec, token_symbol, sender_address=sender_address)

View File

@@ -1,30 +0,0 @@
# standard imports
import logging
# external imports
import celery
from cic_eth.api.base import ApiBase
app = celery.current_app
logg = logging.getLogger(__name__)
class Api(ApiBase):
def get_adjusted_balance(self, token_symbol, balance, timestamp):
s = celery.signature(
'cic_eth_aux.erc20_demurrage_token.get_adjusted_balance',
[
token_symbol,
balance,
timestamp,
],
queue=None,
)
if self.callback_param != None:
s.link(self.callback_success)
s.link.on_error(self.callback_error)
t = s.apply_async(queue=self.queue)
return t

View File

@@ -1,4 +0,0 @@
celery==4.4.7
erc20-demurrage-token~=0.0.3a1
cic-eth-registry>=0.6.1a2,<0.7.0
cic-eth[services]~=0.12.4a8

View File

@@ -1,30 +0,0 @@
[metadata]
name = cic-eth-aux-erc20-demurrage-token
version = 0.0.2a6
description = cic-eth tasks supporting erc20 demurrage token
author = Louis Holbrook
author_email = dev@holbrook.no
url = https://gitlab.com/ccicnet/erc20-demurrage-token
keywords =
ethereum
blockchain
cryptocurrency
erc20
classifiers =
Programming Language :: Python :: 3
Operating System :: OS Independent
Development Status :: 3 - Alpha
Environment :: No Input/Output (Daemon)
Intended Audience :: Developers
License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
Topic :: Internet
#Topic :: Blockchain :: EVM
license = GPL3
licence_files =
LICENSE
[options]
include_package_data = True
python_requires = >= 3.6
packages =
cic_eth_aux.erc20_demurrage_token

View File

@@ -1,25 +0,0 @@
from setuptools import setup
requirements = []
f = open('requirements.txt', 'r')
while True:
l = f.readline()
if l == '':
break
requirements.append(l.rstrip())
f.close()
test_requirements = []
f = open('test_requirements.txt', 'r')
while True:
l = f.readline()
if l == '':
break
test_requirements.append(l.rstrip())
f.close()
setup(
install_requires=requirements,
tests_require=test_requirements,
)

View File

@@ -1,11 +0,0 @@
pytest==6.0.1
pytest-celery==0.0.0a1
pytest-mock==3.3.1
pytest-cov==2.10.1
eth-tester==0.5.0b3
py-evm==0.3.0a20
SQLAlchemy==1.3.20
liveness~=0.0.1a7
eth-accounts-index==0.1.1a1
eth-contract-registry==0.5.8a1
eth-address-index==0.2.1a1

View File

@@ -1,88 +0,0 @@
# external imports
import celery
from chainlib.eth.pytest.fixtures_chain import *
from chainlib.eth.pytest.fixtures_ethtester import *
from cic_eth_registry.pytest.fixtures_contracts import *
from cic_eth_registry.pytest.fixtures_tokens import *
from erc20_demurrage_token.unittest.base import TestTokenDeploy
from erc20_demurrage_token.token import DemurrageToken
from eth_token_index.index import TokenUniqueSymbolIndex
from eth_address_declarator.declarator import AddressDeclarator
# cic-eth imports
from cic_eth.pytest.fixtures_celery import *
from cic_eth.pytest.fixtures_token import *
from cic_eth.pytest.fixtures_config import *
@pytest.fixture(scope='function')
def demurrage_token(
default_chain_spec,
eth_rpc,
token_registry,
contract_roles,
eth_signer,
):
d = TestTokenDeploy(eth_rpc, token_symbol='BAR', token_name='Bar Token')
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], conn=eth_rpc)
c = DemurrageToken(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
token_address = d.deploy(eth_rpc, contract_roles['CONTRACT_DEPLOYER'], c, 'SingleNocap')
logg.debug('demurrage token contract "BAR" deployed to {}'.format(token_address))
return token_address
@pytest.fixture(scope='function')
def demurrage_token_symbol(
default_chain_spec,
eth_rpc,
demurrage_token,
contract_roles,
):
c = DemurrageToken(default_chain_spec)
o = c.symbol(demurrage_token, sender_address=contract_roles['CONTRACT_DEPLOYER'])
r = eth_rpc.do(o)
return c.parse_symbol(r)
@pytest.fixture(scope='function')
def demurrage_token_declaration(
foo_token_declaration,
):
return foo_token_declaration
@pytest.fixture(scope='function')
def register_demurrage_token(
default_chain_spec,
token_registry,
eth_rpc,
eth_signer,
register_lookups,
contract_roles,
demurrage_token_declaration,
demurrage_token,
address_declarator,
):
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], eth_rpc)
c = TokenUniqueSymbolIndex(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.register(token_registry, contract_roles['CONTRACT_DEPLOYER'], demurrage_token)
eth_rpc.do(o)
o = receipt(tx_hash_hex)
r = eth_rpc.do(o)
assert r['status'] == 1
nonce_oracle = RPCNonceOracle(contract_roles['TRUSTED_DECLARATOR'], eth_rpc)
c = AddressDeclarator(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.add_declaration(address_declarator, contract_roles['TRUSTED_DECLARATOR'], demurrage_token, demurrage_token_declaration)
eth_rpc.do(o)
o = receipt(tx_hash_hex)
r = eth_rpc.do(o)
assert r['status'] == 1
return token_registry

View File

@@ -1,69 +0,0 @@
# standard imports
import logging
import copy
import datetime
# external imports
import celery
# cic-eth imports
from cic_eth_aux.erc20_demurrage_token import (
DemurrageCalculationTask,
aux_setup,
)
from cic_eth_aux.erc20_demurrage_token.api import Api as AuxApi
logg = logging.getLogger()
def test_demurrage_calulate_task(
default_chain_spec,
eth_rpc,
cic_registry,
celery_session_worker,
register_demurrage_token,
demurrage_token_symbol,
contract_roles,
load_config,
):
config = copy.copy(load_config)
config.add(str(default_chain_spec), 'CIC_CHAIN_SPEC', exists_ok=True)
config.add(demurrage_token_symbol, 'CIC_DEFAULT_TOKEN_SYMBOL', exists_ok=True)
aux_setup(eth_rpc, load_config, sender_address=contract_roles['CONTRACT_DEPLOYER'])
since = datetime.datetime.utcnow() - datetime.timedelta(minutes=1)
s = celery.signature(
'cic_eth_aux.erc20_demurrage_token.get_adjusted_balance',
[
demurrage_token_symbol,
1000,
since.timestamp(),
],
queue=None,
)
t = s.apply_async()
r = t.get_leaf()
assert t.successful()
assert r == 980
def test_demurrage_calculate_api(
default_chain_spec,
eth_rpc,
cic_registry,
celery_session_worker,
register_demurrage_token,
demurrage_token_symbol,
contract_roles,
load_config,
):
api = AuxApi(str(default_chain_spec), queue=None)
since = datetime.datetime.utcnow() - datetime.timedelta(minutes=1)
t = api.get_adjusted_balance(demurrage_token_symbol, 1000, since.timestamp())
r = t.get_leaf()
assert t.successful()
assert r == 980

View File

@@ -6,5 +6,4 @@ omit =
cic_eth/sync/head.py
cic_eth/sync/mempool.py
cic_eth/queue/state.py
cic_eth/cli
*redis*.py

View File

@@ -1,6 +0,0 @@
.git
.cache
.dot
**/doc
**/.venv
**/venv

View File

@@ -1,16 +1,31 @@
build-test-cic-eth:
stage: test
tags:
- integration
variables:
APP_NAME: cic-eth
MR_IMAGE_TAG: mr-$APP_NAME-$CI_COMMIT_REF_SLUG-$CI_COMMIT_SHORT_SHA
script:
- cd apps/cic-eth
- docker build -t $MR_IMAGE_TAG -f docker/Dockerfile .
- docker run $MR_IMAGE_TAG sh docker/run_tests.sh
rules:
.cic_eth_variables:
variables:
APP_NAME: cic-eth
DOCKERFILE_PATH: $APP_NAME/docker/Dockerfile
.cic_eth_mr_changes_target:
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/$APP_NAME/**/*
- $CONTEXT/$APP_NAME/**/*
when: always
build-mr-cic-eth:
extends:
- .cic_eth_variables
- .cic_eth_mr_changes_target
- .py_build_target_test
test-mr-cic-eth:
extends:
- .cic_eth_variables
- .cic_eth_mr_changes_target
stage: test
image: $IMAGE_TAG_BASE
script:
- cd apps/$APP_NAME/
- pytest -x --cov=cic_eth --cov-fail-under=90 --cov-report term-missing tests
build-push-cic-eth:
extends:
- .py_build_push

View File

@@ -1,2 +1,2 @@
include *requirements.txt config/test/* cic_eth/data/config/*
include *requirements.txt

View File

@@ -1,5 +1,5 @@
SQLAlchemy==1.3.20
cic-eth-registry>=0.6.1a3,<0.7.0
hexathon~=0.0.1a8
chainqueue>=0.0.4a6,<0.1.0
eth-erc20>=0.1.2a2,<0.2.0
cic-eth-registry~=0.5.6a1
hexathon~=0.0.1a7
chainqueue~=0.0.2b5
eth-erc20==0.0.10a2

View File

@@ -4,12 +4,8 @@ import logging
# external imports
import celery
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
from hexathon import (
add_0x,
strip_0x,
uniform as hex_uniform,
)
# local imports
from cic_eth.db.enum import LockEnum
@@ -19,17 +15,12 @@ from cic_eth.task import (
CriticalSQLAlchemyTask,
)
from cic_eth.error import LockedError
from cic_eth.encode import (
tx_normalize,
ZERO_ADDRESS_NORMAL,
)
celery_app = celery.current_app
logg = logging.getLogger()
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, flags=LockEnum.ALL, tx_hash=None):
def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.ALL, tx_hash=None):
"""Task wrapper to set arbitrary locks
:param chain_str: Chain spec string representation
@@ -41,7 +32,6 @@ def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, flags=Lock
:returns: New lock state for address
:rtype: number
"""
address = tx_normalize.wallet_address(address)
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
@@ -51,7 +41,7 @@ def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, flags=Lock
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, flags=LockEnum.ALL):
def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.ALL):
"""Task wrapper to reset arbitrary locks
:param chain_str: Chain spec string representation
@@ -63,7 +53,6 @@ def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, flags=Lo
:returns: New lock state for address
:rtype: number
"""
address = tx_normalize.wallet_address(address)
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
@@ -73,7 +62,7 @@ def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, flags=Lo
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, tx_hash=None):
def lock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=None):
"""Task wrapper to set send lock
:param chain_str: Chain spec string representation
@@ -83,7 +72,6 @@ def lock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, tx_ha
:returns: New lock state for address
:rtype: number
"""
address = tx_normalize.wallet_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, LockEnum.SEND, address=address, tx_hash=tx_hash)
logg.debug('Send locked for {}, flag now {}'.format(address, r))
@@ -91,7 +79,7 @@ def lock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, tx_ha
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL):
def unlock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
"""Task wrapper to reset send lock
:param chain_str: Chain spec string representation
@@ -101,7 +89,6 @@ def unlock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL):
:returns: New lock state for address
:rtype: number
"""
address = tx_normalize.wallet_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, LockEnum.SEND, address=address)
logg.debug('Send unlocked for {}, flag now {}'.format(address, r))
@@ -109,7 +96,7 @@ def unlock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL):
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, tx_hash=None):
def lock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=None):
"""Task wrapper to set queue direct lock
:param chain_str: Chain spec string representation
@@ -119,7 +106,6 @@ def lock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, tx_h
:returns: New lock state for address
:rtype: number
"""
address = tx_normalize.wallet_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, LockEnum.QUEUE, address=address, tx_hash=tx_hash)
logg.debug('Queue direct locked for {}, flag now {}'.format(address, r))
@@ -127,7 +113,7 @@ def lock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, tx_h
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL):
def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
"""Task wrapper to reset queue direct lock
:param chain_str: Chain spec string representation
@@ -137,7 +123,6 @@ def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL):
:returns: New lock state for address
:rtype: number
"""
address = tx_normalize.wallet_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, LockEnum.QUEUE, address=address)
logg.debug('Queue direct unlocked for {}, flag now {}'.format(address, r))
@@ -146,13 +131,11 @@ def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL):
@celery_app.task(base=CriticalSQLAlchemyTask)
def check_lock(chained_input, chain_spec_dict, lock_flags, address=None):
if address != None:
address = tx_normalize.wallet_address(address)
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
session = SessionBase.create_session()
r = Lock.check(chain_str, lock_flags, address=ZERO_ADDRESS_NORMAL, session=session)
r = Lock.check(chain_str, lock_flags, address=ZERO_ADDRESS, session=session)
if address != None:
r |= Lock.check(chain_str, lock_flags, address=address, session=session)
if r > 0:

View File

@@ -14,11 +14,7 @@ from chainqueue.sql.query import get_tx
from chainqueue.sql.state import set_cancel
from chainqueue.db.models.otx import Otx
from chainqueue.db.models.tx import TxCache
from hexathon import (
strip_0x,
add_0x,
uniform as hex_uniform,
)
from hexathon import strip_0x
from potaahto.symbols import snake_and_camel
# local imports
@@ -33,7 +29,6 @@ from cic_eth.admin.ctrl import (
from cic_eth.queue.tx import queue_create
from cic_eth.eth.gas import create_check_gas_task
from cic_eth.task import BaseTask
from cic_eth.encode import tx_normalize
celery_app = celery.current_app
logg = logging.getLogger()
@@ -74,17 +69,15 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
set_cancel(chain_spec, strip_0x(tx['hash']), manual=True, session=session)
query_address = tx_normalize.wallet_address(address)
q = session.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.sender==query_address)
q = q.filter(TxCache.sender==address)
q = q.filter(Otx.nonce>=nonce+delta)
q = q.order_by(Otx.nonce.asc())
otxs = q.all()
tx_hashes = []
txs = []
gas_total = 0
for otx in otxs:
tx_raw = bytes.fromhex(strip_0x(otx.signed_tx))
tx_new = unpack(tx_raw, chain_spec)
@@ -96,10 +89,8 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
tx_new['gas_price'] += 1
tx_new['gasPrice'] = tx_new['gas_price']
tx_new['nonce'] -= delta
gas_total += tx_new['gas_price'] * tx_new['gas']
logg.debug('tx_new {}'.format(tx_new))
logg.debug('gas running total {}'.format(gas_total))
del(tx_new['hash'])
del(tx_new['hash_unsigned'])
@@ -131,10 +122,8 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
s = create_check_gas_task(
txs,
chain_spec,
#tx_new['from'],
address,
#gas=tx_new['gas'],
gas=gas_total,
tx_new['from'],
gas=tx_new['gas'],
tx_hashes_hex=tx_hashes,
queue=queue,
)
@@ -143,8 +132,7 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
'cic_eth.admin.ctrl.unlock_send',
[
chain_spec.asdict(),
address,
#tx_new['from'],
tx_new['from'],
],
queue=queue,
)
@@ -152,8 +140,7 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
'cic_eth.admin.ctrl.unlock_queue',
[
chain_spec.asdict(),
address,
#tx_new['from'],
tx_new['from'],
],
queue=queue,
)

View File

@@ -21,7 +21,6 @@ from chainlib.hash import keccak256_hex_to_hex
from hexathon import (
strip_0x,
add_0x,
uniform as hex_uniform,
)
from chainlib.eth.gas import balance
from chainqueue.db.enum import (
@@ -32,6 +31,7 @@ from chainqueue.db.enum import (
status_str,
)
from chainqueue.error import TxStateChangeError
from chainqueue.sql.query import get_tx
from eth_erc20 import ERC20
# local imports
@@ -39,7 +39,6 @@ from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.nonce import Nonce
from cic_eth.error import InitializationError
from cic_eth.queue.query import get_tx_local
app = celery.current_app
@@ -284,7 +283,7 @@ class AdminApi:
tx_hash_hex = None
session = SessionBase.create_session()
for k in txs.keys():
tx_dict = get_tx_local(chain_spec, k, session=session)
tx_dict = get_tx(chain_spec, k, session=session)
if tx_dict['nonce'] == nonce:
tx_hash_hex = k
session.close()
@@ -308,8 +307,6 @@ class AdminApi:
:param address: Ethereum address to return transactions for
:type address: str, 0x-hex
"""
address = add_0x(hex_uniform(strip_0x(address)))
last_nonce = -1
s = celery.signature(
'cic_eth.queue.query.get_account_tx',

View File

@@ -9,10 +9,8 @@ import logging
# external imports
import celery
from chainlib.chain import ChainSpec
from hexathon import strip_0x
# local imports
from cic_eth.api.base import ApiBase
from cic_eth.enum import LockEnum
app = celery.current_app
@@ -20,8 +18,48 @@ app = celery.current_app
logg = logging.getLogger(__name__)
class Api(ApiBase):
class Api:
"""Creates task chains to perform well-known CIC operations.
Each method that sends tasks returns details about the root task. The root task uuid can be provided in the callback, to enable to caller to correlate the result with individual calls. It can also be used to independently poll the completion of a task chain.
:param callback_param: Static value to pass to callback
:type callback_param: str
:param callback_task: Callback task that executes callback_param call. (Must be included by the celery worker)
:type callback_task: string
:param queue: Name of worker queue to submit tasks to
:type queue: str
"""
def __init__(self, chain_str, queue='cic-eth', callback_param=None, callback_task='cic_eth.callbacks.noop.noop', callback_queue=None):
self.chain_str = chain_str
self.chain_spec = ChainSpec.from_chain_str(chain_str)
self.callback_param = callback_param
self.callback_task = callback_task
self.queue = queue
logg.debug('api using queue {}'.format(self.queue))
self.callback_success = None
self.callback_error = None
if callback_queue == None:
callback_queue=self.queue
if callback_param != None:
self.callback_success = celery.signature(
callback_task,
[
callback_param,
0,
],
queue=callback_queue,
)
self.callback_error = celery.signature(
callback_task,
[
callback_param,
1,
],
queue=callback_queue,
)
def default_token(self):
s_token = celery.signature(
@@ -255,8 +293,6 @@ class Api(ApiBase):
:returns: uuid of root task
:rtype: celery.Task
"""
#from_address = strip_0x(from_address)
#to_address = strip_0x(to_address)
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
@@ -523,9 +559,9 @@ class Api(ApiBase):
s_external_get = celery.signature(
external_task,
[
address,
offset,
limit,
address,
],
queue=external_queue,
)

View File

@@ -1,52 +0,0 @@
# standard imports
import logging
# external imports
import celery
from chainlib.chain import ChainSpec
logg = logging.getLogger(__name__)
class ApiBase:
"""Creates task chains to perform well-known CIC operations.
Each method that sends tasks returns details about the root task. The root task uuid can be provided in the callback, to enable to caller to correlate the result with individual calls. It can also be used to independently poll the completion of a task chain.
:param callback_param: Static value to pass to callback
:type callback_param: str
:param callback_task: Callback task that executes callback_param call. (Must be included by the celery worker)
:type callback_task: string
:param queue: Name of worker queue to submit tasks to
:type queue: str
"""
def __init__(self, chain_str, queue='cic-eth', callback_param=None, callback_task='cic_eth.callbacks.noop.noop', callback_queue=None):
self.chain_str = chain_str
self.chain_spec = ChainSpec.from_chain_str(chain_str)
self.callback_param = callback_param
self.callback_task = callback_task
self.queue = queue
logg.debug('api using queue {}'.format(self.queue))
self.callback_success = None
self.callback_error = None
if callback_queue == None:
callback_queue=self.queue
if callback_param != None:
self.callback_success = celery.signature(
callback_task,
[
callback_param,
0,
],
queue=callback_queue,
)
self.callback_error = celery.signature(
callback_task,
[
callback_param,
1,
],
queue=callback_queue,
)

View File

@@ -21,7 +21,7 @@ def health(*args, **kwargs):
session = SessionBase.create_session()
config = kwargs['config']
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
logg.debug('check gas balance of gas gifter for chain {}'.format(chain_spec))
try:

View File

@@ -15,7 +15,7 @@ logg = logging.getLogger().getChild(__name__)
def health(*args, **kwargs):
blocked = True
max_attempts = 5
conn = RPCConnection.connect(kwargs['config'].get('CHAIN_SPEC'), tag='signer')
conn = RPCConnection.connect(kwargs['config'].get('CIC_CHAIN_SPEC'), tag='signer')
for i in range(max_attempts):
idx = i + 1
logg.debug('attempt signer connection check {}/{}'.format(idx, max_attempts))

View File

@@ -1,10 +0,0 @@
# local imports
from .base import *
from .chain import (
EthChainInterface,
chain_interface,
)
from .rpc import RPC
from .arg import ArgumentParser
from .config import Config
from .celery import CeleryApp

View File

@@ -1,31 +0,0 @@
# external imports
from chainlib.eth.cli import ArgumentParser as BaseArgumentParser
# local imports
from .base import (
CICFlag,
Flag,
)
class ArgumentParser(BaseArgumentParser):
def process_local_flags(self, local_arg_flags):
if local_arg_flags & CICFlag.REDIS:
self.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
self.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
self.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use')
if local_arg_flags & CICFlag.REDIS_CALLBACK:
self.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback')
self.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback')
self.add_argument('--redis-timeout', default=20.0, type=float, help='Redis callback timeout')
if local_arg_flags & CICFlag.CELERY:
self.add_argument('-q', '--celery-queue', dest='celery_queue', type=str, default='cic-eth', help='Task queue')
if local_arg_flags & CICFlag.SYNCER:
self.add_argument('--offset', type=int, default=0, help='Start block height for initial history sync')
self.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
if local_arg_flags & CICFlag.CHAIN:
self.add_argument('-r', '--registry-address', type=str, dest='registry_address', help='CIC registry contract address')

View File

@@ -1,31 +0,0 @@
# standard imports
import enum
# external imports
from chainlib.eth.cli import (
argflag_std_read,
argflag_std_write,
argflag_std_base,
Flag,
)
class CICFlag(enum.IntEnum):
# celery - nibble 1
CELERY = 1
# redis - nibble 2
REDIS = 16
REDIS_CALLBACK = 32
# chain - nibble 3
CHAIN = 256
# sync - nibble 4
SYNCER = 4096
argflag_local_task = CICFlag.CELERY
argflag_local_taskcallback = argflag_local_task | CICFlag.REDIS | CICFlag.REDIS_CALLBACK
argflag_local_chain = CICFlag.CHAIN
argflag_local_sync = CICFlag.SYNCER | CICFlag.CHAIN

View File

@@ -1,24 +0,0 @@
# standard imports
import logging
# external imports
import celery
logg = logging.getLogger(__name__)
class CeleryApp:
@classmethod
def from_config(cls, config):
backend_url = config.get('CELERY_RESULT_URL')
broker_url = config.get('CELERY_BROKER_URL')
celery_app = None
if backend_url != None:
celery_app = celery.Celery(broker=broker_url, backend=backend_url)
logg.info('creating celery app on {} with backend on {}'.format(broker_url, backend_url))
else:
celery_app = celery.Celery(broker=broker_url)
logg.info('creating celery app without results backend on {}'.format(broker_url))
return celery_app

View File

@@ -1,21 +0,0 @@
# external imports
from chainlib.eth.block import (
block_by_number,
Block,
)
from chainlib.eth.tx import (
receipt,
Tx,
)
from chainlib.interface import ChainInterface
class EthChainInterface(ChainInterface):
def __init__(self):
self._tx_receipt = receipt
self._block_by_number = block_by_number
self._block_from_src = Block.from_src
self._src_normalize = Tx.src_normalize
chain_interface = EthChainInterface()

View File

@@ -1,63 +0,0 @@
# standard imports
import os
import logging
# external imports
from chainlib.eth.cli import (
Config as BaseConfig,
Flag,
)
# local imports
from .base import CICFlag
script_dir = os.path.dirname(os.path.realpath(__file__))
logg = logging.getLogger(__name__)
class Config(BaseConfig):
local_base_config_dir = os.path.join(script_dir, '..', 'data', 'config')
@classmethod
def from_args(cls, args, arg_flags, local_arg_flags, extra_args={}, default_config_dir=None, base_config_dir=None, default_fee_limit=None):
expanded_base_config_dir = [cls.local_base_config_dir]
if base_config_dir != None:
if isinstance(base_config_dir, str):
base_config_dir = [base_config_dir]
for d in base_config_dir:
expanded_base_config_dir.append(d)
config = BaseConfig.from_args(args, arg_flags, extra_args=extra_args, default_config_dir=default_config_dir, base_config_dir=expanded_base_config_dir, load_callback=None)
local_args_override = {}
if local_arg_flags & CICFlag.REDIS:
local_args_override['REDIS_HOST'] = getattr(args, 'redis_host')
local_args_override['REDIS_PORT'] = getattr(args, 'redis_port')
local_args_override['REDIS_DB'] = getattr(args, 'redis_db')
local_args_override['REDIS_TIMEOUT'] = getattr(args, 'redis_timeout')
if local_arg_flags & CICFlag.CHAIN:
local_args_override['CIC_REGISTRY_ADDRESS'] = getattr(args, 'registry_address')
if local_arg_flags & CICFlag.CELERY:
local_args_override['CELERY_QUEUE'] = getattr(args, 'celery_queue')
if local_arg_flags & CICFlag.SYNCER:
local_args_override['SYNCER_OFFSET'] = getattr(args, 'offset')
local_args_override['SYNCER_NO_HISTORY'] = getattr(args, 'no_history')
config.dict_override(local_args_override, 'local cli args')
if local_arg_flags & CICFlag.REDIS_CALLBACK:
config.add(getattr(args, 'redis_host_callback'), '_REDIS_HOST_CALLBACK')
config.add(getattr(args, 'redis_port_callback'), '_REDIS_PORT_CALLBACK')
if local_arg_flags & CICFlag.CELERY:
config.add(config.true('CELERY_DEBUG'), 'CELERY_DEBUG', exists_ok=True)
logg.debug('config loaded:\n{}'.format(config))
return config

View File

@@ -1,90 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.connection import (
RPCConnection,
ConnType,
)
from chainlib.eth.connection import (
EthUnixSignerConnection,
EthHTTPSignerConnection,
)
from chainlib.chain import ChainSpec
logg = logging.getLogger(__name__)
class RPC:
def __init__(self, chain_spec, rpc_provider, signer_provider=None):
self.chain_spec = chain_spec
self.rpc_provider = rpc_provider
self.signer_provider = signer_provider
def get_default(self):
return self.get_by_label('default')
def get_by_label(self, label):
return RPCConnection.connect(self.chain_spec, label)
@staticmethod
def from_config(config, use_signer=False, default_label='default', signer_label='signer'):
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
RPCConnection.register_location(config.get('RPC_PROVIDER'), chain_spec, default_label)
if use_signer:
RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, signer_label)
RPCConnection.register_constructor(ConnType.HTTP, EthHTTPSignerConnection, signer_label)
RPCConnection.register_constructor(ConnType.HTTP_SSL, EthHTTPSignerConnection, signer_label)
RPCConnection.register_location(config.get('SIGNER_PROVIDER'), chain_spec, signer_label)
rpc = RPC(chain_spec, config.get('RPC_PROVIDER'), signer_provider=config.get('SIGNER_PROVIDER'))
logg.info('set up rpc: {}'.format(rpc))
return rpc
def __str__(self):
return 'RPC factory, chain {}, rpc {}, signer {}'.format(self.chain_spec, self.rpc_provider, self.signer_provider)
# TOOD: re-implement file backend option from omittec code:
#broker = config.get('CELERY_BROKER_URL')
#if broker[:4] == 'file':
# bq = tempfile.mkdtemp()
# bp = tempfile.mkdtemp()
# conf_update = {
# 'broker_url': broker,
# 'broker_transport_options': {
# 'data_folder_in': bq,
# 'data_folder_out': bq,
# 'data_folder_processed': bp,
# },
# }
# if config.true('CELERY_DEBUG'):
# conf_update['result_extended'] = True
# current_app.conf.update(conf_update)
# logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
#else:
# conf_update = {
# 'broker_url': broker,
# }
# if config.true('CELERY_DEBUG'):
# conf_update['result_extended'] = True
# current_app.conf.update(conf_update)
#
#result = config.get('CELERY_RESULT_URL')
#if result[:4] == 'file':
# rq = tempfile.mkdtemp()
# current_app.conf.update({
# 'result_backend': 'file://{}'.format(rq),
# })
# logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
#else:
# current_app.conf.update({
# 'result_backend': result,
# })
#

View File

@@ -1,5 +0,0 @@
[celery]
broker_url = redis://localhost:6379
result_url =
queue = cic-eth
debug = 0

View File

@@ -1,10 +0,0 @@
[database]
engine =
driver =
host =
port =
name =
user =
password =
debug = 0
pool_size = 0

View File

@@ -1,5 +0,0 @@
[redis]
host = localhost
port = 6379
db = 0
timeout = 20.0

View File

@@ -1,3 +0,0 @@
[retry]
delay =
batch_size =

View File

@@ -1,2 +0,0 @@
[signer]
provider =

View File

@@ -1,4 +0,0 @@
[syncer]
loop_interval = 1
offset = 0
no_history = 0

View File

@@ -8,8 +8,7 @@ Create Date: 2021-04-02 18:30:55.398388
from alembic import op
import sqlalchemy as sa
#from chainqueue.db.migrations.sqlalchemy import (
from chainqueue.db.migrations.default.export import (
from chainqueue.db.migrations.sqlalchemy import (
chainqueue_upgrade,
chainqueue_downgrade,
)

View File

@@ -8,8 +8,8 @@ Create Date: 2021-04-02 18:41:20.864265
import datetime
from alembic import op
import sqlalchemy as sa
from chainlib.eth.constant import ZERO_ADDRESS
from cic_eth.db.enum import LockEnum
from cic_eth.encode import ZERO_ADDRESS_NORMAL
# revision identifiers, used by Alembic.
@@ -30,7 +30,7 @@ def upgrade():
sa.Column("otx_id", sa.Integer, sa.ForeignKey('otx.id'), nullable=True),
)
op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True)
op.execute("INSERT INTO lock (address, date_created, blockchain, flags) VALUES('{}', '{}', '::', {})".format(ZERO_ADDRESS_NORMAL, datetime.datetime.utcnow(), LockEnum.INIT | LockEnum.SEND | LockEnum.QUEUE))
op.execute("INSERT INTO lock (address, date_created, blockchain, flags) VALUES('{}', '{}', '::', {})".format(ZERO_ADDRESS, datetime.datetime.utcnow(), LockEnum.INIT | LockEnum.SEND | LockEnum.QUEUE))
def downgrade():

View File

@@ -8,8 +8,7 @@ Create Date: 2021-04-02 18:36:44.459603
from alembic import op
import sqlalchemy as sa
#from chainsyncer.db.migrations.sqlalchemy import (
from chainsyncer.db.migrations.default.export import (
from chainsyncer.db.migrations.sqlalchemy import (
chainsyncer_upgrade,
chainsyncer_downgrade,
)

View File

@@ -126,4 +126,3 @@ class SessionBase(Model):
logg.debug('commit and destroy session {}'.format(session_key))
session.commit()
session.close()
del SessionBase.localsessions[session_key]

View File

@@ -4,12 +4,12 @@ import logging
# third-party imports
from sqlalchemy import Column, String, Integer, DateTime, ForeignKey
from chainlib.eth.constant import ZERO_ADDRESS
from chainqueue.db.models.tx import TxCache
from chainqueue.db.models.otx import Otx
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.encode import ZERO_ADDRESS_NORMAL
logg = logging.getLogger()
@@ -37,7 +37,7 @@ class Lock(SessionBase):
@staticmethod
def set(chain_str, flags, address=ZERO_ADDRESS_NORMAL, session=None, tx_hash=None):
def set(chain_str, flags, address=ZERO_ADDRESS, session=None, tx_hash=None):
"""Sets flags associated with the given address and chain.
If a flags entry does not exist it is created.
@@ -90,7 +90,7 @@ class Lock(SessionBase):
@staticmethod
def reset(chain_str, flags, address=ZERO_ADDRESS_NORMAL, session=None):
def reset(chain_str, flags, address=ZERO_ADDRESS, session=None):
"""Resets flags associated with the given address and chain.
If the resulting flags entry value is 0, the entry will be deleted.
@@ -134,7 +134,7 @@ class Lock(SessionBase):
@staticmethod
def check(chain_str, flags, address=ZERO_ADDRESS_NORMAL, session=None):
def check(chain_str, flags, address=ZERO_ADDRESS, session=None):
"""Checks whether all given flags are set for given address and chain.
Does not validate the address against any other tables or components.

View File

@@ -1,16 +0,0 @@
# external imports
from chainlib.eth.constant import ZERO_ADDRESS
from chainqueue.encode import TxHexNormalizer
from chainlib.eth.tx import unpack
tx_normalize = TxHexNormalizer()
ZERO_ADDRESS_NORMAL = tx_normalize.wallet_address(ZERO_ADDRESS)
def unpack_normal(signed_tx_bytes, chain_spec):
tx = unpack(signed_tx_bytes, chain_spec)
tx['hash'] = tx_normalize.tx_hash(tx['hash'])
tx['from'] = tx_normalize.wallet_address(tx['from'])
tx['to'] = tx_normalize.wallet_address(tx['to'])
return tx

View File

@@ -13,14 +13,17 @@ from chainlib.eth.sign import (
new_account,
sign_message,
)
from chainlib.eth.address import to_checksum_address, is_address
from chainlib.eth.tx import TxFormat
from chainlib.eth.address import to_checksum_address
from chainlib.eth.tx import (
TxFormat,
unpack,
)
from chainlib.chain import ChainSpec
from chainlib.error import JSONRPCException
from eth_accounts_index.registry import AccountRegistry
from eth_accounts_index import AccountsIndex
from sarafu_faucet import MinterFaucet
from chainqueue.sql.tx import cache_tx_dict
from chainqueue.db.models.tx import TxCache
# local import
from cic_eth_registry import CICRegistry
@@ -31,7 +34,6 @@ from cic_eth.eth.gas import (
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.encode import tx_normalize
from cic_eth.error import (
RoleMissingError,
SignerError,
@@ -47,11 +49,6 @@ from cic_eth.eth.nonce import (
from cic_eth.queue.tx import (
register_tx,
)
from cic_eth.encode import (
unpack_normal,
ZERO_ADDRESS_NORMAL,
tx_normalize,
)
logg = logging.getLogger()
celery_app = celery.current_app
@@ -86,7 +83,7 @@ def create(self, password, chain_spec_dict):
# TODO: It seems infeasible that a can be None in any case, verify
if a == None:
raise SignerError('create account')
a = tx_normalize.wallet_address(a)
logg.debug('created account {}'.format(a))
# Initialize nonce provider record for account
@@ -177,9 +174,6 @@ def gift(self, account_address, chain_spec_dict):
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
if is_address(account_address):
account_address = tx_normalize.wallet_address(account_address)
logg.debug('gift account address {} to index'.format(account_address))
queue = self.request.delivery_info.get('routing_key')
@@ -253,9 +247,8 @@ def have(self, account, chain_spec_dict):
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def set_role(self, tag, address, chain_spec_dict):
if not is_address(address):
raise ValueError('invalid address {}'.format(address))
address = tx_normalize.wallet_address(address)
if not to_checksum_address(address):
raise ValueError('invalid checksum address {}'.format(address))
session = SessionBase.create_session()
role = AccountRole.set(tag, address, session=session)
session.add(role)
@@ -302,24 +295,25 @@ def cache_gift_data(
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack_normal(tx_signed_raw_bytes, chain_spec)
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = Faucet.parse_give_to_request(tx['data'])
sender_address = tx_normalize.wallet_address(tx['from'])
recipient_address = tx_normalize.wallet_address(tx['to'])
session = self.create_session()
tx_dict = {
'hash': tx['hash'],
'from': sender_address,
'to': recipient_address,
'source_token': ZERO_ADDRESS_NORMAL,
'destination_token': ZERO_ADDRESS_NORMAL,
'from_value': 0,
'to_value': 0,
}
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['to'],
ZERO_ADDRESS,
ZERO_ADDRESS,
0,
0,
session=session,
)
(tx_dict, cache_id) = cache_tx_dict(tx_dict, session=session)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)
@@ -343,22 +337,23 @@ def cache_account_data(
:rtype: tuple
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack_normal(tx_signed_raw_bytes, chain_spec)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = AccountsIndex.parse_add_request(tx['data'])
sender_address = tx_normalize.wallet_address(tx['from'])
recipient_address = tx_normalize.wallet_address(tx['to'])
session = SessionBase.create_session()
tx_dict = {
'hash': tx['hash'],
'from': sender_address,
'to': recipient_address,
'source_token': ZERO_ADDRESS_NORMAL,
'destination_token': ZERO_ADDRESS_NORMAL,
'from_value': 0,
'to_value': 0,
}
(tx_dict, cache_id) = cache_tx_dict(tx_dict, session=session)
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['to'],
ZERO_ADDRESS,
ZERO_ADDRESS,
0,
0,
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)

View File

@@ -0,0 +1,385 @@
# standard imports
import os
import logging
# third-party imports
import celery
import web3
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
# local imports
from cic_eth.db import SessionBase
from cic_eth.db.models.convert import TxConvertTransfer
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.eth.task import sign_and_register_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.eth.token import TokenTxFactory
from cic_eth.eth.factory import TxFactory
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.eth.rpc import RpcClient
celery_app = celery.current_app
#logg = celery_app.log.get_default_logger()
logg = logging.getLogger()
contract_function_signatures = {
'convert': 'f3898a97',
'convert2': '569706eb',
}
class BancorTxFactory(TxFactory):
"""Factory for creating Bancor network transactions.
"""
def convert(
self,
source_token_address,
destination_token_address,
reserve_address,
source_amount,
minimum_return,
chain_spec,
fee_beneficiary='0x0000000000000000000000000000000000000000',
fee_ppm=0,
):
"""Create a BancorNetwork "convert" transaction.
:param source_token_address: ERC20 contract address for token to convert from
:type source_token_address: str, 0x-hex
:param destination_token_address: ERC20 contract address for token to convert to
:type destination_token_address: str, 0x-hex
:param reserve_address: ERC20 contract address of Common reserve token
:type reserve_address: str, 0x-hex
:param source_amount: Amount of source tokens to convert
:type source_amount: int
:param minimum_return: Minimum amount of destination tokens to accept as result for conversion
:type source_amount: int
:return: Unsigned "convert" transaction in standard Ethereum format
:rtype: dict
"""
network_contract = CICRegistry.get_contract(chain_spec, 'BancorNetwork')
network_gas = network_contract.gas('convert')
tx_convert_buildable = network_contract.contract.functions.convert2(
[
source_token_address,
source_token_address,
reserve_address,
destination_token_address,
destination_token_address,
],
source_amount,
minimum_return,
fee_beneficiary,
fee_ppm,
)
tx_convert = tx_convert_buildable.buildTransaction({
'from': self.address,
'gas': network_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
})
return tx_convert
def unpack_convert(data):
f = data[2:10]
if f != contract_function_signatures['convert2']:
raise ValueError('Invalid convert data ({})'.format(f))
d = data[10:]
path = d[384:]
source = path[64-40:64]
destination = path[-40:]
amount = int(d[64:128], 16)
min_return = int(d[128:192], 16)
fee_recipient = d[192:256]
fee = int(d[256:320], 16)
return {
'amount': amount,
'min_return': min_return,
'source_token': web3.Web3.toChecksumAddress('0x' + source),
'destination_token': web3.Web3.toChecksumAddress('0x' + destination),
'fee_recipient': fee_recipient,
'fee': fee,
}
# Kept for historical reference, it unpacks a convert call without fee parameters
#def _unpack_convert_mint(data):
# f = data[2:10]
# if f != contract_function_signatures['convert2']:
# raise ValueError('Invalid convert data ({})'.format(f))
#
# d = data[10:]
# path = d[256:]
# source = path[64-40:64]
# destination = path[-40:]
#
# amount = int(d[64:128], 16)
# min_return = int(d[128:192], 16)
# return {
# 'amount': amount,
# 'min_return': min_return,
# 'source_token': web3.Web3.toChecksumAddress('0x' + source),
# 'destination_token': web3.Web3.toChecksumAddress('0x' + destination),
# }
@celery_app.task(bind=True)
def convert_with_default_reserve(self, tokens, from_address, source_amount, minimum_return, to_address, chain_str):
"""Performs a conversion between two liquid tokens using Bancor network.
:param tokens: Token pair, source and destination respectively
:type tokens: list of str, 0x-hex
:param from_address: Ethereum address of sender
:type from_address: str, 0x-hex
:param source_amount: Amount of source tokens to convert
:type source_amount: int
:param minimum_return: Minimum about of destination tokens to receive
:type minimum_return: int
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
c = RpcClient(chain_spec, holder_address=from_address)
cr = CICRegistry.get_contract(chain_spec, 'BancorNetwork')
source_token = CICRegistry.get_address(chain_spec, tokens[0]['address'])
reserve_address = CICRegistry.get_contract(chain_spec, 'BNTToken', 'ERC20').address()
tx_factory = TokenTxFactory(from_address, c)
tx_approve_zero = tx_factory.approve(source_token.address(), cr.address(), 0, chain_spec)
(tx_approve_zero_hash_hex, tx_approve_zero_signed_hex) = sign_and_register_tx(tx_approve_zero, chain_str, queue, 'cic_eth.eth.token.otx_cache_approve')
tx_approve = tx_factory.approve(source_token.address(), cr.address(), source_amount, chain_spec)
(tx_approve_hash_hex, tx_approve_signed_hex) = sign_and_register_tx(tx_approve, chain_str, queue, 'cic_eth.eth.token.otx_cache_approve')
tx_factory = BancorTxFactory(from_address, c)
tx_convert = tx_factory.convert(
tokens[0]['address'],
tokens[1]['address'],
reserve_address,
source_amount,
minimum_return,
chain_spec,
)
(tx_convert_hash_hex, tx_convert_signed_hex) = sign_and_register_tx(tx_convert, chain_str, queue, 'cic_eth.eth.bancor.otx_cache_convert')
# TODO: consider moving save recipient to async task / chain it before the tx send
if to_address != None:
save_convert_recipient(tx_convert_hash_hex, to_address, chain_str)
s = create_check_gas_and_send_task(
[tx_approve_zero_signed_hex, tx_approve_signed_hex, tx_convert_signed_hex],
chain_str,
from_address,
tx_approve_zero['gasPrice'] * tx_approve_zero['gas'],
tx_hashes_hex=[tx_approve_hash_hex],
queue=queue,
)
s.apply_async()
return tx_convert_hash_hex
#@celery_app.task()
#def process_approval(tx_hash_hex):
# t = session.query(TxConvertTransfer).query(TxConvertTransfer.approve_tx_hash==tx_hash_hex).first()
# c = session.query(Otx).query(Otx.tx_hash==t.convert_tx_hash)
# gas_limit = 8000000
# gas_price = GasOracle.gas_price()
#
# # TODO: use celery group instead
# s_queue = celery.signature(
# 'cic_eth.queue.tx.create',
# [
# nonce,
# c['address'], # TODO: check that this is in fact sender address
# c['tx_hash'],
# c['signed_tx'],
# ]
# )
# s_queue.apply_async()
#
# s_check_gas = celery.signature(
# 'cic_eth.eth.gas.check_gas',
# [
# c['address'],
# [c['signed_tx']],
# gas_limit * gas_price,
# ]
# )
# s_send = celery.signature(
# 'cic_eth.eth.tx.send',
# [],
# )
#
# s_set_sent = celery.signature(
# 'cic_eth.queue.state.set_sent',
# [False],
# )
# s_send.link(s_set_sent)
# s_check_gas.link(s_send)
# s_check_gas.apply_async()
# return tx_hash_hex
@celery_app.task()
def save_convert_recipient(convert_hash, recipient_address, chain_str):
"""Registers the recipient target for a convert-and-transfer operation.
:param convert_hash: Transaction hash of convert operation
:type convert_hash: str, 0x-hex
:param recipient_address: Address of consequtive transfer recipient
:type recipient_address: str, 0x-hex
"""
session = SessionBase.create_session()
t = TxConvertTransfer(convert_hash, recipient_address, chain_str)
session.add(t)
session.commit()
session.close()
@celery_app.task()
def save_convert_transfer(convert_hash, transfer_hash):
"""Registers that the transfer part of a convert-and-transfer operation has been executed.
:param convert_hash: Transaction hash of convert operation
:type convert_hash: str, 0x-hex
:param convert_hash: Transaction hash of transfer operation
:type convert_hash: str, 0x-hex
:returns: transfer_hash,
:rtype: list, single str, 0x-hex
"""
session = SessionBase.create_session()
t = TxConvertTransfer.get(convert_hash)
t.transfer(transfer_hash)
session.add(t)
session.commit()
session.close()
return [transfer_hash]
# TODO: seems unused, consider removing
@celery_app.task()
def resolve_converters_by_tokens(tokens, chain_str):
"""Return converters for a list of tokens.
:param tokens: Token addresses to look up
:type tokens: list of str, 0x-hex
:return: Addresses of matching converters
:rtype: list of str, 0x-hex
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
for t in tokens:
c = CICRegistry.get_contract(chain_spec, 'ConverterRegistry')
fn = c.function('getConvertersByAnchors')
try:
converters = fn([t['address']]).call()
except Exception as e:
raise e
t['converters'] = converters
return tokens
@celery_app.task(bind=True)
def transfer_converted(self, tokens, holder_address, receiver_address, value, tx_convert_hash_hex, chain_str):
"""Execute the ERC20 transfer of a convert-and-transfer operation.
First argument is a list of tokens, to enable the task to be chained to the symbol to token address resolver function. However, it accepts only one token as argument.
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param holder_address: Token receiver address
:type holder_address: str, 0x-hex
:param value: Amount of token, in 'wei'
:type value: int
:raises TokenCountError: Either none or more then one tokens have been passed as tokens argument
:return: Transaction hash
:rtype: str, 0x-hex
"""
# we only allow one token, one transfer
if len(tokens) != 1:
raise TokenCountError
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
c = RpcClient(chain_spec, holder_address=holder_address)
# get transaction parameters
gas_price = c.gas_price()
tx_factory = TokenTxFactory(holder_address, c)
token_address = tokens[0]['address']
tx_transfer = tx_factory.transfer(
token_address,
receiver_address,
value,
chain_spec,
)
(tx_transfer_hash_hex, tx_transfer_signed_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, 'cic_eth.eth.token.otx_cache_transfer')
# send transaction
logg.info('transfer converted token {} from {} to {} value {} {}'.format(token_address, holder_address, receiver_address, value, tx_transfer_signed_hex))
s = create_check_gas_and_send_task(
[tx_transfer_signed_hex],
chain_str,
holder_address,
tx_transfer['gasPrice'] * tx_transfer['gas'],
None,
queue,
)
s_save = celery.signature(
'cic_eth.eth.bancor.save_convert_transfer',
[
tx_convert_hash_hex,
tx_transfer_hash_hex,
],
queue=queue,
)
s_save.link(s)
s_save.apply_async()
return tx_transfer_hash_hex
@celery_app.task()
def otx_cache_convert(
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
):
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = unpack_convert(tx['data'])
logg.debug('tx data {}'.format(tx_data))
session = TxCache.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['from'],
tx_data['source_token'],
tx_data['destination_token'],
tx_data['amount'],
tx_data['amount'],
)
session.add(tx_cache)
session.commit()
session.close()
return tx_hash_hex

View File

@@ -12,13 +12,10 @@ from chainlib.eth.tx import (
)
from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token
from hexathon import (
strip_0x,
add_0x,
)
from hexathon import strip_0x
from chainqueue.db.models.tx import TxCache
from chainqueue.error import NotLocalTxError
from eth_erc20 import ERC20
from chainqueue.sql.tx import cache_tx_dict
# local imports
from cic_eth.db.models.base import SessionBase
@@ -41,7 +38,6 @@ from cic_eth.task import (
CriticalSQLAlchemyAndSignerTask,
)
from cic_eth.eth.nonce import CustodialTaskNonceOracle
from cic_eth.encode import tx_normalize
celery_app = celery.current_app
logg = logging.getLogger()
@@ -66,8 +62,7 @@ def balance(tokens, holder_address, chain_spec_dict):
for t in tokens:
address = t['address']
logg.debug('address {} {}'.format(address, holder_address))
token = ERC20Token(chain_spec, rpc, add_0x(address))
token = ERC20Token(chain_spec, rpc, address)
c = ERC20(chain_spec)
o = c.balance_of(address, holder_address, sender_address=caller_address)
r = rpc.do(o)
@@ -376,22 +371,23 @@ def cache_transfer_data(
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = ERC20.parse_transfer_request(tx['data'])
sender_address = tx_normalize.wallet_address(tx['from'])
recipient_address = tx_normalize.wallet_address(tx_data[0])
recipient_address = tx_data[0]
token_value = tx_data[1]
session = SessionBase.create_session()
tx_dict = {
'hash': tx_hash_hex,
'from': sender_address,
'to': recipient_address,
'source_token': tx['to'],
'destination_token': tx['to'],
'from_value': token_value,
'to_value': token_value,
}
(tx_dict, cache_id) = cache_tx_dict(tx_dict, session=session)
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
recipient_address,
tx['to'],
tx['to'],
token_value,
token_value,
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)
@@ -421,16 +417,19 @@ def cache_transfer_from_data(
token_value = tx_data[2]
session = SessionBase.create_session()
tx_dict = {
'hash': tx_hash_hex,
'from': tx['from'],
'to': recipient_address,
'source_token': tx['to'],
'destination_token': tx['to'],
'from_value': token_value,
'to_value': token_value,
}
(tx_dict, cache_id) = cache_tx_dict(tx_dict, session=session)
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
recipient_address,
tx['to'],
tx['to'],
token_value,
token_value,
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)
@@ -455,21 +454,23 @@ def cache_approve_data(
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = ERC20.parse_approve_request(tx['data'])
sender_address = tx_normalize.wallet_address(tx['from'])
recipient_address = tx_normalize.wallet_address(tx_data[0])
recipient_address = tx_data[0]
token_value = tx_data[1]
session = SessionBase.create_session()
tx_dict = {
'hash': tx_hash_hex,
'from': sender_address,
'to': recipient_address,
'source_token': tx['to'],
'destination_token': tx['to'],
'from_value': token_value,
'to_value': token_value,
}
(tx_dict, cache_id) = cache_tx_dict(tx_dict, session=session)
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
recipient_address,
tx['to'],
tx['to'],
token_value,
token_value,
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)

View File

@@ -3,20 +3,12 @@ import logging
# external imports
import celery
from hexathon import (
strip_0x,
add_0x,
)
#from chainlib.eth.constant import ZERO_ADDRESS
from hexathon import strip_0x
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
from chainlib.eth.address import (
is_checksum_address,
to_checksum_address,
is_address
)
from chainlib.eth.address import is_checksum_address
from chainlib.connection import RPCConnection
from chainqueue.db.enum import StatusBits
from chainqueue.sql.tx import cache_tx_dict
from chainlib.eth.gas import (
balance,
price,
@@ -28,6 +20,7 @@ from chainlib.eth.error import (
from chainlib.eth.tx import (
TxFactory,
TxFormat,
unpack,
)
from chainlib.eth.contract import (
abi_decode_single,
@@ -51,7 +44,6 @@ from cic_eth.eth.nonce import CustodialTaskNonceOracle
from cic_eth.queue.tx import (
queue_create,
register_tx,
unpack,
)
from cic_eth.queue.query import get_tx
from cic_eth.task import (
@@ -60,11 +52,6 @@ from cic_eth.task import (
CriticalSQLAlchemyAndSignerTask,
CriticalWeb3AndSignerTask,
)
from cic_eth.encode import (
tx_normalize,
ZERO_ADDRESS_NORMAL,
unpack_normal,
)
celery_app = celery.current_app
logg = logging.getLogger()
@@ -142,27 +129,30 @@ def cache_gas_data(
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack_normal(tx_signed_raw_bytes, chain_spec)
tx = unpack(tx_signed_raw_bytes, chain_spec)
session = SessionBase.create_session()
tx_dict = {
'hash': tx['hash'],
'from': tx['from'],
'to': tx['to'],
'source_token': ZERO_ADDRESS_NORMAL,
'destination_token': ZERO_ADDRESS_NORMAL,
'from_value': tx['value'],
'to_value': tx['value'],
}
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['to'],
ZERO_ADDRESS,
ZERO_ADDRESS,
tx['value'],
tx['value'],
session=session,
)
(tx_dict, cache_id) = cache_tx_dict(tx_dict, session=session)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)
@celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyAndWeb3Task)
def check_gas(self, tx_hashes_hex, chain_spec_dict, txs_hex=[], address=None, gas_required=MAXIMUM_FEE_UNITS):
def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_required=MAXIMUM_FEE_UNITS):
"""Check the gas level of the sender address of a transaction.
If the account balance is not sufficient for the required gas, gas refill is requested and OutOfGasError raiser.
@@ -182,23 +172,8 @@ def check_gas(self, tx_hashes_hex, chain_spec_dict, txs_hex=[], address=None, ga
:return: Signed raw transaction data list
:rtype: param txs, unchanged
"""
rpc_format_address = None
if address != None:
if not is_address(address):
raise ValueError('invalid address {}'.format(address))
address = tx_normalize.wallet_address(address)
address = add_0x(address)
tx_hashes = []
txs = []
for tx_hash in tx_hashes_hex:
tx_hash = tx_normalize.tx_hash(tx_hash)
tx_hashes.append(tx_hash)
for tx in txs_hex:
tx = tx_normalize.tx_wire(tx)
txs.append(tx)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
logg.debug('txs {} tx_hashes {}'.format(txs, tx_hashes))
addresspass = None
if len(txs) == 0:
@@ -214,7 +189,8 @@ def check_gas(self, tx_hashes_hex, chain_spec_dict, txs_hex=[], address=None, ga
raise ValueError('txs passed to check gas must all have same sender; had {} got {}'.format(address, tx['from']))
addresspass.append(address)
rpc_format_address = add_0x(to_checksum_address(address))
if not is_checksum_address(address):
raise ValueError('invalid address {}'.format(address))
queue = self.request.delivery_info.get('routing_key')
@@ -222,7 +198,7 @@ def check_gas(self, tx_hashes_hex, chain_spec_dict, txs_hex=[], address=None, ga
gas_balance = 0
try:
o = balance(rpc_format_address)
o = balance(address)
r = conn.do(o)
conn.disconnect()
gas_balance = abi_decode_single(ABIContractType.UINT256, r)
@@ -330,7 +306,6 @@ def refill_gas(self, recipient_address, chain_spec_dict):
# Determine value of gas tokens to send
# if an uncompleted gas refill for the same recipient already exists, we still need to spend the nonce
# however, we will perform a 0-value transaction instead
recipient_address = tx_normalize.wallet_address(recipient_address)
zero_amount = False
session = SessionBase.create_session()
status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
@@ -405,7 +380,6 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_spec_dict, gas=None, defa
:returns: Transaction hash
:rtype: str, 0x-hex
"""
txold_hash_hex = tx_normalize.tx_hash(txold_hash_hex)
session = SessionBase.create_session()
otx = Otx.load(txold_hash_hex, session)

View File

@@ -2,9 +2,8 @@
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.status import Status as TxStatus
from cic_eth_registry.erc20 import ERC20Token
from hexathon import add_0x
# local impor:ts
# local imports
from cic_eth.ext.address import translate_address
@@ -45,8 +44,8 @@ class ExtendedTx:
destination = source
if destination_value == None:
destination_value = source_value
st = ERC20Token(self.chain_spec, self.rpc, add_0x(source))
dt = ERC20Token(self.chain_spec, self.rpc, add_0x(destination))
st = ERC20Token(self.chain_spec, self.rpc, source)
dt = ERC20Token(self.chain_spec, self.rpc, destination)
self.source_token = source
self.source_token_symbol = st.symbol
self.source_token_name = st.name

Some files were not shown because too many files have changed in this diff Show More