Compare commits

..

3 Commits

332 changed files with 2693 additions and 13127 deletions

6
.gitignore vendored
View File

@@ -8,9 +8,3 @@ gmon.out
*.egg-info
dist/
build/
**/*sqlite
**/.nyc_output
**/coverage
**/.venv
.idea
**/.vim

View File

@@ -6,7 +6,6 @@ include:
- local: 'apps/cic-notify/.gitlab-ci.yml'
- local: 'apps/cic-meta/.gitlab-ci.yml'
- local: 'apps/cic-cache/.gitlab-ci.yml'
- local: 'apps/data-seeding/.gitlab-ci.yml'
stages:
- build

View File

@@ -2,5 +2,4 @@
omit =
.venv/*
scripts/*
cic_cache/db/migrations/*
cic_cache/version.py
cic_cache/db/postgres/*

View File

@@ -1,4 +0,0 @@
.git
.cache
.dot
**/doc

View File

@@ -1,52 +1,22 @@
.cic_cache_variables:
variables:
APP_NAME: cic-cache
DOCKERFILE_PATH: docker/Dockerfile_ci
CONTEXT: apps/$APP_NAME
DOCKERFILE_PATH: $APP_NAME/docker/Dockerfile
.cic_cache_changes_target:
rules:
- changes:
- $CONTEXT/$APP_NAME/*
build-mr-cic-cache:
extends:
- .cic_cache_changes_target
- .py_build_merge_request
- .cic_cache_variables
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/cic-cache/**/*
when: always
test-mr-cic-cache:
stage: test
extends:
- .cic_cache_variables
cache:
key:
files:
- test_requirements.txt
paths:
- /root/.cache/pip
image: $MR_IMAGE_TAG
script:
- cd apps/$APP_NAME/
- >
pip install --extra-index-url https://pip.grassrootseconomics.net:8433
--extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple
-r test_requirements.txt
- export PYTHONPATH=. && pytest -x --cov=cic_cache --cov-fail-under=90 --cov-report term-missing tests
needs: ["build-mr-cic-cache"]
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/$APP_NAME/**/*
when: always
build-push-cic-cache:
extends:
- .py_build_push
- .cic_cache_variables
rules:
- if: $CI_COMMIT_BRANCH == "master"
changes:
- apps/cic-cache/**/*
when: always

View File

@@ -1,33 +1,25 @@
# standard imports
import logging
import datetime
# external imports
# third-party imports
import moolb
# local imports
from cic_cache.db.list import (
list_transactions_mined,
list_transactions_account_mined,
list_transactions_mined_with_data,
)
from cic_cache.db import list_transactions_mined
from cic_cache.db import list_transactions_account_mined
logg = logging.getLogger()
DEFAULT_FILTER_SIZE = 8192 * 8
class Cache:
class BloomCache:
def __init__(self, session):
self.session = session
class BloomCache(Cache):
@staticmethod
def __get_filter_size(n):
n = DEFAULT_FILTER_SIZE
n = 8192 * 8
logg.warning('filter size hardcoded to {}'.format(n))
return n
@@ -95,44 +87,3 @@ class BloomCache(Cache):
f_blocktx.add(block + tx)
logg.debug('added block {} tx {} lo {} hi {}'.format(r[0], r[1], lowest_block, highest_block))
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)
class DataCache(Cache):
def load_transactions_with_data(self, offset, end):
rows = list_transactions_mined_with_data(self.session, offset, end)
tx_cache = []
highest_block = -1;
lowest_block = -1;
date_is_str = None # stick this in startup
for r in rows:
if highest_block == -1:
highest_block = r['block_number']
lowest_block = r['block_number']
tx_type = 'unknown'
if r['value'] != None:
tx_type = '{}.{}'.format(r['domain'], r['value'])
if date_is_str == None:
date_is_str = type(r['date_block']).__name__ == 'str'
o = {
'block_number': r['block_number'],
'tx_hash': r['tx_hash'],
'date_block': r['date_block'],
'sender': r['sender'],
'recipient': r['recipient'],
'from_value': int(r['from_value']),
'to_value': int(r['to_value']),
'source_token': r['source_token'],
'destination_token': r['destination_token'],
'success': r['success'],
'tx_type': tx_type,
}
if date_is_str:
o['date_block'] = datetime.datetime.fromisoformat(r['date_block'])
tx_cache.append(o)
return (lowest_block, highest_block, tx_cache)

View File

@@ -28,26 +28,6 @@ def list_transactions_mined(
return r
def list_transactions_mined_with_data(
session,
offset,
end,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
:param offset: Offset in data set to return transactions from
:type offset: int
:param limit: Max number of transactions to retrieve
:type limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number ASC, tx_index ASC".format(offset, end)
r = session.execute(s)
return r
def list_transactions_account_mined(
session,
address,

View File

@@ -1,2 +1 @@
from .erc20 import *
from .faucet import *

View File

@@ -1,73 +0,0 @@
# standard imports
import logging
# external imports
from erc20_faucet import Faucet
from chainlib.eth.address import to_checksum_address
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.status import Status
from hexathon import strip_0x
# local imports
import cic_cache.db as cic_cache_db
from .base import TagSyncFilter
#logg = logging.getLogger().getChild(__name__)
logg = logging.getLogger()
class FaucetFilter(TagSyncFilter):
def __init__(self, chain_spec, sender_address=ZERO_ADDRESS):
super(FaucetFilter, self).__init__('give_to', domain='faucet')
self.chain_spec = chain_spec
self.sender_address = sender_address
def filter(self, conn, block, tx, db_session=None):
try:
data = strip_0x(tx.payload)
except ValueError:
return False
logg.debug('data {}'.format(data))
if Faucet.method_for(data[:8]) == None:
return False
token_sender = tx.inputs[0]
token_recipient = data[64+8-40:]
logg.debug('token recipient {}'.format(token_recipient))
f = Faucet(self.chain_spec)
o = f.token(token_sender, sender_address=self.sender_address)
r = conn.do(o)
token = f.parse_token(r)
f = Faucet(self.chain_spec)
o = f.token_amount(token_sender, sender_address=self.sender_address)
r = conn.do(o)
token_value = f.parse_token_amount(r)
cic_cache_db.add_transaction(
db_session,
tx.hash,
block.number,
tx.index,
to_checksum_address(token_sender),
to_checksum_address(token_recipient),
token,
token,
token_value,
token_value,
tx.status == Status.SUCCESS,
block.timestamp,
)
db_session.flush()
cic_cache_db.tag_transaction(
db_session,
tx.hash,
self.tag_name,
domain=self.tag_domain,
)
db_session.commit()
return True

View File

@@ -1,114 +0,0 @@
# standard imports
import logging
import json
import re
import base64
# external imports
from hexathon import add_0x
# local imports
from cic_cache.cache import (
BloomCache,
DataCache,
)
logg = logging.getLogger(__name__)
#logg = logging.getLogger()
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)(/(\d+)(/(\d+))?)?/?'
re_transactions_all_data = r'/txa/(\d+)?/?(\d+)/?'
DEFAULT_LIMIT = 100
def process_transactions_account_bloom(session, env):
r = re.match(re_transactions_account_bloom, env.get('PATH_INFO'))
if not r:
return None
address = r[1]
if r[2] == None:
address = add_0x(address)
offset = 0
if r.lastindex > 2:
offset = r[4]
limit = DEFAULT_LIMIT
if r.lastindex > 4:
limit = r[6]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
def process_transactions_all_bloom(session, env):
r = re.match(re_transactions_all_bloom, env.get('PATH_INFO'))
if not r:
return None
offset = DEFAULT_LIMIT
if r.lastindex > 0:
offset = r[1]
limit = 0
if r.lastindex > 1:
limit = r[2]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
def process_transactions_all_data(session, env):
r = re.match(re_transactions_all_data, env.get('PATH_INFO'))
if not r:
return None
if env.get('HTTP_X_CIC_CACHE_MODE') != 'all':
return None
offset = r[1]
end = r[2]
if int(r[2]) < int(r[1]):
raise ValueError('cart before the horse, dude')
c = DataCache(session)
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, end)
for r in tx_cache:
r['date_block'] = r['date_block'].timestamp()
o = {
'low': lowest_block,
'high': highest_block,
'data': tx_cache,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)

View File

@@ -1,20 +1,18 @@
# standard imports
import os
import re
import logging
import argparse
import json
import base64
# external imports
# third-party imports
import confini
# local imports
from cic_cache import BloomCache
from cic_cache.db import dsn_from_config
from cic_cache.db.models.base import SessionBase
from cic_cache.runnable.daemons.query import (
process_transactions_account_bloom,
process_transactions_all_bloom,
process_transactions_all_data,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -46,6 +44,72 @@ logg.debug('config:\n{}'.format(config))
dsn = dsn_from_config(config)
SessionBase.connect(dsn, config.true('DATABASE_DEBUG'))
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)/?(\d+)?/?(\d+)/?'
DEFAULT_LIMIT = 100
def process_transactions_account_bloom(session, env):
r = re.match(re_transactions_account_bloom, env.get('PATH_INFO'))
if not r:
return None
address = r[1]
if r[2] == None:
address = '0x' + address
offset = DEFAULT_LIMIT
if r.lastindex > 2:
offset = r[3]
limit = 0
if r.lastindex > 3:
limit = r[4]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
def process_transactions_all_bloom(session, env):
r = re.match(re_transactions_all_bloom, env.get('PATH_INFO'))
if not r:
return None
offset = DEFAULT_LIMIT
if r.lastindex > 0:
offset = r[1]
limit = 0
if r.lastindex > 1:
limit = r[2]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
# uwsgi application
def application(env, start_response):
@@ -55,16 +119,10 @@ def application(env, start_response):
session = SessionBase.create_session()
for handler in [
process_transactions_all_data,
process_transactions_all_bloom,
process_transactions_account_bloom,
]:
r = None
try:
r = handler(session, env)
except ValueError as e:
start_response('400 {}'.format(str(e)))
return []
r = handler(session, env)
if r != None:
(mime_type, content) = r
break

View File

@@ -16,7 +16,6 @@ import cic_base.config
import cic_base.log
import cic_base.argparse
import cic_base.rpc
from cic_base.eth.syncer import chain_interface
from cic_eth_registry import CICRegistry
from cic_eth_registry.error import UnknownContractError
from chainlib.chain import ChainSpec
@@ -29,8 +28,10 @@ from hexathon import (
strip_0x,
)
from chainsyncer.backend.sql import SQLBackend
from chainsyncer.driver.head import HeadSyncer
from chainsyncer.driver.history import HistorySyncer
from chainsyncer.driver import (
HeadSyncer,
HistorySyncer,
)
from chainsyncer.db.models.base import SessionBase
# local imports
@@ -40,26 +41,16 @@ from cic_cache.db import (
)
from cic_cache.runnable.daemons.filters import (
ERC20TransferFilter,
FaucetFilter,
)
script_dir = os.path.realpath(os.path.dirname(__file__))
def add_block_args(argparser):
argparser.add_argument('--history-start', type=int, default=0, dest='history_start', help='Start block height for initial history sync')
argparser.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
return argparser
logg = cic_base.log.create()
argparser = cic_base.argparse.create(script_dir, cic_base.argparse.full_template)
argparser = cic_base.argparse.add(argparser, add_block_args, 'block')
#argparser = cic_base.argparse.add(argparser, add_traffic_args, 'traffic')
args = cic_base.argparse.parse(argparser, logg)
config = cic_base.config.create(args.c, args, args.env_prefix)
config.add(args.history_start, 'SYNCER_HISTORY_START', True)
config.add(args.no_history, '_NO_HISTORY', True)
cic_base.config.log(config)
dsn = dsn_from_config(config)
@@ -68,6 +59,7 @@ SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
#RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default')
cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER'))
@@ -79,7 +71,6 @@ def register_filter_tags(filters, session):
session.commit()
logg.info('added tag name "{}" domain "{}"'.format(tag[0], tag[1]))
except sqlalchemy.exc.IntegrityError:
session.rollback()
logg.debug('already have tag name "{}" domain "{}"'.format(tag[0], tag[1]))
@@ -91,7 +82,7 @@ def main():
r = rpc.do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.debug('current block height {}'.format(block_offset))
logg.debug('starting at block {}'.format(block_offset))
syncers = []
@@ -100,22 +91,17 @@ def main():
syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
initial_block_start = config.get('SYNCER_HISTORY_START')
initial_block_offset = block_offset
if config.get('_NO_HISTORY'):
initial_block_start = block_offset
initial_block_offset += 1
syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start))
logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset))
logg.info('found no backends to resume')
syncer_backends.append(SQLBackend.initial(chain_spec, block_offset))
else:
for syncer_backend in syncer_backends:
logg.info('resuming sync session {}'.format(syncer_backend))
for syncer_backend in syncer_backends:
syncers.append(HistorySyncer(syncer_backend, chain_interface))
syncers.append(HistorySyncer(syncer_backend))
syncer_backend = SQLBackend.live(chain_spec, block_offset+1)
syncers.append(HeadSyncer(syncer_backend, chain_interface))
syncers.append(HeadSyncer(syncer_backend))
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None:
@@ -126,11 +112,9 @@ def main():
logg.info('using trusted address {}'.format(address))
erc20_transfer_filter = ERC20TransferFilter(chain_spec)
faucet_filter = FaucetFilter(chain_spec)
filters = [
erc20_transfer_filter,
faucet_filter,
]
session = SessionBase.create_session()

View File

@@ -1,2 +1,2 @@
[eth]
provider = http://localhost:63545
provider = ws://localhost:63546

View File

@@ -1,3 +1,2 @@
[syncer]
loop_interval = 1
history_start = 0

View File

@@ -1,3 +1,2 @@
[syncer]
loop_interval = 5
history_start = 0

View File

@@ -1,2 +0,0 @@
[syncer]
loop_interval = 1

View File

@@ -1,38 +1,52 @@
# syntax = docker/dockerfile:1.2
FROM registry.gitlab.com/grassrootseconomics/cic-base-images:python-3.8.6-dev-55da5f4e as dev
# RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2b9
FROM python:3.8.6-slim-buster
COPY requirements.txt .
#RUN pip install $pip_extra_index_url_flag -r test_requirements.txt
#RUN pip install $pip_extra_index_url_flag .
#RUN pip install .[server]
#COPY --from=0 /usr/local/share/cic/solidity/ /usr/local/share/cic/solidity/
ARG EXTRA_INDEX_URL="https://pip.grassrootseconomics.net:8433"
ARG GITLAB_PYTHON_REGISTRY="https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple"
RUN --mount=type=cache,mode=0755,target=/root/.cache/pip \
pip install --index-url https://pypi.org/simple \
--extra-index-url $GITLAB_PYTHON_REGISTRY --extra-index-url $EXTRA_INDEX_URL \
-r requirements.txt
WORKDIR /usr/src/cic-cache
COPY . .
ARG pip_extra_index_url_flag='--index https://pypi.org/simple --extra-index-url https://pip.grassrootseconomics.net:8433'
ARG root_requirement_file='requirements.txt'
RUN python setup.py install
#RUN apk update && \
# apk add gcc musl-dev gnupg libpq
#RUN apk add postgresql-dev
#RUN apk add linux-headers
#RUN apk add libffi-dev
RUN apt-get update && \
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps git
# Copy shared requirements from top of mono-repo
RUN echo "copying root req file ${root_requirement_file}"
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a76
COPY cic-cache/requirements.txt ./
COPY cic-cache/setup.cfg \
cic-cache/setup.py \
./
COPY cic-cache/cic_cache/ ./cic_cache/
COPY cic-cache/scripts/ ./scripts/
COPY cic-cache/test_requirements.txt ./
RUN pip install $pip_extra_index_url_flag -r test_requirements.txt
RUN pip install $pip_extra_index_url_flag .
RUN pip install .[server]
COPY cic-cache/tests/ ./tests/
#COPY db/ cic-cache/db
#RUN apk add postgresql-client
# ini files in config directory defines the configurable parameters for the application
# they can all be overridden by environment variables
# to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
COPY config/ /usr/local/etc/cic-cache/
COPY cic-cache/config/ /usr/local/etc/cic-cache/
# for db migrations
RUN git clone https://github.com/vishnubob/wait-for-it.git /usr/local/bin/wait-for-it/
COPY cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/
COPY cic-cache/cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/
COPY /docker/start_tracker.sh ./start_tracker.sh
COPY /docker/db.sh ./db.sh
COPY cic-cache/docker/start_tracker.sh ./start_tracker.sh
COPY cic-cache/docker/db.sh ./db.sh
RUN chmod 755 ./*.sh
# Tracker
# ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"]
# Server
# ENTRYPOINT [ "/usr/local/bin/uwsgi", "--wsgi-file", "/usr/local/lib/python3.8/site-packages/cic_cache/runnable/server.py", "--http", ":80", "--pyargv", "-vv" ]
ENTRYPOINT []

View File

@@ -1,37 +0,0 @@
# syntax = docker/dockerfile:1.2
FROM registry.gitlab.com/grassrootseconomics/cic-base-images:python-3.8.6-dev-55da5f4e as dev
# RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2b9
COPY requirements.txt .
#RUN pip install $pip_extra_index_url_flag -r test_requirements.txt
#RUN pip install $pip_extra_index_url_flag .
#RUN pip install .[server]
ARG EXTRA_INDEX_URL="https://pip.grassrootseconomics.net:8433"
ARG GITLAB_PYTHON_REGISTRY="https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple"
RUN pip install --index-url https://pypi.org/simple \
--extra-index-url $GITLAB_PYTHON_REGISTRY --extra-index-url $EXTRA_INDEX_URL \
-r requirements.txt
COPY . .
RUN python setup.py install
# ini files in config directory defines the configurable parameters for the application
# they can all be overridden by environment variables
# to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
COPY config/ /usr/local/etc/cic-cache/
# for db migrations
RUN git clone https://github.com/vishnubob/wait-for-it.git /usr/local/bin/wait-for-it/
COPY cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/
COPY /docker/start_tracker.sh ./start_tracker.sh
COPY /docker/db.sh ./db.sh
RUN chmod 755 ./*.sh
# Tracker
# ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"]
# Server
# ENTRYPOINT [ "/usr/local/bin/uwsgi", "--wsgi-file", "/usr/local/lib/python3.8/site-packages/cic_cache/runnable/server.py", "--http", ":80", "--pyargv", "-vv" ]
ENTRYPOINT []

View File

@@ -1,13 +1,12 @@
cic-base==0.1.3a3+build.984b5cff
cic-base~=0.1.2b8
alembic==1.4.2
confini~=0.3.6rc3
uwsgi==2.0.19.1
moolb~=0.1.0
cic-eth-registry~=0.5.6a1
cic-eth-registry~=0.5.5a4
SQLAlchemy==1.3.20
semver==2.13.0
psycopg2==2.8.6
celery==4.4.7
redis==3.5.3
chainsyncer[sql]~=0.0.3a3
erc20-faucet~=0.2.2a1
chainsyncer[sql]~=0.0.2a4

View File

@@ -2,7 +2,6 @@
import os
import argparse
import logging
import re
import alembic
from alembic.config import Config as AlembicConfig
@@ -24,8 +23,6 @@ argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--migrations-dir', dest='migrations_dir', default=migrationsdir, type=str, help='path to alembic migrations directory')
argparser.add_argument('--reset', action='store_true', help='downgrade before upgrading')
argparser.add_argument('-f', action='store_true', help='force action')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()
@@ -56,10 +53,4 @@ ac = AlembicConfig(os.path.join(migrations_dir, 'alembic.ini'))
ac.set_main_option('sqlalchemy.url', dsn)
ac.set_main_option('script_location', migrations_dir)
if args.reset:
if not args.f:
if not re.match(r'[yY][eE]?[sS]?', input('EEK! this will DELETE the existing db. are you sure??')):
logg.error('user chickened out on requested reset, bailing')
sys.exit(1)
alembic.command.downgrade(ac, 'base')
alembic.command.upgrade(ac, 'head')

View File

@@ -6,5 +6,6 @@ sqlparse==0.4.1
pytest-celery==0.0.0a1
eth_tester==0.5.0b3
py-evm==0.3.0a20
cic_base[full]==0.1.3a3+build.984b5cff
sarafu-faucet~=0.0.4a1
web3==5.12.2
cic-eth-registry~=0.5.5a3
cic-base[full]==0.1.2b8

View File

@@ -5,12 +5,9 @@ import datetime
# external imports
import pytest
import moolb
# local imports
from cic_cache import db
from cic_cache import BloomCache
from cic_cache.cache import DEFAULT_FILTER_SIZE
script_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.dirname(script_dir)
@@ -91,20 +88,3 @@ def txs(
tx_hash_first,
tx_hash_second,
]
@pytest.fixture(scope='function')
def tag_txs(
init_database,
txs,
):
db.add_tag(init_database, 'taag', domain='test')
init_database.commit()
db.tag_transaction(init_database, txs[1], 'taag', domain='test')
@pytest.fixture(scope='session')
def zero_filter():
return moolb.Bloom(DEFAULT_FILTER_SIZE, 3)

View File

@@ -10,7 +10,6 @@ from sqlalchemy import text
from chainlib.eth.tx import Tx
from chainlib.eth.block import Block
from chainlib.chain import ChainSpec
from chainlib.eth.error import RequestMismatchException
from hexathon import (
strip_0x,
add_0x,
@@ -19,22 +18,11 @@ from hexathon import (
# local imports
from cic_cache.db import add_tag
from cic_cache.runnable.daemons.filters.erc20 import ERC20TransferFilter
from cic_cache.runnable.daemons.filters.base import TagSyncFilter
logg = logging.getLogger()
def test_base_filter_str(
init_database,
):
f = TagSyncFilter('foo')
assert 'foo' == str(f)
f = TagSyncFilter('foo', domain='bar')
assert 'bar.foo' == str(f)
def test_erc20_filter(
def test_cache(
eth_rpc,
foo_token,
init_database,
@@ -79,95 +67,3 @@ def test_erc20_filter(
s = text("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = :a AND a.value = :b")
r = init_database.execute(s, {'a': fltr.tag_domain, 'b': fltr.tag_name}).fetchone()
assert r[0] == tx.hash
def test_erc20_filter_nocontract(
eth_rpc,
foo_token,
init_database,
list_defaults,
list_actors,
tags,
):
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
fltr = ERC20TransferFilter(chain_spec)
add_tag(init_database, fltr.tag_name, domain=fltr.tag_domain)
# incomplete args
data = 'a9059cbb'
data += strip_0x(list_actors['alice'])
data += '1000'.ljust(64, '0')
block = Block({
'hash': os.urandom(32).hex(),
'number': 42,
'timestamp': datetime.datetime.utcnow().timestamp(),
'transactions': [],
})
tx = Tx({
'to': os.urandom(20).hex(),
'from': list_actors['bob'],
'data': data,
'value': 0,
'hash': os.urandom(32).hex(),
'nonce': 13,
'gasPrice': 10000000,
'gas': 123456,
})
block.txs.append(tx)
tx.block = block
assert not fltr.filter(eth_rpc, block, tx, db_session=init_database)
@pytest.mark.parametrize(
'contract_method,contract_input,expected_exception',
[
('a9059cbb', os.urandom(32).hex(), ValueError), # not enough args
('a9059cbb', os.urandom(31).hex(), ValueError), # wrong arg boundary
('a9059cbc', os.urandom(64).hex(), RequestMismatchException), # wrong method
],
)
def test_erc20_filter_bogus(
eth_rpc,
foo_token,
init_database,
list_defaults,
list_actors,
tags,
contract_method,
contract_input,
expected_exception,
):
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
fltr = ERC20TransferFilter(chain_spec)
add_tag(init_database, fltr.tag_name, domain=fltr.tag_domain)
# incomplete args
data = contract_method
data += contract_input
block = Block({
'hash': os.urandom(32).hex(),
'number': 42,
'timestamp': datetime.datetime.utcnow().timestamp(),
'transactions': [],
})
tx = Tx({
'to': foo_token,
'from': list_actors['bob'],
'data': data,
'value': 0,
'hash': os.urandom(32).hex(),
'nonce': 13,
'gasPrice': 10000000,
'gas': 123456,
})
block.txs.append(tx)
tx.block = block
assert not fltr.filter(eth_rpc, block, tx, db_session=init_database)

View File

@@ -1,71 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.chain import ChainSpec
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.block import (
block_by_hash,
Block,
)
from chainlib.eth.tx import (
receipt,
unpack,
transaction,
Tx,
)
from hexathon import strip_0x
from erc20_faucet.faucet import SingleShotFaucet
from sqlalchemy import text
# local imports
from cic_cache.db import add_tag
from cic_cache.runnable.daemons.filters.faucet import FaucetFilter
logg = logging.getLogger()
def test_filter_faucet(
eth_rpc,
eth_signer,
foo_token,
faucet_noregistry,
init_database,
list_defaults,
contract_roles,
agent_roles,
tags,
):
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
fltr = FaucetFilter(chain_spec, contract_roles['CONTRACT_DEPLOYER'])
add_tag(init_database, fltr.tag_name, domain=fltr.tag_domain)
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
c = SingleShotFaucet(chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.give_to(faucet_noregistry, agent_roles['ALICE'], agent_roles['ALICE'])
r = eth_rpc.do(o)
tx_src = unpack(bytes.fromhex(strip_0x(o['params'][0])), chain_spec)
o = receipt(r)
r = eth_rpc.do(o)
rcpt = Tx.src_normalize(r)
assert r['status'] == 1
o = block_by_hash(r['block_hash'])
r = eth_rpc.do(o)
block_object = Block(r)
tx = Tx(tx_src, block_object)
tx.apply_receipt(rcpt)
r = fltr.filter(eth_rpc, block_object, tx, init_database)
assert r
s = text("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = :a AND a.value = :b")
r = init_database.execute(s, {'a': fltr.tag_domain, 'b': fltr.tag_name}).fetchone()
assert r[0] == tx.hash

View File

@@ -1,31 +0,0 @@
# standard imports
import json
# external imports
import pytest
# local imports
from cic_cache.runnable.daemons.query import process_transactions_all_data
def test_api_all_data(
init_database,
txs,
):
env = {
'PATH_INFO': '/txa/410000/420000',
'HTTP_X_CIC_CACHE_MODE': 'all',
}
j = process_transactions_all_data(init_database, env)
o = json.loads(j[1])
assert len(o['data']) == 2
env = {
'PATH_INFO': '/txa/420000/410000',
'HTTP_X_CIC_CACHE_MODE': 'all',
}
with pytest.raises(ValueError):
j = process_transactions_all_data(init_database, env)

View File

@@ -9,7 +9,6 @@ import pytest
# local imports
from cic_cache import BloomCache
from cic_cache.cache import DataCache
logg = logging.getLogger()
@@ -34,23 +33,3 @@ def test_cache(
assert b[0] == list_defaults['block'] - 1
def test_cache_data(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
tag_txs,
):
session = init_database
c = DataCache(session)
b = c.load_transactions_with_data(410000, 420000)
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == txs[1]
assert b[2][1]['tx_type'] == 'unknown'
assert b[2][0]['tx_type'] == 'test.taag'

View File

@@ -1,230 +0,0 @@
# standard imports
import logging
import json
import base64
import copy
import re
# external imports
import pytest
from hexathon import strip_0x
# local imports
from cic_cache.runnable.daemons.query import *
logg = logging.getLogger()
@pytest.mark.parametrize(
'query_path_prefix, query_role, query_address_index, query_offset, query_offset_index, query_limit, query_limit_index, match_re',
[
('/tx/user/', 'alice', 0, None, 3, None, 5, re_transactions_account_bloom),
('/tx/user/', 'alice', 0, 42, 3, None, 5, re_transactions_account_bloom),
('/tx/user/', 'alice', 0, 42, 3, 13, 5, re_transactions_account_bloom),
('/tx/', None, 0, None, 3, None, 5, re_transactions_all_bloom),
('/tx/', None, 0, 42, 3, None, 5, re_transactions_all_bloom),
('/tx/', None, 0, 42, 3, 13, 5, re_transactions_all_bloom),
('/txa/', None, 0, None, 3, None, 5, re_transactions_all_data),
('/txa/', None, 0, 42, 3, None, 5, re_transactions_all_data),
('/txa/', None, 0, 42, 3, 13, 5, re_transactions_all_data),
],
)
def test_query_regex(
list_actors,
query_path_prefix,
query_role,
query_address_index,
query_offset,
query_offset_index,
query_limit,
query_limit_index,
match_re,
):
paths = []
path = query_path_prefix
query_address = None
if query_role != None:
query_address = strip_0x(list_actors[query_role])
paths.append(path + '0x' + query_address)
paths.append(path + query_address)
if query_offset != None:
if query_limit != None:
for i in range(len(paths)-1):
paths[i] += '/{}/{}'.format(query_offset, query_limit)
else:
for i in range(len(paths)-1):
paths[i] += '/' + str(query_offset)
for i in range(len(paths)):
paths.append(paths[i] + '/')
for p in paths:
logg.debug('testing path {} against {}'.format(p, match_re))
m = re.match(match_re, p)
l = len(m.groups())
logg.debug('laast index match {} groups {}'.format(m.lastindex, l))
for i in range(l+1):
logg.debug('group {} {}'.format(i, m[i]))
if m.lastindex >= query_offset_index:
assert query_offset == int(m[query_offset_index + 1])
if m.lastindex >= query_limit_index:
assert query_limit == int(m[query_limit_index + 1])
if query_address_index != None:
match_address = strip_0x(m[query_address_index + 1])
assert query_address == match_address
@pytest.mark.parametrize(
'role_name, query_offset, query_limit, query_match',
[
('alice', None, None, [(420000, 13), (419999, 42)]),
('alice', None, 1, [(420000, 13)]),
('alice', 1, None, [(419999, 42)]), # 420000 == list_defaults['block']
('alice', 2, None, []), # 420000 == list_defaults['block']
],
)
def test_query_process_txs_account(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
zero_filter,
role_name,
query_offset,
query_limit,
query_match,
):
actor = None
try:
actor = list_actors[role_name]
except KeyError:
actor = os.urandom(20).hex()
path_info = '/tx/user/0x' + strip_0x(actor)
if query_offset != None:
path_info += '/' + str(query_offset)
if query_limit != None:
if query_offset == None:
path_info += '/0'
path_info += '/' + str(query_limit)
env = {
'PATH_INFO': path_info,
}
logg.debug('using path {}'.format(path_info))
r = process_transactions_account_bloom(init_database, env)
assert r != None
o = json.loads(r[1])
block_filter_data = base64.b64decode(o['block_filter'].encode('utf-8'))
zero_filter_data = zero_filter.to_bytes()
if len(query_match) == 0:
assert block_filter_data == zero_filter_data
return
assert block_filter_data != zero_filter_data
block_filter = copy.copy(zero_filter)
block_filter.merge(block_filter_data)
block_filter_data = block_filter.to_bytes()
assert block_filter_data != zero_filter_data
for (block, tx) in query_match:
block = block.to_bytes(4, byteorder='big')
assert block_filter.check(block)
@pytest.mark.parametrize(
'query_offset, query_limit, query_match',
[
(None, 2, [(420000, 13), (419999, 42)]),
(0, 1, [(420000, 13)]),
(1, 1, [(419999, 42)]),
(2, 0, []),
],
)
def test_query_process_txs_bloom(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
zero_filter,
query_offset,
query_limit,
query_match,
):
path_info = '/tx'
if query_offset != None:
path_info += '/' + str(query_offset)
if query_limit != None:
if query_offset == None:
path_info += '/0'
path_info += '/' + str(query_limit)
env = {
'PATH_INFO': path_info,
}
logg.debug('using path {}'.format(path_info))
r = process_transactions_all_bloom(init_database, env)
assert r != None
o = json.loads(r[1])
block_filter_data = base64.b64decode(o['block_filter'].encode('utf-8'))
zero_filter_data = zero_filter.to_bytes()
if len(query_match) == 0:
assert block_filter_data == zero_filter_data
return
assert block_filter_data != zero_filter_data
block_filter = copy.copy(zero_filter)
block_filter.merge(block_filter_data)
block_filter_data = block_filter.to_bytes()
assert block_filter_data != zero_filter_data
for (block, tx) in query_match:
block = block.to_bytes(4, byteorder='big')
assert block_filter.check(block)
@pytest.mark.parametrize(
'query_block_start, query_block_end, query_match_count',
[
(None, 42, 0),
(420000, 420001, 1),
(419999, 419999, 1), # matches are inclusive
(419999, 420000, 2),
(419999, 420001, 2),
],
)
def test_query_process_txs_data(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
zero_filter,
query_block_start,
query_block_end,
query_match_count,
):
path_info = '/txa'
if query_block_start != None:
path_info += '/' + str(query_block_start)
if query_block_end != None:
if query_block_start == None:
path_info += '/0'
path_info += '/' + str(query_block_end)
env = {
'PATH_INFO': path_info,
'HTTP_X_CIC_CACHE_MODE': 'all',
}
logg.debug('using path {}'.format(path_info))
r = process_transactions_all_data(init_database, env)
assert r != None
o = json.loads(r[1])
assert len(o['data']) == query_match_count

View File

@@ -1 +0,0 @@
include *requirements.txt

View File

@@ -1,53 +0,0 @@
# standard imports
import logging
# external imports
import celery
from erc20_demurrage_token.demurrage import DemurrageCalculator
from chainlib.connection import RPCConnection
from chainlib.chain import ChainSpec
from chainlib.eth.constant import ZERO_ADDRESS
from cic_eth_registry import CICRegistry
logg = logging.getLogger(__name__)
celery_app = celery.current_app
class NoopCalculator:
def amount_since(self, amount, timestamp):
logg.debug('noopcalculator amount {} timestamp {}'.format(amount, timestamp))
return amount
class DemurrageCalculationTask(celery.Task):
demurrage_token_calcs = {}
@classmethod
def register_token(cls, rpc, chain_spec, token_symbol, sender_address=ZERO_ADDRESS):
registry = CICRegistry(chain_spec, rpc)
token_address = registry.by_name(token_symbol, sender_address=sender_address)
try:
c = DemurrageCalculator.from_contract(rpc, chain_spec, token_address, sender_address=sender_address)
logg.info('found demurrage calculator for ERC20 {} @ {}'.format(token_symbol, token_address))
except:
logg.warning('Token {} at address {} does not appear to be a demurrage contract. Calls to balance adjust for this token will always return the same amount'.format(token_symbol, token_address))
c = NoopCalculator()
cls.demurrage_token_calcs[token_symbol] = c
@celery_app.task(bind=True, base=DemurrageCalculationTask)
def get_adjusted_balance(self, token_symbol, amount, timestamp):
c = self.demurrage_token_calcs[token_symbol]
return c.amount_since(amount, timestamp)
def aux_setup(rpc, config, sender_address=ZERO_ADDRESS):
chain_spec_str = config.get('CIC_CHAIN_SPEC')
chain_spec = ChainSpec.from_chain_str(chain_spec_str)
token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL')
DemurrageCalculationTask.register_token(rpc, chain_spec, token_symbol, sender_address=sender_address)

View File

@@ -1,30 +0,0 @@
# standard imports
import logging
# external imports
import celery
from cic_eth.api.base import ApiBase
app = celery.current_app
logg = logging.getLogger(__name__)
class Api(ApiBase):
def get_adjusted_balance(self, token_symbol, balance, timestamp):
s = celery.signature(
'cic_eth_aux.erc20_demurrage_token.get_adjusted_balance',
[
token_symbol,
balance,
timestamp,
],
queue=None,
)
if self.callback_param != None:
s.link(self.callback_success)
s.link.on_error(self.callback_error)
t = s.apply_async(queue=self.queue)
return t

View File

@@ -1,5 +0,0 @@
celery==4.4.7
erc20-demurrage-token~=0.0.2a3
cic-eth-registry~=0.5.6a1
chainlib~=0.0.5a1
cic_eth~=0.12.0a2

View File

@@ -1,30 +0,0 @@
[metadata]
name = cic-eth-aux-erc20-demurrage-token
version = 0.0.2a4
description = cic-eth tasks supporting erc20 demurrage token
author = Louis Holbrook
author_email = dev@holbrook.no
url = https://gitlab.com/ccicnet/erc20-demurrage-token
keywords =
ethereum
blockchain
cryptocurrency
erc20
classifiers =
Programming Language :: Python :: 3
Operating System :: OS Independent
Development Status :: 3 - Alpha
Environment :: No Input/Output (Daemon)
Intended Audience :: Developers
License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
Topic :: Internet
#Topic :: Blockchain :: EVM
license = GPL3
licence_files =
LICENSE
[options]
include_package_data = True
python_requires = >= 3.6
packages =
cic_eth_aux.erc20_demurrage_token

View File

@@ -1,25 +0,0 @@
from setuptools import setup
requirements = []
f = open('requirements.txt', 'r')
while True:
l = f.readline()
if l == '':
break
requirements.append(l.rstrip())
f.close()
test_requirements = []
f = open('test_requirements.txt', 'r')
while True:
l = f.readline()
if l == '':
break
test_requirements.append(l.rstrip())
f.close()
setup(
install_requires=requirements,
tests_require=test_requirements,
)

View File

@@ -1,12 +0,0 @@
pytest==6.0.1
pytest-celery==0.0.0a1
pytest-mock==3.3.1
pytest-cov==2.10.1
eth-tester==0.5.0b3
py-evm==0.3.0a20
SQLAlchemy==1.3.20
cic-eth~=0.12.0a1
liveness~=0.0.1a7
eth-accounts-index==0.0.12a1
eth-contract-registry==0.5.6a1
eth-address-index==0.1.2a1

View File

@@ -1,88 +0,0 @@
# external imports
import celery
from chainlib.eth.pytest.fixtures_chain import *
from chainlib.eth.pytest.fixtures_ethtester import *
from cic_eth_registry.pytest.fixtures_contracts import *
from cic_eth_registry.pytest.fixtures_tokens import *
from erc20_demurrage_token.unittest.base import TestTokenDeploy
from erc20_demurrage_token.token import DemurrageToken
from eth_token_index.index import TokenUniqueSymbolIndex
from eth_address_declarator.declarator import AddressDeclarator
# cic-eth imports
from cic_eth.pytest.fixtures_celery import *
from cic_eth.pytest.fixtures_token import *
from cic_eth.pytest.fixtures_config import *
@pytest.fixture(scope='function')
def demurrage_token(
default_chain_spec,
eth_rpc,
token_registry,
contract_roles,
eth_signer,
):
d = TestTokenDeploy(eth_rpc, token_symbol='BAR', token_name='Bar Token')
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], conn=eth_rpc)
c = DemurrageToken(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
token_address = d.deploy(eth_rpc, contract_roles['CONTRACT_DEPLOYER'], c, 'SingleNocap')
logg.debug('demurrage token contract "BAR" deployed to {}'.format(token_address))
return token_address
@pytest.fixture(scope='function')
def demurrage_token_symbol(
default_chain_spec,
eth_rpc,
demurrage_token,
contract_roles,
):
c = DemurrageToken(default_chain_spec)
o = c.symbol(demurrage_token, sender_address=contract_roles['CONTRACT_DEPLOYER'])
r = eth_rpc.do(o)
return c.parse_symbol(r)
@pytest.fixture(scope='function')
def demurrage_token_declaration(
foo_token_declaration,
):
return foo_token_declaration
@pytest.fixture(scope='function')
def register_demurrage_token(
default_chain_spec,
token_registry,
eth_rpc,
eth_signer,
register_lookups,
contract_roles,
demurrage_token_declaration,
demurrage_token,
address_declarator,
):
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], eth_rpc)
c = TokenUniqueSymbolIndex(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.register(token_registry, contract_roles['CONTRACT_DEPLOYER'], demurrage_token)
eth_rpc.do(o)
o = receipt(tx_hash_hex)
r = eth_rpc.do(o)
assert r['status'] == 1
nonce_oracle = RPCNonceOracle(contract_roles['TRUSTED_DECLARATOR'], eth_rpc)
c = AddressDeclarator(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.add_declaration(address_declarator, contract_roles['TRUSTED_DECLARATOR'], demurrage_token, demurrage_token_declaration)
eth_rpc.do(o)
o = receipt(tx_hash_hex)
r = eth_rpc.do(o)
assert r['status'] == 1
return token_registry

View File

@@ -1,69 +0,0 @@
# standard imports
import logging
import copy
import datetime
# external imports
import celery
# cic-eth imports
from cic_eth_aux.erc20_demurrage_token import (
DemurrageCalculationTask,
aux_setup,
)
from cic_eth_aux.erc20_demurrage_token.api import Api as AuxApi
logg = logging.getLogger()
def test_demurrage_calulate_task(
default_chain_spec,
eth_rpc,
cic_registry,
celery_session_worker,
register_demurrage_token,
demurrage_token_symbol,
contract_roles,
load_config,
):
config = copy.copy(load_config)
config.add(str(default_chain_spec), 'CIC_CHAIN_SPEC', exists_ok=True)
config.add(demurrage_token_symbol, 'CIC_DEFAULT_TOKEN_SYMBOL', exists_ok=True)
aux_setup(eth_rpc, load_config, sender_address=contract_roles['CONTRACT_DEPLOYER'])
since = datetime.datetime.utcnow() - datetime.timedelta(minutes=1)
s = celery.signature(
'cic_eth_aux.erc20_demurrage_token.get_adjusted_balance',
[
demurrage_token_symbol,
1000,
since.timestamp(),
],
queue=None,
)
t = s.apply_async()
r = t.get_leaf()
assert t.successful()
assert r == 980
def test_demurrage_calculate_api(
default_chain_spec,
eth_rpc,
cic_registry,
celery_session_worker,
register_demurrage_token,
demurrage_token_symbol,
contract_roles,
load_config,
):
api = AuxApi(str(default_chain_spec), queue=None)
since = datetime.datetime.utcnow() - datetime.timedelta(minutes=1)
t = api.get_adjusted_balance(demurrage_token_symbol, 1000, since.timestamp())
r = t.get_leaf()
assert t.successful()
assert r == 980

View File

@@ -5,5 +5,3 @@ omit =
cic_eth/db/migrations/*
cic_eth/sync/head.py
cic_eth/sync/mempool.py
cic_eth/queue/state.py
*redis*.py

View File

@@ -1,6 +0,0 @@
.git
.cache
.dot
**/doc
**/.venv
**/venv

View File

@@ -1,52 +1,22 @@
.cic_eth_variables:
variables:
APP_NAME: cic-eth
DOCKERFILE_PATH: docker/Dockerfile_ci
CONTEXT: apps/$APP_NAME
DOCKERFILE_PATH: $APP_NAME/docker/Dockerfile
.cic_eth_changes_target:
rules:
- changes:
- $CONTEXT/$APP_NAME/*
build-mr-cic-eth:
extends:
- .cic_eth_changes_target
- .py_build_merge_request
- .cic_eth_variables
- .py_build_target_dev
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/cic-eth/**/*
when: always
test-mr-cic-eth:
stage: test
extends:
- .cic_eth_variables
cache:
key:
files:
- test_requirements.txt
paths:
- /root/.cache/pip
image: $MR_IMAGE_TAG
script:
- cd apps/$APP_NAME/
- >
pip install --extra-index-url https://pip.grassrootseconomics.net:8433
--extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple
-r admin_requirements.txt
-r services_requirements.txt
-r test_requirements.txt
- export PYTHONPATH=. && pytest -x --cov=cic_eth --cov-fail-under=90 --cov-report term-missing tests
needs: ["build-mr-cic-eth"]
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/cic-eth/**/*
when: always
build-push-cic-eth:
extends:
- .py_build_push
- .cic_eth_variables
rules:
- if: $CI_COMMIT_BRANCH == "master"
changes:
- apps/cic-eth/**/*
when: always

View File

@@ -1,2 +0,0 @@
include *requirements.txt config/test/*

View File

@@ -1,5 +0,0 @@
SQLAlchemy==1.3.20
cic-eth-registry~=0.5.6a1
hexathon~=0.0.1a7
chainqueue~=0.0.2b5
eth-erc20==0.0.10a2

View File

@@ -4,18 +4,11 @@ import logging
# external imports
import celery
from chainlib.chain import ChainSpec
from chainlib.connection import RPCConnection
from chainlib.eth.tx import (
unpack,
TxFactory,
)
from chainlib.eth.gas import OverrideGasOracle
from chainqueue.sql.query import get_tx
from chainqueue.sql.state import set_cancel
from chainlib.eth.tx import unpack
from chainqueue.query import get_tx
from chainqueue.state import set_cancel
from chainqueue.db.models.otx import Otx
from chainqueue.db.models.tx import TxCache
from hexathon import strip_0x
from potaahto.symbols import snake_and_camel
# local imports
from cic_eth.db.models.base import SessionBase
@@ -28,14 +21,13 @@ from cic_eth.admin.ctrl import (
)
from cic_eth.queue.tx import queue_create
from cic_eth.eth.gas import create_check_gas_task
from cic_eth.task import BaseTask
celery_app = celery.current_app
logg = logging.getLogger()
@celery_app.task(bind=True, base=BaseTask)
def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
@celery_app.task(bind=True)
def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1):
"""Shift all transactions with nonces higher than the offset by the provided position delta.
Transactions who are replaced by transactions that move nonces will be marked as OVERRIDDEN.
@@ -46,29 +38,25 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
:type tx_hash_orig_hex: str, 0x-hex
:param delta: Amount
"""
chain_spec = ChainSpec.from_dict(chainspec_dict)
rpc = RPCConnection.connect(chain_spec, 'default')
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
queue = None
try:
queue = self.request.delivery_info.get('routing_key')
except AttributeError:
pass
session = BaseTask.session_func()
tx_brief = get_tx(chain_spec, tx_hash_orig_hex, session=session)
tx_raw = bytes.fromhex(strip_0x(tx_brief['signed_tx']))
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_brief = get_tx(tx_hash_orig_hex)
tx_raw = bytes.fromhex(strip_0x(tx_brief['signed_tx'][2:]))
tx = unpack(tx_raw, chain_spec)
nonce = tx_brief['nonce']
address = tx['from']
logg.debug('shifting nonce {} position(s) for address {}, offset {}, hash {}'.format(delta, address, nonce, tx['hash']))
logg.debug('shifting nonce {} position(s) for address {}, offset {}'.format(delta, address, nonce))
lock_queue(None, chain_spec.asdict(), address=address)
lock_send(None, chain_spec.asdict(), address=address)
set_cancel(chain_spec, strip_0x(tx['hash']), manual=True, session=session)
lock_queue(None, chain_str, address)
lock_send(None, chain_str, address)
session = SessionBase.create_session()
q = session.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.sender==address)
@@ -81,57 +69,49 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
for otx in otxs:
tx_raw = bytes.fromhex(strip_0x(otx.signed_tx))
tx_new = unpack(tx_raw, chain_spec)
tx_new = snake_and_camel(tx_new)
tx_previous_hash_hex = tx_new['hash']
tx_previous_nonce = tx_new['nonce']
tx_new['gas_price'] += 1
tx_new['gasPrice'] = tx_new['gas_price']
tx_new['nonce'] -= delta
logg.debug('tx_new {}'.format(tx_new))
del(tx_new['hash'])
del(tx_new['hash_unsigned'])
del(tx_new['hashUnsigned'])
tx_new['nonce'] -= delta
gas_oracle = OverrideGasOracle(limit=tx_new['gas'], price=tx_new['gas_price'] + 1) # TODO: it should be possible to merely set this price here and if missing in the existing struct then fill it in (chainlib.eth.tx)
c = TxFactory(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx_new)
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx_new, chain_str)
logg.debug('tx {} -> {} nonce {} -> {}'.format(tx_previous_hash_hex, tx_hash_hex, tx_previous_nonce, tx_new['nonce']))
otx = Otx(
tx_new['nonce'],
tx_hash_hex,
tx_signed_raw_hex,
)
nonce=tx_new['nonce'],
address=tx_new['from'],
tx_hash=tx_hash_hex,
signed_tx=tx_signed_raw_hex,
)
session.add(otx)
session.commit()
# TODO: cancel all first, then replace. Otherwise we risk two non-locked states for two different nonces.
set_cancel(chain_spec, strip_0x(tx_previous_hash_hex), manual=True, session=session)
set_cancel(tx_previous_hash_hex, True)
TxCache.clone(tx_previous_hash_hex, tx_hash_hex, session=session)
TxCache.clone(tx_previous_hash_hex, tx_hash_hex)
tx_hashes.append(tx_hash_hex)
txs.append(tx_signed_raw_hex)
session.commit()
session.close()
s = create_check_gas_task(
s = create_check_gas_and_send_task(
txs,
chain_spec,
chain_str,
tx_new['from'],
gas=tx_new['gas'],
tx_hashes_hex=tx_hashes,
queue=queue,
tx_new['gas'],
tx_hashes,
queue,
)
s_unlock_send = celery.signature(
'cic_eth.admin.ctrl.unlock_send',
[
chain_spec.asdict(),
chain_str,
tx_new['from'],
],
queue=queue,
@@ -139,7 +119,7 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
s_unlock_direct = celery.signature(
'cic_eth.admin.ctrl.unlock_queue',
[
chain_spec.asdict(),
chain_str,
tx_new['from'],
],
queue=queue,

View File

@@ -16,6 +16,4 @@ def default_token(self):
return {
'symbol': self.default_token_symbol,
'address': self.default_token_address,
'name': self.default_token_name,
'decimals': self.default_token_decimals,
}

View File

@@ -5,3 +5,4 @@
"""
from .api_task import Api
from .api_admin import AdminApi

View File

@@ -8,7 +8,6 @@ from chainlib.eth.constant import (
ZERO_ADDRESS,
)
from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token
from cic_eth_registry.error import UnknownContractError
from chainlib.eth.address import to_checksum_address
from chainlib.eth.contract import code
@@ -31,14 +30,13 @@ from chainqueue.db.enum import (
status_str,
)
from chainqueue.error import TxStateChangeError
from chainqueue.sql.query import get_tx
from eth_erc20 import ERC20
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.nonce import Nonce
from cic_eth.error import InitializationError
from cic_eth.queue.query import get_tx
app = celery.current_app
@@ -190,7 +188,6 @@ class AdminApi:
s_manual = celery.signature(
'cic_eth.queue.state.set_manual',
[
chain_spec.asdict(),
tx_hash_hex,
],
queue=self.queue,
@@ -209,9 +206,8 @@ class AdminApi:
s.link(s_gas)
return s_manual.apply_async()
def check_nonce(self, chain_spec, address):
def check_nonce(self, address):
s = celery.signature(
'cic_eth.queue.query.get_account_tx',
[
@@ -232,12 +228,13 @@ class AdminApi:
s_get_tx = celery.signature(
'cic_eth.queue.query.get_tx',
[
chain_spec.asdict(),
chain_spec.asdict(),
k,
],
queue=self.queue,
)
tx = s_get_tx.apply_async().get()
#tx = get_tx(k)
logg.debug('checking nonce {} (previous {})'.format(tx['nonce'], last_nonce))
nonce_otx = tx['nonce']
if not is_alive(tx['status']) and tx['status'] & local_fail > 0:
@@ -245,9 +242,7 @@ class AdminApi:
blocking_tx = k
blocking_nonce = nonce_otx
elif nonce_otx - last_nonce > 1:
logg.debug('tx {}'.format(tx))
tx_obj = unpack(bytes.fromhex(strip_0x(tx['signed_tx'])), chain_spec)
logg.error('nonce gap; {} followed {} for account {}'.format(nonce_otx, last_nonce, tx_obj['from']))
logg.error('nonce gap; {} followed {} for account {}'.format(nonce_otx, last_nonce, tx['from']))
blocking_tx = k
blocking_nonce = nonce_otx
break
@@ -261,13 +256,12 @@ class AdminApi:
'blocking': blocking_nonce,
},
'tx': {
'blocking': add_0x(blocking_tx),
'blocking': blocking_tx,
}
}
}
# TODO: is risky since it does not validate that there is actually a nonce problem?
def fix_nonce(self, chain_spec, address, nonce):
def fix_nonce(self, address, nonce, chain_spec):
s = celery.signature(
'cic_eth.queue.query.get_account_tx',
[
@@ -281,17 +275,15 @@ class AdminApi:
txs = s.apply_async().get()
tx_hash_hex = None
session = SessionBase.create_session()
for k in txs.keys():
tx_dict = get_tx(chain_spec, k, session=session)
tx_dict = get_tx(k)
if tx_dict['nonce'] == nonce:
tx_hash_hex = k
session.close()
s_nonce = celery.signature(
'cic_eth.admin.nonce.shift_nonce',
[
chain_spec.asdict(),
self.rpc.chain_spec.asdict(),
tx_hash_hex,
],
queue=self.queue
@@ -396,13 +388,12 @@ class AdminApi:
t = s.apply_async()
tx = t.get()
source_token = None
if tx['source_token'] != ZERO_ADDRESS:
source_token_declaration = None
if registry != None:
try:
source_token_declaration = registry.by_address(tx['source_token'], sender_address=self.call_address)
source_token = registry.by_address(tx['source_token'])
except UnknownContractError:
logg.warning('unknown source token contract {} (direct)'.format(tx['source_token']))
else:
@@ -415,21 +406,16 @@ class AdminApi:
queue=self.queue
)
t = s.apply_async()
source_token_declaration = t.get()
if source_token_declaration != None:
logg.warning('found declarator record for source token {} but not checking validity'.format(tx['source_token']))
source_token = ERC20Token(chain_spec, self.rpc, tx['source_token'])
logg.debug('source token set tup {}'.format(source_token))
source_token = t.get()
if source_token == None:
logg.warning('unknown source token contract {} (task pool)'.format(tx['source_token']))
destination_token = None
if tx['destination_token'] != ZERO_ADDRESS:
destination_token_declaration = None
if registry != None:
try:
destination_token_declaration = registry.by_address(tx['destination_token'], sender_address=self.call_address)
destination_token = registry.by_address(tx['destination_token'])
except UnknownContractError:
logg.warning('unknown destination token contract {}'.format(tx['destination_token']))
else:
@@ -442,10 +428,10 @@ class AdminApi:
queue=self.queue
)
t = s.apply_async()
destination_token_declaration = t.get()
if destination_token_declaration != None:
logg.warning('found declarator record for destination token {} but not checking validity'.format(tx['destination_token']))
destination_token = ERC20Token(chain_spec, self.rpc, tx['destination_token'])
destination_token = t.get()
if destination_token == None:
logg.warning('unknown destination token contract {} (task pool)'.format(tx['destination_token']))
tx['sender_description'] = 'Custodial account'
tx['recipient_description'] = 'Custodial account'
@@ -557,19 +543,13 @@ class AdminApi:
if role != None:
tx['recipient_description'] = role
erc20_c = ERC20(chain_spec)
if source_token != None:
tx['source_token_symbol'] = source_token.symbol
o = erc20_c.balance_of(tx['source_token'], tx['sender'], sender_address=self.call_address)
r = self.rpc.do(o)
tx['sender_token_balance'] = erc20_c.parse_balance(r)
tx['source_token_symbol'] = source_token.symbol()
tx['sender_token_balance'] = source_token.function('balanceOf')(tx['sender']).call()
if destination_token != None:
tx['destination_token_symbol'] = destination_token.symbol
o = erc20_c.balance_of(tx['destination_token'], tx['recipient'], sender_address=self.call_address)
r = self.rpc.do(o)
tx['recipient_token_balance'] = erc20_c.parse_balance(r)
#tx['recipient_token_balance'] = destination_token.function('balanceOf')(tx['recipient']).call()
tx['destination_token_symbol'] = destination_token.symbol()
tx['recipient_token_balance'] = source_token.function('balanceOf')(tx['recipient']).call()
# TODO: this can mean either not subitted or culled, need to check other txs with same nonce to determine which
tx['network_status'] = 'Not in node'

View File

@@ -8,19 +8,59 @@ import logging
# external imports
import celery
from cic_eth_registry import CICRegistry
from chainlib.chain import ChainSpec
# local imports
from cic_eth.api.base import ApiBase
from cic_eth.enum import LockEnum
from cic_eth.db.enum import LockEnum
app = celery.current_app
logg = logging.getLogger(__name__)
class Api(ApiBase):
class Api:
"""Creates task chains to perform well-known CIC operations.
Each method that sends tasks returns details about the root task. The root task uuid can be provided in the callback, to enable to caller to correlate the result with individual calls. It can also be used to independently poll the completion of a task chain.
:param callback_param: Static value to pass to callback
:type callback_param: str
:param callback_task: Callback task that executes callback_param call. (Must be included by the celery worker)
:type callback_task: string
:param queue: Name of worker queue to submit tasks to
:type queue: str
"""
def __init__(self, chain_str, queue='cic-eth', callback_param=None, callback_task='cic_eth.callbacks.noop.noop', callback_queue=None):
self.chain_str = chain_str
self.chain_spec = ChainSpec.from_chain_str(chain_str)
self.callback_param = callback_param
self.callback_task = callback_task
self.queue = queue
logg.debug('api using queue {}'.format(self.queue))
self.callback_success = None
self.callback_error = None
if callback_queue == None:
callback_queue=self.queue
if callback_param != None:
self.callback_success = celery.signature(
callback_task,
[
callback_param,
0,
],
queue=callback_queue,
)
self.callback_error = celery.signature(
callback_task,
[
callback_param,
1,
],
queue=callback_queue,
)
def default_token(self):
s_token = celery.signature(
@@ -34,156 +74,29 @@ class Api(ApiBase):
return s_token.apply_async()
# def convert_transfer(self, from_address, to_address, target_return, minimum_return, from_token_symbol, to_token_symbol):
# """Executes a chain of celery tasks that performs conversion between two ERC20 tokens, and transfers to a specified receipient after convert has completed.
#
# :param from_address: Ethereum address of sender
# :type from_address: str, 0x-hex
# :param to_address: Ethereum address of receipient
# :type to_address: str, 0x-hex
# :param target_return: Estimated return from conversion
# :type target_return: int
# :param minimum_return: The least value of destination token return to allow
# :type minimum_return: int
# :param from_token_symbol: ERC20 token symbol of token being converted
# :type from_token_symbol: str
# :param to_token_symbol: ERC20 token symbol of token to receive
# :type to_token_symbol: str
# :returns: uuid of root task
# :rtype: celery.Task
# """
# raise NotImplementedError('out of service until new DEX migration is done')
# s_check = celery.signature(
# 'cic_eth.admin.ctrl.check_lock',
# [
# [from_token_symbol, to_token_symbol],
# self.chain_spec.asdict(),
# LockEnum.QUEUE,
# from_address,
# ],
# queue=self.queue,
# )
# s_nonce = celery.signature(
# 'cic_eth.eth.nonce.reserve_nonce',
# [
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_tokens = celery.signature(
# 'cic_eth.eth.erc20.resolve_tokens_by_symbol',
# [
# self.chain_str,
# ],
# queue=self.queue,
# )
# s_convert = celery.signature(
# 'cic_eth.eth.bancor.convert_with_default_reserve',
# [
# from_address,
# target_return,
# minimum_return,
# to_address,
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_nonce.link(s_tokens)
# s_check.link(s_nonce)
# if self.callback_param != None:
# s_convert.link(self.callback_success)
# s_tokens.link(s_convert).on_error(self.callback_error)
# else:
# s_tokens.link(s_convert)
#
# t = s_check.apply_async(queue=self.queue)
# return t
#
#
# def convert(self, from_address, target_return, minimum_return, from_token_symbol, to_token_symbol):
# """Executes a chain of celery tasks that performs conversion between two ERC20 tokens.
#
# :param from_address: Ethereum address of sender
# :type from_address: str, 0x-hex
# :param target_return: Estimated return from conversion
# :type target_return: int
# :param minimum_return: The least value of destination token return to allow
# :type minimum_return: int
# :param from_token_symbol: ERC20 token symbol of token being converted
# :type from_token_symbol: str
# :param to_token_symbol: ERC20 token symbol of token to receive
# :type to_token_symbol: str
# :returns: uuid of root task
# :rtype: celery.Task
# """
# raise NotImplementedError('out of service until new DEX migration is done')
# s_check = celery.signature(
# 'cic_eth.admin.ctrl.check_lock',
# [
# [from_token_symbol, to_token_symbol],
# self.chain_spec.asdict(),
# LockEnum.QUEUE,
# from_address,
# ],
# queue=self.queue,
# )
# s_nonce = celery.signature(
# 'cic_eth.eth.nonce.reserve_nonce',
# [
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_tokens = celery.signature(
# 'cic_eth.eth.erc20.resolve_tokens_by_symbol',
# [
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_convert = celery.signature(
# 'cic_eth.eth.bancor.convert_with_default_reserve',
# [
# from_address,
# target_return,
# minimum_return,
# from_address,
# self.chain_spec.asdict(),
# ],
# queue=self.queue,
# )
# s_nonce.link(s_tokens)
# s_check.link(s_nonce)
# if self.callback_param != None:
# s_convert.link(self.callback_success)
# s_tokens.link(s_convert).on_error(self.callback_error)
# else:
# s_tokens.link(s_convert)
#
# t = s_check.apply_async(queue=self.queue)
# return t
def transfer_from(self, from_address, to_address, value, token_symbol, spender_address):
"""Executes a chain of celery tasks that performs a transfer of ERC20 tokens by one address on behalf of another address to a third party.
def convert_transfer(self, from_address, to_address, target_return, minimum_return, from_token_symbol, to_token_symbol):
"""Executes a chain of celery tasks that performs conversion between two ERC20 tokens, and transfers to a specified receipient after convert has completed.
:param from_address: Ethereum address of sender
:type from_address: str, 0x-hex
:param to_address: Ethereum address of recipient
:param to_address: Ethereum address of receipient
:type to_address: str, 0x-hex
:param value: Estimated return from conversion
:type value: int
:param token_symbol: ERC20 token symbol of token to send
:type token_symbol: str
:param spender_address: Ethereum address of recipient
:type spender_address: str, 0x-hex
:param target_return: Estimated return from conversion
:type target_return: int
:param minimum_return: The least value of destination token return to allow
:type minimum_return: int
:param from_token_symbol: ERC20 token symbol of token being converted
:type from_token_symbol: str
:param to_token_symbol: ERC20 token symbol of token to receive
:type to_token_symbol: str
:returns: uuid of root task
:rtype: celery.Task
"""
raise NotImplementedError('out of service until new DEX migration is done')
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
[token_symbol],
[from_token_symbol, to_token_symbol],
self.chain_spec.asdict(),
LockEnum.QUEUE,
from_address,
@@ -194,7 +107,70 @@ class Api(ApiBase):
'cic_eth.eth.nonce.reserve_nonce',
[
self.chain_spec.asdict(),
],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.erc20.resolve_tokens_by_symbol',
[
self.chain_str,
],
queue=self.queue,
)
s_convert = celery.signature(
'cic_eth.eth.bancor.convert_with_default_reserve',
[
from_address,
target_return,
minimum_return,
to_address,
self.chain_spec.asdict(),
],
queue=self.queue,
)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error)
else:
s_tokens.link(s_convert)
t = s_check.apply_async(queue=self.queue)
return t
def convert(self, from_address, target_return, minimum_return, from_token_symbol, to_token_symbol):
"""Executes a chain of celery tasks that performs conversion between two ERC20 tokens.
:param from_address: Ethereum address of sender
:type from_address: str, 0x-hex
:param target_return: Estimated return from conversion
:type target_return: int
:param minimum_return: The least value of destination token return to allow
:type minimum_return: int
:param from_token_symbol: ERC20 token symbol of token being converted
:type from_token_symbol: str
:param to_token_symbol: ERC20 token symbol of token to receive
:type to_token_symbol: str
:returns: uuid of root task
:rtype: celery.Task
"""
raise NotImplementedError('out of service until new DEX migration is done')
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
[from_token_symbol, to_token_symbol],
self.chain_spec.asdict(),
LockEnum.QUEUE,
from_address,
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
[
self.chain_spec.asdict(),
],
queue=self.queue,
)
@@ -205,41 +181,29 @@ class Api(ApiBase):
],
queue=self.queue,
)
s_allow = celery.signature(
'cic_eth.eth.erc20.check_allowance',
s_convert = celery.signature(
'cic_eth.eth.bancor.convert_with_default_reserve',
[
from_address,
value,
target_return,
minimum_return,
from_address,
self.chain_spec.asdict(),
spender_address,
],
queue=self.queue,
)
s_transfer = celery.signature(
'cic_eth.eth.erc20.transfer_from',
[
from_address,
to_address,
value,
self.chain_spec.asdict(),
spender_address,
],
queue=self.queue,
)
s_tokens.link(s_allow)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_transfer.link(self.callback_success)
s_allow.link(s_transfer).on_error(self.callback_error)
s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error)
else:
s_allow.link(s_transfer)
s_tokens.link(s_convert)
t = s_check.apply_async(queue=self.queue)
return t
def transfer(self, from_address, to_address, value, token_symbol):
"""Executes a chain of celery tasks that performs a transfer of ERC20 tokens from one address to another.

View File

@@ -1,52 +0,0 @@
# standard imports
import logging
# external imports
import celery
from chainlib.chain import ChainSpec
logg = logging.getLogger(__name__)
class ApiBase:
"""Creates task chains to perform well-known CIC operations.
Each method that sends tasks returns details about the root task. The root task uuid can be provided in the callback, to enable to caller to correlate the result with individual calls. It can also be used to independently poll the completion of a task chain.
:param callback_param: Static value to pass to callback
:type callback_param: str
:param callback_task: Callback task that executes callback_param call. (Must be included by the celery worker)
:type callback_task: string
:param queue: Name of worker queue to submit tasks to
:type queue: str
"""
def __init__(self, chain_str, queue='cic-eth', callback_param=None, callback_task='cic_eth.callbacks.noop.noop', callback_queue=None):
self.chain_str = chain_str
self.chain_spec = ChainSpec.from_chain_str(chain_str)
self.callback_param = callback_param
self.callback_task = callback_task
self.queue = queue
logg.debug('api using queue {}'.format(self.queue))
self.callback_success = None
self.callback_error = None
if callback_queue == None:
callback_queue=self.queue
if callback_param != None:
self.callback_success = celery.signature(
callback_task,
[
callback_param,
0,
],
queue=callback_queue,
)
self.callback_error = celery.signature(
callback_task,
[
callback_param,
1,
],
queue=callback_queue,
)

View File

@@ -1 +1,158 @@
from cic_eth.enum import *
# standard imports
import enum
@enum.unique
class StatusBits(enum.IntEnum):
"""Individual bit flags that are combined to define the state and legacy of a queued transaction
"""
QUEUED = 0x01 # transaction should be sent to network
IN_NETWORK = 0x08 # transaction is in network
DEFERRED = 0x10 # an attempt to send the transaction to network has failed
GAS_ISSUES = 0x20 # transaction is pending sender account gas funding
LOCAL_ERROR = 0x100 # errors that originate internally from the component
NODE_ERROR = 0x200 # errors originating in the node (invalid RLP input...)
NETWORK_ERROR = 0x400 # errors that originate from the network (REVERT)
UNKNOWN_ERROR = 0x800 # unclassified errors (the should not occur)
FINAL = 0x1000 # transaction processing has completed
OBSOLETE = 0x2000 # transaction has been replaced by a different transaction with higher fee
MANUAL = 0x8000 # transaction processing has been manually overridden
@enum.unique
class StatusEnum(enum.IntEnum):
"""
- Inactive, not finalized. (<0)
* PENDING: The initial state of a newly added transaction record. No action has been performed on this transaction yet.
* SENDFAIL: The transaction was not received by the node.
* RETRY: The transaction is queued for a new send attempt after previously failing.
* READYSEND: The transaction is queued for its first send attempt
* OBSOLETED: A new transaction with the same nonce and higher gas has been sent to network.
* WAITFORGAS: The transaction is on hold pending gas funding.
- Active state: (==0)
* SENT: The transaction has been sent to the mempool.
- Inactive, finalized. (>0)
* FUBAR: Unknown error occurred and transaction is abandoned. Manual intervention needed.
* CANCELLED: The transaction was sent, but was not mined and has disappered from the mempool. This usually follows a transaction being obsoleted.
* OVERRIDDEN: Transaction has been manually overriden.
* REJECTED: The transaction was rejected by the node.
* REVERTED: The transaction was mined, but exception occurred during EVM execution. (Block number will be set)
* SUCCESS: THe transaction was successfully mined. (Block number will be set)
"""
PENDING = 0
SENDFAIL = StatusBits.DEFERRED | StatusBits.LOCAL_ERROR
RETRY = StatusBits.QUEUED | StatusBits.DEFERRED
READYSEND = StatusBits.QUEUED
OBSOLETED = StatusBits.OBSOLETE | StatusBits.IN_NETWORK
WAITFORGAS = StatusBits.GAS_ISSUES
SENT = StatusBits.IN_NETWORK
FUBAR = StatusBits.FINAL | StatusBits.UNKNOWN_ERROR
CANCELLED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.OBSOLETE
OVERRIDDEN = StatusBits.FINAL | StatusBits.OBSOLETE | StatusBits.MANUAL
REJECTED = StatusBits.NODE_ERROR | StatusBits.FINAL
REVERTED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.NETWORK_ERROR
SUCCESS = StatusBits.IN_NETWORK | StatusBits.FINAL
@enum.unique
class LockEnum(enum.IntEnum):
"""
STICKY: When set, reset is not possible
CREATE: Disable creation of accounts
SEND: Disable sending to network
QUEUE: Disable queueing new or modified transactions
"""
STICKY=1
INIT=2
CREATE=4
SEND=8
QUEUE=16
QUERY=32
ALL=int(0xfffffffffffffffe)
def status_str(v, bits_only=False):
"""Render a human-readable string describing the status
If the bit field exactly matches a StatusEnum value, the StatusEnum label will be returned.
If a StatusEnum cannot be matched, the string will be postfixed with "*", unless explicitly instructed to return bit field labels only.
:param v: Status bit field
:type v: number
:param bits_only: Only render individual bit labels.
:type bits_only: bool
:returns: Status string
:rtype: str
"""
s = ''
if not bits_only:
try:
s = StatusEnum(v).name
return s
except ValueError:
pass
if v == 0:
return 'NONE'
for i in range(16):
b = (1 << i)
if (b & 0xffff) & v:
n = StatusBits(b).name
if len(s) > 0:
s += ','
s += n
if not bits_only:
s += '*'
return s
def all_errors():
"""Bit mask of all error states
:returns: Error flags
:rtype: number
"""
return StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
def is_error_status(v):
"""Check if value is an error state
:param v: Status bit field
:type v: number
:returns: True if error
:rtype: bool
"""
return bool(v & all_errors())
def dead():
"""Bit mask defining whether a transaction is still likely to be processed on the network.
:returns: Bit mask
:rtype: number
"""
return StatusBits.FINAL | StatusBits.OBSOLETE
def is_alive(v):
"""Check if transaction is still likely to be processed on the network.
The contingency of "likely" refers to the case a transaction has been obsoleted after sent to the network, but the network still confirms the obsoleted transaction. The return value of this method will not change as a result of this, BUT the state itself will (as the FINAL bit will be set).
:returns:
"""
return bool(v & dead() == 0)

View File

@@ -0,0 +1,8 @@
import math
def num_serialize(n):
if n == 0:
return b'\x00'
binlog = math.log2(n)
bytelength = int(binlog / 8 + 1)
return n.to_bytes(bytelength, 'big')

View File

@@ -1,158 +0,0 @@
# standard imports
import enum
@enum.unique
class StatusBits(enum.IntEnum):
"""Individual bit flags that are combined to define the state and legacy of a queued transaction
"""
QUEUED = 0x01 # transaction should be sent to network
IN_NETWORK = 0x08 # transaction is in network
DEFERRED = 0x10 # an attempt to send the transaction to network has failed
GAS_ISSUES = 0x20 # transaction is pending sender account gas funding
LOCAL_ERROR = 0x100 # errors that originate internally from the component
NODE_ERROR = 0x200 # errors originating in the node (invalid RLP input...)
NETWORK_ERROR = 0x400 # errors that originate from the network (REVERT)
UNKNOWN_ERROR = 0x800 # unclassified errors (the should not occur)
FINAL = 0x1000 # transaction processing has completed
OBSOLETE = 0x2000 # transaction has been replaced by a different transaction with higher fee
MANUAL = 0x8000 # transaction processing has been manually overridden
@enum.unique
class StatusEnum(enum.IntEnum):
"""
- Inactive, not finalized. (<0)
* PENDING: The initial state of a newly added transaction record. No action has been performed on this transaction yet.
* SENDFAIL: The transaction was not received by the node.
* RETRY: The transaction is queued for a new send attempt after previously failing.
* READYSEND: The transaction is queued for its first send attempt
* OBSOLETED: A new transaction with the same nonce and higher gas has been sent to network.
* WAITFORGAS: The transaction is on hold pending gas funding.
- Active state: (==0)
* SENT: The transaction has been sent to the mempool.
- Inactive, finalized. (>0)
* FUBAR: Unknown error occurred and transaction is abandoned. Manual intervention needed.
* CANCELLED: The transaction was sent, but was not mined and has disappered from the mempool. This usually follows a transaction being obsoleted.
* OVERRIDDEN: Transaction has been manually overriden.
* REJECTED: The transaction was rejected by the node.
* REVERTED: The transaction was mined, but exception occurred during EVM execution. (Block number will be set)
* SUCCESS: THe transaction was successfully mined. (Block number will be set)
"""
PENDING = 0
SENDFAIL = StatusBits.DEFERRED | StatusBits.LOCAL_ERROR
RETRY = StatusBits.QUEUED | StatusBits.DEFERRED
READYSEND = StatusBits.QUEUED
OBSOLETED = StatusBits.OBSOLETE | StatusBits.IN_NETWORK
WAITFORGAS = StatusBits.GAS_ISSUES
SENT = StatusBits.IN_NETWORK
FUBAR = StatusBits.FINAL | StatusBits.UNKNOWN_ERROR
CANCELLED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.OBSOLETE
OVERRIDDEN = StatusBits.FINAL | StatusBits.OBSOLETE | StatusBits.MANUAL
REJECTED = StatusBits.NODE_ERROR | StatusBits.FINAL
REVERTED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.NETWORK_ERROR
SUCCESS = StatusBits.IN_NETWORK | StatusBits.FINAL
@enum.unique
class LockEnum(enum.IntEnum):
"""
STICKY: When set, reset is not possible
CREATE: Disable creation of accounts
SEND: Disable sending to network
QUEUE: Disable queueing new or modified transactions
"""
STICKY=1
INIT=2
CREATE=4
SEND=8
QUEUE=16
QUERY=32
ALL=int(0xfffffffffffffffe)
def status_str(v, bits_only=False):
"""Render a human-readable string describing the status
If the bit field exactly matches a StatusEnum value, the StatusEnum label will be returned.
If a StatusEnum cannot be matched, the string will be postfixed with "*", unless explicitly instructed to return bit field labels only.
:param v: Status bit field
:type v: number
:param bits_only: Only render individual bit labels.
:type bits_only: bool
:returns: Status string
:rtype: str
"""
s = ''
if not bits_only:
try:
s = StatusEnum(v).name
return s
except ValueError:
pass
if v == 0:
return 'NONE'
for i in range(16):
b = (1 << i)
if (b & 0xffff) & v:
n = StatusBits(b).name
if len(s) > 0:
s += ','
s += n
if not bits_only:
s += '*'
return s
def all_errors():
"""Bit mask of all error states
:returns: Error flags
:rtype: number
"""
return StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
def is_error_status(v):
"""Check if value is an error state
:param v: Status bit field
:type v: number
:returns: True if error
:rtype: bool
"""
return bool(v & all_errors())
def dead():
"""Bit mask defining whether a transaction is still likely to be processed on the network.
:returns: Bit mask
:rtype: number
"""
return StatusBits.FINAL | StatusBits.OBSOLETE
def is_alive(v):
"""Check if transaction is still likely to be processed on the network.
The contingency of "likely" refers to the case a transaction has been obsoleted after sent to the network, but the network still confirms the obsoleted transaction. The return value of this method will not change as a result of this, BUT the state itself will (as the FINAL bit will be set).
:returns:
"""
return bool(v & dead() == 0)

View File

@@ -80,8 +80,3 @@ class SignerError(SeppukuError):
class RoleAgencyError(SeppukuError):
"""Exception raise when a role cannot perform its function. This is a critical exception
"""
class YouAreBrokeError(Exception):
"""Exception raised when a value transfer is attempted without access to sufficient funds
"""

View File

@@ -20,8 +20,7 @@ from chainlib.eth.tx import (
)
from chainlib.chain import ChainSpec
from chainlib.error import JSONRPCException
from eth_accounts_index.registry import AccountRegistry
from eth_accounts_index import AccountsIndex
from eth_accounts_index.registry import AccountRegistry # TODO, use interface module instead (needs gas limit method)
from sarafu_faucet import MinterFaucet
from chainqueue.db.models.tx import TxCache
@@ -128,12 +127,12 @@ def register(self, account_address, chain_spec_dict, writer_address=None):
if writer_address == ZERO_ADDRESS:
session.close()
raise RoleMissingError('call address for resgistering {}'.format(account_address))
account_registry_address = registry.by_name('AccountRegistry', sender_address=call_address)
account_registry_address = registry.by_name('AccountsIndex', sender_address=call_address)
# Generate and sign transaction
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
nonce_oracle = CustodialTaskNonceOracle(writer_address, self.request.root_id, session=session) #, default_nonce)
gas_oracle = self.create_gas_oracle(rpc, AccountRegistry.gas)
gas_oracle = self.create_gas_oracle(rpc, AccountsIndex.gas)
account_registry = AccountsIndex(chain_spec, signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = account_registry.add(account_registry_address, writer_address, account_address, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect()

View File

@@ -24,7 +24,6 @@ from cic_eth.error import (
TokenCountError,
PermanentTxError,
OutOfGasError,
YouAreBrokeError,
)
from cic_eth.queue.tx import register_tx
from cic_eth.eth.gas import (
@@ -72,117 +71,6 @@ def balance(tokens, holder_address, chain_spec_dict):
return tokens
@celery_app.task(bind=True)
def check_allowance(self, tokens, holder_address, value, chain_spec_dict, spender_address):
"""Best-effort verification that the allowance for a transfer from spend is sufficient.
:raises YouAreBrokeError: If allowance is insufficient
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param value: Amount of token, in 'wei'
:type value: int
:param chain_str: Chain spec string representation
:type chain_str: str
:param spender_address: Address of account spending on behalf of holder
:type spender_address: str, 0x-hex
:return: Token list as passed to task
:rtype: dict
"""
logg.debug('tokens {}'.format(tokens))
if len(tokens) != 1:
raise TokenCountError
t = tokens[0]
chain_spec = ChainSpec.from_dict(chain_spec_dict)
rpc = RPCConnection.connect(chain_spec, 'default')
caller_address = ERC20Token.caller_address
c = ERC20(chain_spec)
o = c.allowance(t['address'], holder_address, spender_address, sender_address=caller_address)
r = rpc.do(o)
allowance = c.parse_allowance(r)
if allowance < value:
errstr = 'allowance {} insufficent to transfer {} {} by {} on behalf of {}'.format(allowance, value, t['symbol'], spender_address, holder_address)
logg.error(errstr)
raise YouAreBrokeError(errstr)
return tokens
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def transfer_from(self, tokens, holder_address, receiver_address, value, chain_spec_dict, spender_address):
"""Transfer ERC20 tokens between addresses
First argument is a list of tokens, to enable the task to be chained to the symbol to token address resolver function. However, it accepts only one token as argument.
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param receiver_address: Token receiver address
:type receiver_address: str, 0x-hex
:param value: Amount of token, in 'wei'
:type value: int
:param chain_str: Chain spec string representation
:type chain_str: str
:param spender_address: Address of account spending on behalf of holder
:type spender_address: str, 0x-hex
:raises TokenCountError: Either none or more then one tokens have been passed as tokens argument
:return: Transaction hash for tranfer operation
:rtype: str, 0x-hex
"""
# we only allow one token, one transfer
logg.debug('tokens {}'.format(tokens))
if len(tokens) != 1:
raise TokenCountError
t = tokens[0]
chain_spec = ChainSpec.from_dict(chain_spec_dict)
queue = self.request.delivery_info.get('routing_key')
rpc = RPCConnection.connect(chain_spec, 'default')
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
session = self.create_session()
nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas)
c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle)
try:
(tx_hash_hex, tx_signed_raw_hex) = c.transfer_from(t['address'], spender_address, holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED)
except FileNotFoundError as e:
raise SignerError(e)
except ConnectionError as e:
raise SignerError(e)
rpc_signer.disconnect()
rpc.disconnect()
cache_task = 'cic_eth.eth.erc20.cache_transfer_from_data'
register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session)
session.commit()
session.close()
gas_pair = gas_oracle.get_gas(tx_signed_raw_hex)
gas_budget = gas_pair[0] * gas_pair[1]
logg.debug('transfer tx {} {} {}'.format(tx_hash_hex, queue, gas_budget))
s = create_check_gas_task(
[tx_signed_raw_hex],
chain_spec,
holder_address,
gas_budget,
[tx_hash_hex],
queue,
)
s.apply_async()
return tx_hash_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def transfer(self, tokens, holder_address, receiver_address, value, chain_spec_dict):
"""Transfer ERC20 tokens between addresses
@@ -344,7 +232,6 @@ def resolve_tokens_by_symbol(self, token_symbols, chain_spec_dict):
logg.debug('token {}'.format(token_address))
tokens.append({
'address': token_address,
'symbol': token_symbol,
'converters': [],
})
rpc.disconnect()
@@ -392,48 +279,6 @@ def cache_transfer_data(
return (tx_hash_hex, cache_id)
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_transfer_from_data(
tx_hash_hex,
tx_signed_raw_hex,
chain_spec_dict,
):
"""Helper function for otx_cache_transfer_from
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = ERC20.parse_transfer_from_request(tx['data'])
spender_address = tx_data[0]
recipient_address = tx_data[1]
token_value = tx_data[2]
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
recipient_address,
tx['to'],
tx['to'],
token_value,
token_value,
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_approve_data(
tx_hash_hex,

View File

@@ -57,12 +57,10 @@ celery_app = celery.current_app
logg = logging.getLogger()
MAXIMUM_FEE_UNITS = 8000000
class MaxGasOracle:
def gas(code=None):
return MAXIMUM_FEE_UNITS
return 8000000
def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=None, tx_hashes_hex=None, queue=None):
@@ -152,7 +150,7 @@ def cache_gas_data(
@celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyAndWeb3Task)
def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_required=MAXIMUM_FEE_UNITS):
def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_required=None):
"""Check the gas level of the sender address of a transaction.
If the account balance is not sufficient for the required gas, gas refill is requested and OutOfGasError raiser.
@@ -172,30 +170,24 @@ def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_requir
:return: Signed raw transaction data list
:rtype: param txs, unchanged
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
logg.debug('txs {} tx_hashes {}'.format(txs, tx_hashes))
addresspass = None
if len(txs) == 0:
addresspass = []
for i in range(len(tx_hashes)):
o = get_tx(chain_spec_dict, tx_hashes[i])
o = get_tx(tx_hashes[i])
txs.append(o['signed_tx'])
logg.debug('sender {}'.format(o))
tx = unpack(bytes.fromhex(strip_0x(o['signed_tx'])), chain_spec)
if address == None:
address = tx['from']
elif address != tx['from']:
raise ValueError('txs passed to check gas must all have same sender; had {} got {}'.format(address, tx['from']))
addresspass.append(address)
address = o['address']
#if not web3.Web3.isChecksumAddress(address):
if not is_checksum_address(address):
raise ValueError('invalid address {}'.format(address))
chain_spec = ChainSpec.from_dict(chain_spec_dict)
queue = self.request.delivery_info.get('routing_key')
conn = RPCConnection.connect(chain_spec)
# TODO: it should not be necessary to pass address explicitly, if not passed should be derived from the tx
gas_balance = 0
try:
o = balance(address)
@@ -206,9 +198,6 @@ def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_requir
conn.disconnect()
raise EthError('gas_balance call for {}: {}'.format(address, e))
if gas_required == None:
gas_required = MAXIMUM_FEE_UNITS
logg.debug('address {} has gas {} needs {}'.format(address, gas_balance, gas_required))
session = SessionBase.create_session()
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
@@ -279,8 +268,7 @@ def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_requir
queue=queue,
)
ready_tasks.append(s)
t = celery.group(ready_tasks)()
logg.debug('group {}'.format(t))
celery.group(ready_tasks)()
return txs

View File

@@ -21,7 +21,6 @@ from chainqueue.db.models.tx import Otx
from chainqueue.db.models.tx import TxCache
from chainqueue.db.enum import StatusBits
from chainqueue.error import NotLocalTxError
from potaahto.symbols import snake_and_camel
# local imports
from cic_eth.db import SessionBase
@@ -59,9 +58,6 @@ def hashes_to_txs(self, tx_hashes):
if len(tx_hashes) == 0:
raise ValueError('no transaction to send')
for i in range(len(tx_hashes)):
tx_hashes[i] = strip_0x(tx_hashes[i])
queue = self.request.delivery_info['routing_key']
session = SessionBase.create_session()
@@ -152,7 +148,7 @@ def send(self, txs, chain_spec_dict):
@celery_app.task(bind=True, throws=(NotFoundEthException,), base=CriticalWeb3Task)
def sync_tx(self, tx_hash_hex, chain_spec_dict):
"""Force update of network status of a single transaction
"""Force update of network status of a simgle transaction
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
@@ -177,14 +173,12 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict):
# TODO: apply receipt in tx object to validate and normalize input
if rcpt != None:
rcpt = snake_and_camel(rcpt)
success = rcpt['status'] == 1
logg.debug('sync tx {} mined block {} tx index {} success {}'.format(tx_hash_hex, rcpt['blockNumber'], rcpt['transactionIndex'], success))
logg.debug('sync tx {} mined block {} success {}'.format(tx_hash_hex, rcpt['blockNumber'], success))
s = celery.signature(
'cic_eth.queue.state.set_final',
[
chain_spec_dict,
tx_hash_hex,
rcpt['blockNumber'],
rcpt['transactionIndex'],
@@ -192,14 +186,12 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict):
],
queue=queue,
)
# TODO: it's not entirely clear how we can reliable determine that its in mempool without explicitly checking
else:
logg.debug('sync tx {} mempool'.format(tx_hash_hex))
s = celery.signature(
'cic_eth.queue.state.set_sent',
[
chain_spec_dict,
tx_hash_hex,
],
queue=queue,

View File

@@ -19,7 +19,7 @@ from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token
from chainqueue.db.models.otx import Otx
from chainqueue.db.enum import StatusEnum
from chainqueue.sql.query import get_tx_cache
from chainqueue.query import get_tx_cache
from eth_erc20 import ERC20
# local imports

View File

@@ -1,77 +0,0 @@
# standard imports
import os
# external imports
import pytest
from chainlib.eth.contract import (
ABIContractEncoder,
ABIContractType,
)
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.gas import OverrideGasOracle
from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
from chainlib.eth.tx import (
receipt,
TxFactory,
TxFormat,
unpack,
Tx,
)
from hexathon import strip_0x
script_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.dirname(script_dir)
@pytest.fixture(scope='function')
def bogus_tx_block(
default_chain_spec,
eth_rpc,
eth_signer,
contract_roles,
):
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], conn=eth_rpc)
gas_oracle = OverrideGasOracle(limit=2000000, conn=eth_rpc)
f = open(os.path.join(script_dir, 'testdata', 'Bogus.bin'), 'r')
bytecode = f.read()
f.close()
c = TxFactory(default_chain_spec, signer=eth_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle)
tx = c.template(contract_roles['CONTRACT_DEPLOYER'], None, use_nonce=True)
tx = c.set_code(tx, bytecode)
(tx_hash_hex, o) = c.build(tx)
r = eth_rpc.do(o)
o = receipt(tx_hash_hex)
r = eth_rpc.do(o)
contract_address = r['contract_address']
enc = ABIContractEncoder()
enc.method('poke')
data = enc.get()
tx = c.template(contract_roles['CONTRACT_DEPLOYER'], contract_address, use_nonce=True)
tx = c.set_code(tx, data)
(tx_hash_hex, o) = c.finalize(tx, TxFormat.JSONRPC)
r = eth_rpc.do(o)
tx_signed_raw_hex = strip_0x(o['params'][0])
o = block_latest()
r = eth_rpc.do(o)
o = block_by_number(r, include_tx=False)
r = eth_rpc.do(o)
block = Block(r)
block.txs = [tx_hash_hex]
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx_src = unpack(tx_signed_raw_bytes, default_chain_spec)
tx = Tx(tx_src, block=block)
return (block, tx)

View File

@@ -1,19 +0,0 @@
# external imports
import pytest
from eth_erc20 import ERC20
# TODO: missing dep fixture includes
@pytest.fixture(scope='function')
def foo_token_symbol(
default_chain_spec,
foo_token,
eth_rpc,
contract_roles,
):
c = ERC20(default_chain_spec)
o = c.symbol(foo_token, sender_address=contract_roles['CONTRACT_DEPLOYER'])
r = eth_rpc.do(o)
return c.parse_symbol(r)

View File

@@ -1 +0,0 @@
60806040526000805534801561001457600080fd5b50610181806100246000396000f3fe608060405234801561001057600080fd5b5060043610610053576000357c0100000000000000000000000000000000000000000000000000000000900480630dbe671f146100585780631817835814610076575b600080fd5b610060610080565b60405161006d91906100ae565b60405180910390f35b61007e610086565b005b60005481565b600080815480929190610098906100d3565b9190505550565b6100a8816100c9565b82525050565b60006020820190506100c3600083018461009f565b92915050565b6000819050919050565b60006100de826100c9565b91507fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff8214156101115761011061011c565b5b600182019050919050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052601160045260246000fdfea264697066735822122034ad8e91e864f030d47f5b93e281869206c1b203c36dc79a209ac9c9c16e577564736f6c63430008040033

View File

@@ -1,10 +0,0 @@
pragma solidity ^0.8.0;
contract Bogus {
uint256 public a = 0;
function poke() public {
a++;
}
}

View File

@@ -5,7 +5,7 @@ import datetime
import celery
from chainlib.chain import ChainSpec
from chainlib.eth.tx import unpack
import chainqueue.sql.query
import chainqueue.query
from chainqueue.db.enum import (
StatusEnum,
is_alive,
@@ -28,7 +28,7 @@ celery_app = celery.current_app
def get_tx_cache(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.query.get_tx_cache(chain_spec, tx_hash, session=session)
r = chainqueue.query.get_tx_cache(chain_spec, tx_hash, session=session)
session.close()
return r
@@ -37,7 +37,7 @@ def get_tx_cache(chain_spec_dict, tx_hash):
def get_tx(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.query.get_tx(chain_spec, tx_hash, session=session)
r = chainqueue.query.get_tx(chain_spec, tx_hash)
session.close()
return r
@@ -46,7 +46,7 @@ def get_tx(chain_spec_dict, tx_hash):
def get_account_tx(chain_spec_dict, address, as_sender=True, as_recipient=True, counterpart=None):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.query.get_account_tx(chain_spec, address, as_sender=True, as_recipient=True, counterpart=None, session=session)
r = chainqueue.query.get_account_tx(chain_spec, address, as_sender=True, as_recipient=True, counterpart=None, session=session)
session.close()
return r
@@ -55,17 +55,17 @@ def get_account_tx(chain_spec_dict, address, as_sender=True, as_recipient=True,
def get_upcoming_tx_nolock(chain_spec_dict, status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, session=None):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.query.get_upcoming_tx(chain_spec, status, not_status=not_status, recipient=recipient, before=before, limit=limit, session=session, decoder=unpack)
r = chainqueue.query.get_upcoming_tx(chain_spec, status, not_status=not_status, recipient=recipient, before=before, limit=limit, session=session, decoder=unpack)
session.close()
return r
def get_status_tx(chain_spec, status, not_status=None, before=None, exact=False, limit=0, session=None):
return chainqueue.sql.query.get_status_tx_cache(chain_spec, status, not_status=not_status, before=before, exact=exact, limit=limit, session=session, decoder=unpack)
return chainqueue.query.get_status_tx_cache(chain_spec, status, not_status=not_status, before=before, exact=exact, limit=limit, session=session, decoder=unpack)
def get_paused_tx(chain_spec, status=None, sender=None, session=None, decoder=None):
return chainqueue.sql.query.get_paused_tx_cache(chain_spec, status=status, sender=sender, session=session, decoder=unpack)
return chainqueue.query.get_paused_tx_cache(chain_spec, status=status, sender=sender, session=session, decoder=unpack)
def get_nonce_tx(chain_spec, nonce, sender):

View File

@@ -1,6 +1,6 @@
# external imports
from chainlib.chain import ChainSpec
import chainqueue.sql.state
import chainqueue.state
# local imports
import celery
@@ -14,7 +14,7 @@ celery_app = celery.current_app
def set_sent(chain_spec_dict, tx_hash, fail=False):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_sent(chain_spec, tx_hash, fail, session=session)
r = chainqueue.state.set_sent(chain_spec, tx_hash, fail, session=session)
session.close()
return r
@@ -23,7 +23,7 @@ def set_sent(chain_spec_dict, tx_hash, fail=False):
def set_final(chain_spec_dict, tx_hash, block=None, tx_index=None, fail=False):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_final(chain_spec, tx_hash, block=block, tx_index=tx_index, fail=fail, session=session)
r = chainqueue.state.set_final(chain_spec, tx_hash, block=block, tx_index=tx_index, fail=fail, session=session)
session.close()
return r
@@ -32,7 +32,7 @@ def set_final(chain_spec_dict, tx_hash, block=None, tx_index=None, fail=False):
def set_cancel(chain_spec_dict, tx_hash, manual=False):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_cancel(chain_spec, tx_hash, manual, session=session)
r = chainqueue.state.set_cancel(chain_spec, tx_hash, manual, session=session)
session.close()
return r
@@ -41,7 +41,7 @@ def set_cancel(chain_spec_dict, tx_hash, manual=False):
def set_rejected(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_rejected(chain_spec, tx_hash, session=session)
r = chainqueue.state.set_rejected(chain_spec, tx_hash, session=session)
session.close()
return r
@@ -50,7 +50,7 @@ def set_rejected(chain_spec_dict, tx_hash):
def set_fubar(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_fubar(chain_spec, tx_hash, session=session)
r = chainqueue.state.set_fubar(chain_spec, tx_hash, session=session)
session.close()
return r
@@ -59,7 +59,7 @@ def set_fubar(chain_spec_dict, tx_hash):
def set_manual(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_manual(chain_spec, tx_hash, session=session)
r = chainqueue.state.set_manual(chain_spec, tx_hash, session=session)
session.close()
return r
@@ -68,7 +68,7 @@ def set_manual(chain_spec_dict, tx_hash):
def set_ready(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_ready(chain_spec, tx_hash, session=session)
r = chainqueue.state.set_ready(chain_spec, tx_hash, session=session)
session.close()
return r
@@ -77,7 +77,7 @@ def set_ready(chain_spec_dict, tx_hash):
def set_reserved(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_reserved(chain_spec, tx_hash, session=session)
r = chainqueue.state.set_reserved(chain_spec, tx_hash, session=session)
session.close()
return r
@@ -86,7 +86,7 @@ def set_reserved(chain_spec_dict, tx_hash):
def set_waitforgas(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_waitforgas(chain_spec, tx_hash, session=session)
r = chainqueue.state.set_waitforgas(chain_spec, tx_hash, session=session)
session.close()
return r
@@ -95,7 +95,7 @@ def set_waitforgas(chain_spec_dict, tx_hash):
def get_state_log(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.get_state_log(chain_spec, tx_hash, session=session)
r = chainqueue.state.get_state_log(chain_spec, tx_hash, session=session)
session.close()
return r
@@ -104,6 +104,6 @@ def get_state_log(chain_spec_dict, tx_hash):
def obsolete(chain_spec_dict, tx_hash, final):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.obsolete_by_cache(chain_spec, tx_hash, final, session=session)
r = chainqueue.state.obsolete_by_cache(chain_spec, tx_hash, final, session=session)
session.close()
return r

View File

@@ -15,14 +15,14 @@ from sqlalchemy import tuple_
from sqlalchemy import func
from chainlib.chain import ChainSpec
from chainlib.eth.tx import unpack
import chainqueue.sql.state
import chainqueue.state
from chainqueue.db.enum import (
StatusEnum,
StatusBits,
is_alive,
dead,
)
from chainqueue.sql.tx import create
from chainqueue.tx import create
from chainqueue.error import NotLocalTxError
from chainqueue.db.enum import status_str

View File

@@ -5,30 +5,29 @@ import logging
from cic_eth_registry import CICRegistry
from cic_eth_registry.lookup.declarator import AddressDeclaratorLookup
from cic_eth_registry.lookup.tokenindex import TokenIndexLookup
from chainlib.eth.constant import ZERO_ADDRESS
logg = logging.getLogger()
def connect_token_registry(rpc, chain_spec, sender_address=ZERO_ADDRESS):
def connect_token_registry(rpc, chain_spec):
registry = CICRegistry(chain_spec, rpc)
token_registry_address = registry.by_name('TokenRegistry', sender_address=sender_address)
token_registry_address = registry.by_name('TokenRegistry')
logg.debug('using token registry address {}'.format(token_registry_address))
lookup = TokenIndexLookup(chain_spec, token_registry_address)
CICRegistry.add_lookup(lookup)
def connect_declarator(rpc, chain_spec, trusted_addresses, sender_address=ZERO_ADDRESS):
def connect_declarator(rpc, chain_spec, trusted_addresses):
registry = CICRegistry(chain_spec, rpc)
declarator_address = registry.by_name('AddressDeclarator', sender_address=sender_address)
declarator_address = registry.by_name('AddressDeclarator')
logg.debug('using declarator address {}'.format(declarator_address))
lookup = AddressDeclaratorLookup(chain_spec, declarator_address, trusted_addresses)
CICRegistry.add_lookup(lookup)
def connect(rpc, chain_spec, registry_address, sender_address=ZERO_ADDRESS):
def connect(rpc, chain_spec, registry_address):
CICRegistry.address = registry_address
registry = CICRegistry(chain_spec, rpc)
registry_address = registry.by_name('ContractRegistry', sender_address=sender_address)
registry_address = registry.by_name('ContractRegistry')
return registry

View File

@@ -12,7 +12,7 @@ from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.address import is_checksum_address
# local imports
from cic_eth.api.admin import AdminApi
from cic_eth.api import AdminApi
from cic_eth.db.enum import LockEnum
logging.basicConfig(level=logging.WARNING)

View File

@@ -21,7 +21,7 @@ from chainqueue.db.enum import (
StatusBits,
)
from chainqueue.error import NotLocalTxError
from chainqueue.sql.state import set_reserved
from chainqueue.state import set_reserved
# local imports
import cic_eth

View File

@@ -3,19 +3,16 @@ import logging
# external imports
import celery
from cic_eth_registry.error import (
UnknownContractError,
NotAContractError,
)
from cic_eth_registry.error import UnknownContractError
from chainlib.status import Status as TxStatus
from chainlib.eth.address import to_checksum_address
from chainlib.eth.error import RequestMismatchException
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.erc20 import ERC20
from hexathon import (
strip_0x,
add_0x,
)
from eth_erc20 import ERC20
from erc20_faucet import Faucet
# local imports
@@ -72,9 +69,7 @@ class CallbackFilter(SyncFilter):
#transfer_data['token_address'] = tx.inputs[0]
faucet_contract = tx.inputs[0]
c = Faucet(self.chain_spec)
o = c.token(faucet_contract, sender_address=self.caller_address)
o = Faucet.token(faucet_contract, sender_address=self.caller_address)
r = conn.do(o)
transfer_data['token_address'] = add_0x(c.parse_token(r))
@@ -129,7 +124,8 @@ class CallbackFilter(SyncFilter):
(transfer_type, transfer_data) = parser(tx, conn)
if transfer_type == None:
continue
break
else:
pass
except RequestMismatchException:
continue
@@ -172,9 +168,7 @@ class CallbackFilter(SyncFilter):
t = self.call_back(transfer_type, result)
logg.info('callback success task id {} tx {} queue {}'.format(t, tx.hash, t.queue))
except UnknownContractError:
logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(self.queue, self.method, transfer_data['to'], tx.hash))
except NotAContractError:
logg.debug('callback filter {}:{} skipping "transfer" on non-contract address {} tx {}'.format(self.queue, self.method, transfer_data['to'], tx.hash))
logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tx.queue, tx.method, transfer_data['to'], tx.hash))
def __str__(self):

View File

@@ -10,15 +10,14 @@ from chainlib.eth.tx import unpack
from chainqueue.db.enum import StatusBits
from chainqueue.db.models.tx import TxCache
from chainqueue.db.models.otx import Otx
from chainqueue.sql.query import get_paused_tx_cache as get_paused_tx
from chainqueue.query import get_paused_tx_cache as get_paused_tx
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.eth.gas import create_check_gas_task
from .base import SyncFilter
#logg = logging.getLogger().getChild(__name__)
logg = logging.getLogger()
logg = logging.getLogger().getChild(__name__)
class GasFilter(SyncFilter):
@@ -28,11 +27,11 @@ class GasFilter(SyncFilter):
self.chain_spec = chain_spec
def filter(self, conn, block, tx, db_session):
def filter(self, conn, block, tx, session):
if tx.value > 0:
tx_hash_hex = add_0x(tx.hash)
logg.debug('gas refill tx {}'.format(tx_hash_hex))
session = SessionBase.bind_session(db_session)
session = SessionBase.bind_session(session)
q = session.query(TxCache.recipient)
q = q.join(Otx)
q = q.filter(Otx.tx_hash==strip_0x(tx_hash_hex))
@@ -57,7 +56,7 @@ class GasFilter(SyncFilter):
tx_hashes_hex=list(txs.keys()),
queue=self.queue,
)
return s.apply_async()
s.apply_async()
def __str__(self):

View File

@@ -14,7 +14,7 @@ from .base import SyncFilter
logg = logging.getLogger().getChild(__name__)
account_registry_add_log_hash = '0x9cc987676e7d63379f176ea50df0ae8d2d9d1141d1231d4ce15b5965f73c9430'
account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153fe9ddb8e84b665061acd'
class RegistrationFilter(SyncFilter):
@@ -50,8 +50,7 @@ class RegistrationFilter(SyncFilter):
queue=self.queue,
)
s_nonce.link(s_gift)
t = s_nonce.apply_async()
return t
s_nonce.apply_async()
def __str__(self):

View File

@@ -3,7 +3,7 @@ import logging
# external imports
import celery
from chainqueue.sql.state import obsolete_by_cache
from chainqueue.state import obsolete_by_cache
logg = logging.getLogger()

View File

@@ -32,7 +32,7 @@ class TransferAuthFilter(SyncFilter):
self.transfer_request_contract = registry.by_name('TransferAuthorization', sender_address=call_address)
def filter(self, conn, block, tx, db_session): #rcpt, chain_str, session=None):
def filter(self, conn, block, tx, session): #rcpt, chain_str, session=None):
if tx.payload == None:
logg.debug('no payload')
@@ -45,17 +45,16 @@ class TransferAuthFilter(SyncFilter):
return False
recipient = tx.inputs[0]
#if recipient != self.transfer_request_contract.address():
if recipient != self.transfer_request_contract:
if recipient != self.transfer_request_contract.address():
logg.debug('not our transfer auth contract address {}'.format(recipient))
return False
r = TransferAuthorization.parse_create_request_request(tx.payload)
sender = r[0]
recipient = r[1]
token = r[2]
value = r[3]
sender = abi_decode_single(ABIContractType.ADDRESS, r[0])
recipient = abi_decode_single(ABIContractType.ADDRESS, r[1])
token = abi_decode_single(ABIContractType.ADDRESS, r[2])
value = abi_decode_single(ABIContractType.UINT256, r[3])
token_data = {
'address': token,
@@ -65,7 +64,6 @@ class TransferAuthFilter(SyncFilter):
'cic_eth.eth.nonce.reserve_nonce',
[
[token_data],
self.chain_spec.asdict(),
sender,
],
queue=self.queue,
@@ -82,7 +80,7 @@ class TransferAuthFilter(SyncFilter):
)
s_nonce.link(s_approve)
t = s_nonce.apply_async()
return t
return True
def __str__(self):

View File

@@ -30,7 +30,7 @@ class TxFilter(SyncFilter):
if otx == None:
logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
return None
logg.debug('otx filter match on {}'.format(otx.tx_hash))
logg.info('tx filter match on {}'.format(otx.tx_hash))
db_session.flush()
SessionBase.release_session(db_session)
s_final_state = celery.signature(

View File

@@ -0,0 +1,136 @@
# standard imports
import os
import re
import logging
import argparse
import json
# third-party imports
import web3
import confini
import celery
from json.decoder import JSONDecodeError
from cic_registry.chain import ChainSpec
# local imports
from cic_eth.db import dsn_from_config
from cic_eth.db.models.base import SessionBase
from cic_eth.eth.util import unpack_signed_raw_tx
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
rootdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
dbdir = os.path.join(rootdir, 'cic_eth', 'db')
migrationsdir = os.path.join(dbdir, 'migrations')
config_dir = os.path.join('/usr/local/etc/cic-eth')
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-eth', help='queue name for worker tasks')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
}
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config:\n{}'.format(config))
dsn = dsn_from_config(config)
SessionBase.connect(dsn)
celery_app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
queue = args.q
re_something = r'^/something/?'
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
def process_something(session, env):
r = re.match(re_something, env.get('PATH_INFO'))
if not r:
return None
#if env.get('CONTENT_TYPE') != 'application/json':
# raise AttributeError('content type')
#if env.get('REQUEST_METHOD') != 'POST':
# raise AttributeError('method')
#post_data = json.load(env.get('wsgi.input'))
#return ('text/plain', 'foo'.encode('utf-8'),)
# uwsgi application
def application(env, start_response):
for k in env.keys():
logg.debug('env {} {}'.format(k, env[k]))
headers = []
content = b''
err = None
session = SessionBase.create_session()
for handler in [
process_something,
]:
try:
r = handler(session, env)
except AttributeError as e:
logg.error('handler fail attribute {}'.format(e))
err = '400 Impertinent request'
break
except JSONDecodeError as e:
logg.error('handler fail json {}'.format(e))
err = '400 Invalid data format'
break
except KeyError as e:
logg.error('handler fail key {}'.format(e))
err = '400 Invalid JSON'
break
except ValueError as e:
logg.error('handler fail value {}'.format(e))
err = '400 Invalid data'
break
except RuntimeError as e:
logg.error('task fail value {}'.format(e))
err = '500 Task failed, sorry I cannot tell you more'
break
if r != None:
(mime_type, content) = r
break
session.close()
if err != None:
headers.append(('Content-Type', 'text/plain, charset=UTF-8',))
start_response(err, headers)
session.close()
return [content]
headers.append(('Content-Length', str(len(content))),)
headers.append(('Access-Control-Allow-Origin', '*',));
if len(content) == 0:
headers.append(('Content-Type', 'text/plain, charset=UTF-8',))
start_response('404 Looked everywhere, sorry', headers)
else:
headers.append(('Content-Type', mime_type,))
start_response('200 OK', headers)
return [content]

View File

@@ -7,8 +7,6 @@ import tempfile
import re
import urllib
import websocket
import stat
import importlib
# external imports
import celery
@@ -24,7 +22,6 @@ from chainlib.eth.connection import (
from chainlib.chain import ChainSpec
from chainqueue.db.models.otx import Otx
from cic_eth_registry.error import UnknownContractError
from cic_eth_registry.erc20 import ERC20Token
import liveness.linux
@@ -39,7 +36,7 @@ from cic_eth.eth import (
from cic_eth.admin import (
debug,
ctrl,
token,
token
)
from cic_eth.queue import (
query,
@@ -70,8 +67,6 @@ from cic_eth.task import BaseTask
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
script_dir = os.path.dirname(os.path.realpath(__file__))
config_dir = os.path.join('/usr/local/etc/cic-eth')
argparser = argparse.ArgumentParser()
@@ -80,11 +75,10 @@ argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('-q', type=str, default='cic-eth', help='queue name for worker tasks')
argparser.add_argument('-r', type=str, help='CIC registry address')
argparser.add_argument('--default-token-symbol', dest='default_token_symbol', type=str, help='Symbol of default token to use')
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
argparser.add_argument('--trace-queue-status', default=None, dest='trace_queue_status', action='store_true', help='set to perist all queue entry status changes to storage')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--aux-all', action='store_true', help='include tasks from all submodules from the aux module path')
argparser.add_argument('--aux', action='append', type=str, default=[], help='add single submodule from the aux module path')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()
@@ -115,8 +109,6 @@ if len(health_modules) != 0:
health_modules = health_modules.split(',')
logg.debug('health mods {}'.format(health_modules))
# connect to database
dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG'))
@@ -129,25 +121,20 @@ broker = config.get('CELERY_BROKER_URL')
if broker[:4] == 'file':
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
conf_update = {
current_app.conf.update({
'broker_url': broker,
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
}
if config.true('CELERY_DEBUG'):
conf_update['result_extended'] = True
current_app.conf.update(conf_update)
},
)
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
conf_update = {
'broker_url': broker,
}
if config.true('CELERY_DEBUG'):
conf_update['result_extended'] = True
current_app.conf.update(conf_update)
current_app.conf.update({
'broker_url': broker,
})
result = config.get('CELERY_RESULT_URL')
if result[:4] == 'file':
@@ -175,84 +162,6 @@ Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS')
# raise RuntimeError()
liveness.linux.load(health_modules, rundir=config.get('CIC_RUN_DIR'), config=config, unit='cic-eth-tasker')
rpc = RPCConnection.connect(chain_spec, 'default')
try:
registry = connect_registry(rpc, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
except UnknownContractError as e:
logg.exception('Registry contract connection failed for {}: {}'.format(config.get('CIC_REGISTRY_ADDRESS'), e))
sys.exit(1)
logg.info('connected contract registry {}'.format(config.get('CIC_REGISTRY_ADDRESS')))
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None:
logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
sys.exit(1)
trusted_addresses = trusted_addresses_src.split(',')
for address in trusted_addresses:
logg.info('using trusted address {}'.format(address))
connect_declarator(rpc, chain_spec, trusted_addresses)
connect_token_registry(rpc, chain_spec)
# detect aux
# TODO: move to separate file
#aux_dir = os.path.join(script_dir, '..', '..', 'aux')
aux = []
if args.aux_all:
if len(args.aux) > 0:
logg.warning('--aux-all is set so --aux will have no effect')
for p in sys.path:
logg.debug('checking for aux modules in {}'.format(p))
aux_dir = os.path.join(p, 'cic_eth_aux')
try:
d = os.listdir(aux_dir)
except FileNotFoundError:
logg.debug('no aux module found in {}'.format(aux_dir))
continue
for v in d:
if v[:1] == '.':
logg.debug('dotfile, skip {}'.format(v))
continue
aux_mod_path = os.path.join(aux_dir, v)
st = os.stat(aux_mod_path)
if not stat.S_ISDIR(st.st_mode):
logg.debug('not a dir, skip {}'.format(v))
continue
aux_mod_file = os.path.join(aux_dir, v,'__init__.py')
try:
st = os.stat(aux_mod_file)
except FileNotFoundError:
logg.debug('__init__.py not found, skip {}'.format(v))
continue
aux.append(v)
logg.debug('found module {} in {}'.format(v, aux_dir))
elif len(args.aux) > 0:
for p in sys.path:
v_found = None
for v in args.aux:
aux_dir = os.path.join(p, 'cic_eth_aux')
aux_mod_file = os.path.join(aux_dir, v, '__init__.py')
try:
st = os.stat(aux_mod_file)
v_found = v
except FileNotFoundError:
logg.debug('cannot find explicity requested aux module {} in path {}'.format(v, aux_dir))
continue
if v_found == None:
logg.critical('excplicity requested aux module {} not found in any path'.format(v))
sys.exit(1)
logg.info('aux module {} found in path {}'.format(v, aux_dir))
aux.append(v)
for v in aux:
mname = 'cic_eth_aux.' + v
mod = importlib.import_module(mname)
mod.aux_setup(rpc, config)
logg.info('loaded aux module {}'.format(mname))
def main():
argv = ['worker']
if args.vv:
@@ -275,13 +184,25 @@ def main():
rpc = RPCConnection.connect(chain_spec, 'default')
try:
registry = connect_registry(rpc, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
except UnknownContractError as e:
logg.exception('Registry contract connection failed for {}: {}'.format(config.get('CIC_REGISTRY_ADDRESS'), e))
sys.exit(1)
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None:
logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
sys.exit(1)
trusted_addresses = trusted_addresses_src.split(',')
for address in trusted_addresses:
logg.info('using trusted address {}'.format(address))
connect_declarator(rpc, chain_spec, trusted_addresses)
connect_token_registry(rpc, chain_spec)
BaseTask.default_token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL')
BaseTask.default_token_address = registry.by_name(BaseTask.default_token_symbol)
default_token = ERC20Token(chain_spec, rpc, BaseTask.default_token_address)
default_token.load(rpc)
BaseTask.default_token_decimals = default_token.decimals
BaseTask.default_token_name = default_token.name
BaseTask.run_dir = config.get('CIC_RUN_DIR')
logg.info('default token set to {} {}'.format(BaseTask.default_token_symbol, BaseTask.default_token_address))

View File

@@ -15,7 +15,6 @@ import cic_base.config
import cic_base.log
import cic_base.argparse
import cic_base.rpc
from cic_base.eth.syncer import chain_interface
from cic_eth_registry.error import UnknownContractError
from chainlib.chain import ChainSpec
from chainlib.eth.constant import ZERO_ADDRESS
@@ -27,8 +26,10 @@ from hexathon import (
strip_0x,
)
from chainsyncer.backend.sql import SQLBackend
from chainsyncer.driver.head import HeadSyncer
from chainsyncer.driver.history import HistorySyncer
from chainsyncer.driver import (
HeadSyncer,
HistorySyncer,
)
from chainsyncer.db.models.base import SessionBase
# local imports
@@ -50,23 +51,15 @@ from cic_eth.registry import (
script_dir = os.path.realpath(os.path.dirname(__file__))
def add_block_args(argparser):
argparser.add_argument('--history-start', type=int, default=0, dest='history_start', help='Start block height for initial history sync')
argparser.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
return argparser
logg = cic_base.log.create()
argparser = cic_base.argparse.create(script_dir, cic_base.argparse.full_template)
argparser = cic_base.argparse.add(argparser, add_block_args, 'block')
#argparser = cic_base.argparse.add(argparser, add_traffic_args, 'traffic')
args = cic_base.argparse.parse(argparser, logg)
config = cic_base.config.create(args.c, args, args.env_prefix)
config.add(args.y, '_KEYSTORE_FILE', True)
config.add(args.q, '_CELERY_QUEUE', True)
config.add(args.history_start, 'SYNCER_HISTORY_START', True)
config.add(args.no_history, '_NO_HISTORY', True)
cic_base.config.log(config)
@@ -76,10 +69,9 @@ SessionBase.connect(dsn, pool_size=16, debug=config.true('DATABASE_DEBUG'))
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
#RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default')
cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER'))
def main():
# connect to celery
celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
@@ -97,7 +89,7 @@ def main():
stat = init_chain_stat(rpc, block_start=block_current)
loop_interval = stat.block_average()
logg.debug('current block height {}'.format(block_offset))
logg.debug('starting at block {}'.format(block_offset))
syncers = []
@@ -106,13 +98,8 @@ def main():
syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
initial_block_start = config.get('SYNCER_HISTORY_START')
initial_block_offset = block_offset
if config.get('_NO_HISTORY'):
initial_block_start = block_offset
initial_block_offset += 1
syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start))
logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset))
logg.info('found no backends to resume')
syncer_backends.append(SQLBackend.initial(chain_spec, block_offset))
else:
for syncer_backend in syncer_backends:
logg.info('resuming sync session {}'.format(syncer_backend))
@@ -121,11 +108,11 @@ def main():
for syncer_backend in syncer_backends:
try:
syncers.append(HistorySyncer(syncer_backend, chain_interface))
syncers.append(HistorySyncer(syncer_backend))
logg.info('Initializing HISTORY syncer on backend {}'.format(syncer_backend))
except AttributeError:
logg.info('Initializing HEAD syncer on backend {}'.format(syncer_backend))
syncers.append(HeadSyncer(syncer_backend, chain_interface))
syncers.append(HeadSyncer(syncer_backend))
connect_registry(rpc, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
@@ -168,6 +155,7 @@ def main():
for cf in callback_filters:
syncer.add_filter(cf)
#r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
r = syncer.loop(int(loop_interval), rpc)
sys.stderr.write("sync {} done at block {}\n".format(syncer, r))

View File

@@ -13,7 +13,6 @@ import celery
# local imports
from cic_eth.api import Api
from cic_eth.api.admin import AdminApi
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -54,20 +53,13 @@ celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=confi
queue = args.q
api = Api(config.get('CIC_CHAIN_SPEC'), queue=queue)
admin_api = AdminApi(None)
def main():
t = admin_api.registry()
registry_address = t.get()
print('Registry: {}'.format(registry_address))
t = api.default_token()
token_info = t.get()
print('Default token symbol: {}'.format(token_info['symbol']))
print('Default token address: {}'.format(token_info['address']))
logg.debug('Default token name: {}'.format(token_info['name']))
logg.debug('Default token decimals: {}'.format(token_info['decimals']))
if __name__ == '__main__':
main()

View File

@@ -11,7 +11,7 @@ from chainlib.chain import ChainSpec
from chainlib.eth.connection import EthHTTPConnection
# local imports
from cic_eth.api.admin import AdminApi
from cic_eth.api.api_admin import AdminApi
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

View File

@@ -12,7 +12,7 @@ from chainlib.chain import ChainSpec
from xdg.BaseDirectory import xdg_config_home
# local imports
from cic_eth.api.admin import AdminApi
from cic_eth.api import AdminApi
from cic_eth.db import dsn_from_config
from cic_eth.db.models.base import SessionBase

View File

@@ -19,7 +19,7 @@ from chainlib.eth.connection import EthHTTPConnection
from hexathon import add_0x
# local imports
from cic_eth.api.admin import AdminApi
from cic_eth.api import AdminApi
from cic_eth.db.enum import (
StatusEnum,
status_str,

View File

@@ -20,11 +20,7 @@ def init_chain_stat(rpc, block_start=0):
if block_start == 0:
o = block_latest()
r = rpc.do(o)
try:
block_start = int(r, 16)
except TypeError:
block_start = int(r)
logg.debug('blockstart {}'.format(block_start))
block_start = int(r, 16)
for i in range(BLOCK_SAMPLES):
o = block_by_number(block_start-10+i)

View File

@@ -1,5 +1,6 @@
# import
import time
import requests
import logging
import uuid
@@ -19,8 +20,7 @@ import liveness.linux
from cic_eth.error import SeppukuError
from cic_eth.db.models.base import SessionBase
#logg = logging.getLogger().getChild(__name__)
logg = logging.getLogger()
logg = logging.getLogger().getChild(__name__)
celery_app = celery.current_app
@@ -33,8 +33,6 @@ class BaseTask(celery.Task):
create_gas_oracle = RPCGasOracle
default_token_address = None
default_token_symbol = None
default_token_name = None
default_token_decimals = None
run_dir = '/run'
def create_session(self):
@@ -75,7 +73,7 @@ class CriticalSQLAlchemyTask(CriticalTask):
class CriticalWeb3Task(CriticalTask):
autoretry_for = (
ConnectionError,
requests.exceptions.ConnectionError,
)
safe_gas_threshold_amount = 2000000000 * 60000 * 3
safe_gas_refill_amount = safe_gas_threshold_amount * 5
@@ -85,7 +83,7 @@ class CriticalSQLAlchemyAndWeb3Task(CriticalTask):
autoretry_for = (
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
ConnectionError,
requests.exceptions.ConnectionError,
sqlalchemy.exc.ResourceClosedError,
)
safe_gas_threshold_amount = 2000000000 * 60000 * 3
@@ -101,7 +99,7 @@ class CriticalSQLAlchemyAndSignerTask(CriticalTask):
class CriticalWeb3AndSignerTask(CriticalTask):
autoretry_for = (
ConnectionError,
requests.exceptions.ConnectionError,
)
safe_gas_threshold_amount = 2000000000 * 60000 * 3
safe_gas_refill_amount = safe_gas_threshold_amount * 5
@@ -118,13 +116,12 @@ def registry():
return CICRegistry.address
@celery_app.task(bind=True, base=BaseTask)
def registry_address_lookup(self, chain_spec_dict, address, connection_tag='default'):
@celery_app.task()
def registry_address_lookup(chain_spec_dict, address, connection_tag='default'):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
conn = RPCConnection.connect(chain_spec, tag=connection_tag)
registry = CICRegistry(chain_spec, conn)
r = registry.by_address(address, sender_address=self.call_address)
return r
return registry.by_address(address)
@celery_app.task(throws=(UnknownContractError,))
@@ -132,7 +129,7 @@ def registry_name_lookup(chain_spec_dict, name, connection_tag='default'):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
conn = RPCConnection.connect(chain_spec, tag=connection_tag)
registry = CICRegistry(chain_spec, conn)
return registry.by_name(name, sender_address=self.call_address)
return registry.by_name(name)
@celery_app.task()

View File

@@ -8,9 +8,9 @@ import semver
version = (
0,
12,
11,
0,
'alpha.2',
'beta.12',
)
version_object = semver.VersionInfo(

View File

@@ -1,4 +1,3 @@
[celery]
broker_url = redis://
result_url = redis://
debug = 0

View File

@@ -1,4 +1,3 @@
[celery]
broker_url = redis://localhost:63379
result_url = redis://localhost:63379
debug = 0

View File

@@ -1,3 +1,2 @@
[SYNCER]
loop_interval =
history_start = 0

View File

@@ -1,3 +1,2 @@
[SYNCER]
loop_interval =
history_start = 0

View File

@@ -1,22 +0,0 @@
@node cic-eth-accounts
@section Accounts
Accounts are private keys in the signer component keyed by "addresses," a one-way transformation of a public key. Data can be signed by using the account as identifier for corresponding RPC requests.
Any account to be managed by @code{cic-eth} must be created by the corresponding task. This is because @code{cic-eth} creates a @code{nonce} entry for each newly created account, and guarantees that every nonce will only be used once in its threaded environment.
The calling code receives the account address upon creation. It never receives or has access to the private key.
@subsection Signer RPC
The signer is expected to handle a subset of the standard JSON-RPC:
@table @code
@item personal_newAccount(password)
Creates a new account, returning the account address.
@item eth_signTransactions(tx_dict)
Sign the transaction represented as a dictionary.
@item eth_sign(address, message)
Signs an arbtirary message with the standard Ethereum prefix.
@end table

View File

@@ -1,60 +0,0 @@
@node cic-eth system maintenance
@appendix Admin API
The admin API is still in an early stage of refinement. User friendliness can be considerably improved.
All of the API calls are celery task proxies, and return @code{Celery.AsyncResult} unless otherwise noted.
In contrast to the client API module, this API does not currently implement a pluggable callback.
@appendixsection registry
Returns the @code{ContractRegistry} this instance of @code{cic-eth-tasker} is running on.
@appendixsection proxy-do
Execute an arbitary JSON-RPC request using the @code{cic-eth-tasker} blockchain node RPC connection.
@appendixsection default_token
Returns the default token symbol and address.
@appendixsection lock
Set lock bits, globally or per address
@appendixsection unlock
Opposite of lock
@appendixsection get_lock
Get the current state of a lock
@appendixsection tag_account
Associate an identifier with an account address (@xref{cic-eth system accounts})
@appendixsection have_account
Check whether a private key exists in the keystore able to sign on behalf of the given account (it actually performs a signature).
@appendixsection resend
Clone or resend a transaction
@appendixsection check_nonce
Returns diagnostics for nonce sequences per account, e.g. detect nonce gaps that block execution of further transactions.
@appendixsection fix_nonce
Re-orders all nonces by shifting all transaction nonces after the given transaction down by one. This has the additional effect of obsoleting the given transaction. Can be used to close gaps in the nonce sequencing. Use with care!
@appendixsection account
Return brief transaction info lists per account
@appendixsection tx
Return a complex transaction metadata object for a single transaction. The object assembles state from both the blockchain node and the custodial queue system.

View File

@@ -1,18 +0,0 @@
\input texinfo
@setfilename index.html
@settitle CIC custodial services reference deployment
@copying
Released 2021 under GPL3
@end copying
@titlepage
@title CIC custodial services reference deployment
@author Louis Holbrook
@end titlepage
@c
@contents
@include index.texi

View File

@@ -1,4 +0,0 @@
@node cic-eth Appendix Task chains
@appendix Task chains
TBC - explain here how to generate these chain diagrams

View File

@@ -1,108 +0,0 @@
@node cic-eth configuration
@section Configuration
(refer to @code{cic-base} for a general overview of the config pipeline)
Configuration parameters are grouped by configuration filename.
@subsection cic
@table @var
@item registry_address
Ethereum address of the @var{ContractRegistry} contract
@item chain_spec
String representation of the connected blockchain according to the @var{chainlib} @var{ChainSpec} format.
@item tx_retry_delay
Minimum time in seconds to wait before retrying a transaction
@item trust_address
Comma-separated list of one or more ethereum addresses regarded as trusted for describing other resources, Used by @var{cic-eth-registry} in the context of the @var{AddressDeclarator}.
@item defalt_token_symbol
Fallback token to operate on when no other context is given.
@item health_modules
Comma-separated list of methods to execute liveness tests against. (see ...)
@item run_dir
Directory to use for session-scoped variables for @var{cic-eth} daemon parent processes.
@end table
@subsection celery
@table @var
@item broker_url
Message broker URL
@item result_url
Result backend URL
@item debug
Boolean value. If set, the amount of available context for a task in the result backend will be maximized@footnote{This is a @emph{required} setting for the task graph documenter to enabled it to display task names in the graph}.
@end table
@subsection database
See ref cic-base when ready
@subsection eth
@table @var
@item provider
Address of default RPC endpoint for transactions and state queries.
@item gas_gifter_minimum_balance
The minimum gas balance that must be held by the @code{GAS GIFTER} token before the queue processing shuts down@footnote{You should really make sure that this threshold is never hit}
@end table
@subsection redis
Defines connection to the redis server used outside of the context of @var{celery}. This is usually the same server, but should be a different db.
@table @var
@item host
Redis hostname
@item port
Redis port
@item db
Redis db
@end table
@subsection signer
Parameters
@table @var
@item socket_path
The connection string for the signer JSON-RPC service.@footnote{The @var{crypto-dev-signer} supports UNIX socket or a HTTP(S) connections}
@item secret
If set, this password is used to add obfuscation on top of the encryption already applied by the signer for the keystore.
@end table
@subsection ssl
Certificate information for https api callbacks.
@table @var
@item enable_client
Boolean value. If set, client certificate will be used to authenticate the callback request.
@item cert_file
Client certificate file in PEM or DER format
@item key_file
Client key file in PEM or DER format
@item password
Password for unlocking the client key
@item ca_file
Certificate authority bundle, to verify the certificate sent by the callback server.
@end table
@subsection syncer
@table @var
@item loop_interval
Seconds to pause before each execution of the @var{chainsyncer} poll loop.
@end table

View File

@@ -1,46 +0,0 @@
@node cic-eth-dependencies
@section Dependencies
This application is written in Python 3.8. It is tightly coupled with @code{python-celery}, which provides the task worker ecosystem. It also uses @code{SQLAlchemy} which provides useful abstractions for persistent storage though SQL, and @code{alembic} for database schema migrations.
There is currently also a somewhat explicit coupling with @code{Redis}, which is used as message broker for @code{python-celery}. @code{Redis} is also explicitly used by some CLI tools to retrieve results from command execution. This coupling may be relaxed in the future to allow other key-value pubsub solutions instead.
@subsection Generalized project dependencies
The core features are built around four main independent components that have been developed for the purpose of this project, but are separated and maintained as general-purpose libraries.
@table @samp
@item chainlib
A cross-chain library prototype that can provide encodings for transactions on a Solidity-based EVM contract network.
@item chainqueue
Queue manager that guarantees delivery of outgoing blockchain transactions.
@item chainsyncer
Monitors blockchains and guarantees execution of an arbitrary count of pluggable code objects for each block transaction.
@item crypto-dev-signer
An keystore capable of signing for the EVM chain through a standard Ethereum JSON-RPC interface.
@end table
@anchor{cic-eth-dependencies-smart-contracts}
@subsection Smart contract dependencies
The Smart contracts needed by the network must be discoverable through a single entry point called the Contract Registry. The contract registry is expected to reference itself in its records. The authenticity of the contract registry must be guaranteed by external sources of trust.
The contract registry maps contract addresses to well-known identifiers. The contracts are as follows:
@table @code
@item ContractRegistry (points to self)
Resolves plaintext identifiers to contract addresses.
@item AccountRegistry
An append-only store of accounts hosted by the custodial system
@item TokenRegistry
Unique symbol-to-address mappings for token contracts
@item AddressDeclarator
Reverse address to resource lookup
@item TokenAuthorization
Escrow contract for external spending on behalf of custodial users
@item Faucet
Called by newly created accounts to receive initial token balance
@end table
The dependency @code{cic-eth-registry} abstracts and facilitates lookups of resources on the blockchain network. In its current state it resolves tokens by symbol or address, and contracts by common-name identifiers. In the @code{cic-eth} code all lookups for EVM network resources will be performed through this dependency.

View File

@@ -1,49 +0,0 @@
@node cic-eth-incoming
@section Incoming transactions
All transactions in mined blocks will be passed to a selection of plugin filters to the @code{chainsyncer} component. Each of these filters are individual python module files in @code{cic_eth.runnable.daemons.filters}. This section describes their function.
The status bits refer to the bits definining the @code{chainqueue} state.
@subsection tx
Looks up the transaction in the local queue, and if found it sets the @code{FINAL} state bit. If the contract code execution was unsuccessful, the @code{NETWORK ERROR} state bit is also set.
@subsection gas
If the transaction is a gas token transfer, it checks if the recipient is a custodial account awaiting gas refill to execute a transaction (the queue item will have the @code{GAS ISSUES} bit set). If this is the case, the transaction will be activated by setting the @code{QUEUED} bit.
@subsection register
If the transaction is an account registration@footnote{The contract keyed by @var{AccountRegistry} in the @var{ContractRegistry} contract}, a Faucet transaction will be triggered for the registered account@footnote{The faucet contract used in the reference implementation will verify whether the account calling it is registered in the @var{AccountRegistry}. Thus it cannot be called before the account registration has succeeded.}
@subsection callback
Executes, in order, Celery tasks defined in the configuration variable @var{TASKS_TRANSFER_CALLBACKS}. Each of these tasks are registered as individual filters in the @code{chainsyncer} component, with the corresponding execution guarantees.
The callbacks will receive the following arguments
@enumerate
@item @strong{result}
A complex representation of the transaction (see section ?)
@item @strong{transfertype}
A string describing the type of transaction found@footnote{See appendix ? for an overview of possible values}
@item @strong{status}
0 if contract code executed successfully. Any other value is an error@footnote{The values 1-1024 are reserved for system specific errors. In the current implementation only a general error state with value 1 is defined. See appendix ?.}
@end enumerate
@subsection transferauth
If a valid transfer authorization request has been made, a token @emph{allowance}@footnote{@code{approve} for ERC20 tokens} transaction is executed on behalf of the custodial account, with the @var{TransferAuthorization} contract as spender.
@subsection convert
If the transaction is a token conversion, @emph{and} there is a pending transfer registered for the conversion, the corresponding token transfer transaction will be executed. Not currently implemented

View File

@@ -1,14 +0,0 @@
@top cic-eth
@include intro.texi
@include dependencies.texi
@include configuration.texi
@include system.texi
@include interacting.texi
@include outgoing.texi
@include incoming.texi
@include services.texi
@include tools.texi
@include admin.texi
@include chains.texi
@include transfertypes.texi

View File

@@ -1,109 +0,0 @@
@node cic-eth-interacting
@section Interacting with the system
The API to the @var{cic-eth} component is a proxy for executing @emph{chains of Celery tasks}. The tasks that compose individual chains are documented in @ref{cic-eth Appendix Task chains,the Task Chain appendix}, which also describes a CLI tool that can generate graph representationso of them.
There are two API classes, @var{Api} and @var{AdminApi}. The former is described later in this section, the latter described in @ref{cic-eth system maintenance,the Admin API appendix}.
@subsection Interface
API calls are constructed by creating @emph{Celery task signatures} and linking them together, sequentially and/or in parallell. In turn, the tasks themselves may spawn other asynchronous tasks. This means that the code in @file{cic_eth.api.*} does not necessarily specify the full task graph that will be executed for any one command.
The operational guarantee that tasks will be executed, not forgotten, and retried under certain circumstances is deferred to @var{Celery}. On top of this, the @var{chainqueue} component ensures that semantic state changes that the @code{Celery} tasks ask of it are valid.
@anchor{cic-eth-locking}
@subsection Locking
All methods that make a change to the blockchain network must pass @emph{locking layer checks}. Locks may be applied on a global or per-address basis. Lock states are defined by a combination of bit flags. The implemented lock bits are:
@table @var
@item INIT
The system has not yet been initialized. In this state, writes are limited to creating unregistered accounts only.
@item QUEUE
Items may not be added to the queue
@item SEND
Queued items may not be attempted sent to the network
@item CREATE (global-only)
New accounts may not be created
@item STICKY
Until reset, no other part of the locking state can be reset
@end table
@subsection Callback
All API calls provide the option to attach a callback to the end of the task chain. This callback will be executed regardless of whether task chain execution succeeded or not.
Refer to @file{cic-eth.callbacks.noop.noop} for the expected callback signature.
@subsection API Methods that change state
@subsubsection create_account
Creates a new account in the keystore, optionally registering the account with the @var{AccountRegistry} contract.
@subsubsection transfer
Attempts to execute a token transaction between two addresses. It is the caller's responsibility to check whether the token balance is sufficient for the transactions.
@subsubsection refill_gas
Executes a gas token transfer to a custodial address from the @var{GAS GIFTER} system account.
@subsubsection convert
Converts a token to another token for the given custodial account. Currently not implemented.
@anchor{cic-eth-convert-and-transfer}
@subsubsection convert_and_transfer
Same as convert, but will automatically execute a token transfer to another custodial account when conversion has been completed. Currently not implemented.
@subsection Read-only API methods
@subsubsection balance
Retrieves a complex balance statement of a single account, including:
@itemize
@item The network balance at the current block height
@item Value reductions due to by pending outgoing transactions
@item Value increments due to by pending incoming transactions
@end itemize
Only the first of these balance items has guaranteed finality. The reduction by outgoing transaction can be reasonably be assumed to eventually become final. The same applies for the increment by incoming transaction, @emph{unless} the transfer is part of a multiple-transaction operation. For example, a @ref{cic-eth-convert-and-transfer,convert_and_transfer} operation may fail in the convert stage and/or may yield less tokens then expected after conversion.
@subsubsection list
Returns an aggregate iist of all token value changes for a given address. As not all value transfers are a result of literal value transfer contract calls (e.g. @var{transfer} and @var{transferFrom} in @var{ERC20}), this data may come from a number of sources, including:
@itemize
@item Literal value transfers within the custodial system
@item Literal value transfers from or to an external address
@item Faucet invocations (token minting)
@item Demurrage and redistribution built into the token contract
@end itemize
@subsubsection default_token
Return the symbol and address of the token used by default in the network.
@subsubsection ping
Convenience method for the caller to check whether the @var{cic-eth} engine is alive.

View File

@@ -1,74 +0,0 @@
@node cic-eth-outgoing
@section Outgoing transactions
@strong{Important! A pre-requisite for proper functioning of the component is that no other agent is sending transactions to the network for any of the keys in the keystore.}
The term @var{state bit} refers to the bits definining the @code{chainqueue} state.
@subsection Lock
Any task that changes blockchain state @strong{must} apply a @code{QUEUE} lock for the address it operates on. This is to ensure that transactions are sent to the network in order.@footnote{If too many transactions arrive out of order to the blockchain node, it may arbitrarily prune those that cannot directly be included in a block. This puts unnecessary strain (and reliance) on the transaction retry mechanism.}
This lock will be released once the blockchain node confirms handover of the transaction.@footnote{This is the responsibility of the @var{dispatcher} service}
@subsection Nonce
A separate task step is executed for binding a transaction nonce to a Celery task root id, which uniquely identifies the task chain. This provides atomicity of the nonce across the parallell task environment, and also recoverability in case unexpected program interruption.
The nonce of a permanently failed task must be @emph{manually} unlocked. Celery tasks that involve nonces who permanently fail are to be considered @emph{critical anomalies} and should not happen. The queue locking mechanism is designed to prevent the amount of out-of-sequence transactions for an account to escalate.
@subsection Choosing fee prices
@code{cic-eth} uses the @code{chainlib} module to resolve gas price lookups.
Optimizing gas price discovery should be the responsibility of the chainlib layer. It already accommodates using an separate RPC for the @code{eth_gasPrice} call.@footnote{A sample implementation of a gas price tracker speaking JSON-RPC (also built using chainlib/chainsyncer) can be found at @url{https://gitlab.com/nolash/eth-stat-syncer}.}
@subsection Choosing gas limits
To determine the gas limit of a transaction, normally the EVM node will be used to perform a dry-run exection of the inputs against the current chain state.
As the current state of the custodial system should only rely on known, trusted contract bytecode, there is no real need for this mechanism. The @code{chainlib}-based contract interfaces are expected to provide a method call that return safe gas limit values for contract interactions.@footnote{Of course, this method call may in turn conceal more sophisticated gas limit heuristics.}
Note that it is still the responsibility of @code{cic-eth} to make sure that the gas limit of the network is sufficient to allow execution of all needed contracts.
@subsection Gas refills
If the gas balance of a custodial account is below a certain threshold, a gas refill task will be spawned. The gas will be transferred from the @code{GAS GIFTER} system account.
In the event that the balance is insufficient even for the imminent transaction@footnote{This will of course be the case when an account is first created, whereupon it has a balance of 0. The subsequent faucet call will spawn a gas refill task.}, execution of the transaction will be deferred until the gas refill transaction is completed. In this case the transaction will be marked with the @code{GAS ISSUES} state bit.
The value chosen for the gas refill threshold should ideally allow enough of a margin to avoid the need of deferring transactions in the future.
@subsection Queueing transactions
Once the lock, nonce and gas processing parts has been completed, the transaction will be queued for sending. This means that the @code{QUEUED} state bit is set. From here the @ref{cic-eth-services-dispatcher,dispatcher service} takes over responsibility.
@subsection Retrying transactions
There are three conditions create the need to defer and retry transactions.
The first is communication problems with the blockchain node itself, for example if it is overloaded or being restarted. As far as possible, retries of this nature will be left to the Celery task workers. There may be cases, however, where it is appropriate to hand the responsibility to the @code{chainqueue} instead. In this case, the queue item will have the @code{NODE ERROR} state bit set.
The second condition occurs when transactions take too long to be confirmed by the network. In this case, the transaction will be re-submitted, but with a higher gas price.
The third condition occurs when the blockchain node purges the transaction from the mempool before it is sent to the network. @code{cic-eth} does not distinguish this case from the second, as the issue is solved using the same mechanism.
@subsubsection Transaction obsoletion
"Re-submitting" a transaction means creating a transaction with a previously used nonce for an account address.
When this happens, The @code{chainqueue} will still contain all previous transactions with the same nonce. The transaction being superseded will have the @code{OBSOLETED} state bit set.
Once a transaction has been mined, all other transactions with the same node will have the @code{OBSOLETED} and @code{FINAL} state bits set.
@subsection Unexpected conditions
Any unexpected condition exposing the need for urgent code improvement and/or manual intervention will be signalled by marking the transaction with the @code{FUBAR} state bit set.

View File

@@ -0,0 +1,24 @@
@node cic-eth
@chapter cic-eth
@section Overview
@code{cic-eth} is the heart of the custodial account component. It is a combination of python-celery task queues and daemons that sign, dispatch and monitor blockchain transactions, aswell as triggering tasks contingent on other transactions.
@subsection Dependencies
The @code{cic-registry} module is used as a cache for contracts and tokens on the network.
A web3 JSON-RPC service that transparently proxies a keystore and provides transaction and message signing. The current development version uses the python web3 middleware feature to route methodsi involving the keystore to the module @code{crypto-dev-signer}, which is hosted on @file{pypi.org}.
@subsection What does it do
@subsection Tasks
Two main categories exist for tasks, @code{eth} and @code{queue}.
The @code{eth} tasks provide means to construct and decode Ethereum transactions, as well as interfacing the underlying key store.
Tasks in the @code{queue} module operate on the state of transactions queued for processing by @code{cic-eth}.

View File

@@ -1,50 +0,0 @@
@node cic-eth-services
@section Services
There are four daemons that together orchestrate all of the aforementioned recipes. This section will provide a high level description of them.
Each of them have their own set of command line flags. These are available in the CLI help text provided by @kbd{-h} @kbd{--help} and are not recited here.
Daemon executable scripts are located in the @file{cic_eth.runnable.daemons} package. If @var{cic-eth} is installed as a python package, they are installed as executables in @var{PATH}.
@subsection tasker
This is the heart of the custodial system. Tasker is the parent process for the celery workers executing all tasks interacting with and changing the state of the queue and the chain. It is also the only service that interfaces with the signer/keystore.
The other @var{cic-eth} daemons all interface with this component, along with any client adapter bridging an end-user gateway (e.g. @var{cic-ussd}). However, the service itself does not have to be actively running for the other services to run; @var{Celery} handles queueing up the incoming tasks until the @var{tasker} comes back online.@footnote{Whereas this is true, there is currently no fail-safe implemented to handles the event of task backlog overflow in Celery. Furthermore, no targeted testing has yet been performed to asses the stability of the system over time if a sudden, sustained surge of resumed task executions occurs. It may be advisable to suspend activity that adds new queue items to the system if volume is high and/or the @var{cic-eth} outage endures. However, there is no panacea for this condition, as every usage scenario is different}
The tasker has a set of pre-requisites that must be fulfilled before it will start
@itemize
@item It must be given a valid @var{ContractRegistry} address, which must include valid references to all contracts specified in @ref{cic-eth-dependencies-smart-contracts,Smart contract dependencies}
@item The gas gifter balance must be above the minimum threshold (See "eth" section in configurations).
@item There must be a valid alembic migration record in the storage database
@item The redis backend must be reachable and writable
@item There must be a reachable JSON-RPC server at the other end of the signer socket path (see "signer" section in configurations)
@end itemize
@subsection tracker
Implements the @var{chainsyncer}, and registers the filters described in @ref{cic-eth-incoming,Incoming Transactions} to be executed for every transaction. It consumes the appropriate @var{TASKS_TRANSFER_CALLBACKS} configuration setting to add externally defined filters at without having to change the daemon code.
The @var{tracker} has the same requisities for the @var{ContractRegistry} as the @var{tasker}.
@strong{Important! Guarantees of filter executions has some caveats. Refer to the @var{chainsyncer} documentation for more details.}
@anchor{cic-eth-services-dispatcher}
@subsection dispatcher
Uses the @code{get_upcoming_tx} method call from @var{chainqueue} to receive batches of queued transactions that are ready to send to the blockchain node. Every batch will only contain a single transaction by any one address, which will be the transaction with the next nonce not previously seen by the network. There is no limit currently set to how many transactions that will be included in a single batch.
@subsection retrier
The responsibility of the @var{retrier} is to re-queue transactions that failed to be sent to the blockchain node, as well as create @emph{replacements} for transactions whose processing by the network has been delayed. @strong{[refer transaction obolestion]}.
It is in turn the responsiblity of the @var{dispatcher} to send these (re-)queued transactions to the blockchain node.

Some files were not shown because too many files have changed in this diff Show More