Compare commits

..

65 Commits

Author SHA1 Message Date
nolash
5b8ea95a8e Bump chainlib to take dynamic arg vector for argparse 2021-09-17 08:12:51 +02:00
nolash
32b15d6d04 Update signer to fill in missing sign to wire symbol 2021-09-15 21:19:05 +02:00
1e0c475f39 Merge branch 'lash/contract-migration-dump' into 'master'
fix(contract-migration): Replace missing environment

See merge request grassrootseconomics/cic-internal-integration!271
2021-09-15 13:36:10 +00:00
Louis Holbrook
3e6cf594e3 fix(contract-migration): Replace missing environment 2021-09-15 13:36:09 +00:00
b8f79a2dd1 Merge branch 'bvander/extend-timoeout-contract-migration' into 'master'
chore: extend timeout for contract migration

See merge request grassrootseconomics/cic-internal-integration!272
2021-09-15 11:08:14 +00:00
540c2fd950 chore: extend timeout for contract migration 2021-09-15 14:04:18 +03:00
b9b06eced8 Merge branch 'staging' into 'master'
feat: add multi-project build to trigger deploy-k8s-dev in the devops repo

See merge request grassrootseconomics/cic-internal-integration!268
2021-09-09 21:28:49 +00:00
949bb29379 feat: add multi-project build to trigger deploy-k8s-dev in the devops repo 2021-09-09 21:28:49 +00:00
Louis Holbrook
0468906601 Merge branch 'lash/contract-migration-dump' into 'master'
fix(contract-migration): Make http signer work with local deployment

See merge request grassrootseconomics/cic-internal-integration!270
2021-09-09 13:38:10 +00:00
nolash
471243488e Add latest crypto-dev-signer install explicitly in dockerfile for cic-eth 2021-09-09 14:27:47 +02:00
3c4acd82ff test eth and meta only mrs 2021-09-07 11:56:17 -07:00
e07f992c5a Add new file 2021-09-07 18:38:20 +00:00
17e95cb19c Merge branch 'bvander/signer-as-service' into 'master'
run signer as a service over http

See merge request grassrootseconomics/cic-internal-integration!265
2021-09-06 19:10:08 +00:00
3c3a97ce15 run signer as a service over http 2021-09-06 12:07:57 -07:00
a492be4927 Merge branch 'lash/contract-migration-config' into 'master'
Sanitize contract migration configs

See merge request grassrootseconomics/cic-internal-integration!262
2021-09-06 10:06:58 +00:00
Louis Holbrook
1f555748b0 Sanitize contract migration configs 2021-09-06 10:06:58 +00:00
8aa4d20eea Update .gitlab-ci.yml 2021-09-01 20:00:44 +00:00
Louis Holbrook
90cf24dcee Merge branch 'lash/lockfix' into 'master'
Rehabilitate imports

See merge request grassrootseconomics/cic-internal-integration!261
2021-09-01 16:54:10 +00:00
Louis Holbrook
75b711dbd5 Rehabilitate imports 2021-09-01 16:54:10 +00:00
c21c1eb2ef fix data seeding node installs 2021-08-31 11:43:01 -07:00
eb5e612105 minor update to import_ussd script 2021-08-30 11:09:47 -07:00
e017d11770 update readme 2021-08-30 10:14:22 -07:00
e327af68e1 Merge branch 'philip/refactor-import-scripts' into 'master'
Consolidated ussd dataseeding script

See merge request grassrootseconomics/cic-internal-integration!252
2021-08-29 09:55:47 +00:00
92cc6a3f27 Consolidated ussd dataseeding script 2021-08-29 09:55:47 +00:00
f42bf7754a Merge branch 'lash/lockfix' into 'master'
Normalize initial INIT lock address

See merge request grassrootseconomics/cic-internal-integration!260
2021-08-29 09:07:46 +00:00
nolash
7342927e91 Normalize initial INIT lock address 2021-08-29 10:43:12 +02:00
17333af88f Merge branch 'bvander/docker-vm-builds' into 'master'
docker vm builds

See merge request grassrootseconomics/cic-internal-integration!259
2021-08-28 16:26:16 +00:00
6a68d2ed32 docker vm builds 2021-08-28 16:26:16 +00:00
Louis Holbrook
ef77f4c99a Merge branch 'lash/normalize-backend-tx' into 'master'
Normalize tx data for backend

Closes cic-eth#133

See merge request grassrootseconomics/cic-internal-integration!258
2021-08-28 11:10:18 +00:00
Louis Holbrook
56dbe8a502 Normalize tx data for backend 2021-08-28 11:10:18 +00:00
Louis Holbrook
2dc8ac6a12 Merge branch 'lash/upgrade-outer-tools' into 'master'
Upgrade outer tools

See merge request grassrootseconomics/cic-internal-integration!257
2021-08-27 13:13:00 +00:00
Louis Holbrook
0ced68e224 Upgrade outer tools 2021-08-27 13:13:00 +00:00
2afb20e715 Merge branch 'philip/ussd-post-test-bug-fixes' into 'master'
USSD post-test bug fixes

See merge request grassrootseconomics/cic-internal-integration!244
2021-08-25 10:33:35 +00:00
3b0113d0e4 USSD post-test bug fixes 2021-08-25 10:33:35 +00:00
Louis Holbrook
ebf4743a84 Merge branch 'lash/traffic-script-rehab' into 'master'
Implement chainlib cli for traffic script

See merge request grassrootseconomics/cic-internal-integration!255
2021-08-25 09:33:23 +00:00
Louis Holbrook
3bf92e7a8a Implement chainlib cli for traffic script 2021-08-25 09:33:23 +00:00
f0b4c42c68 cleanup contract migration dockerfiles 2021-08-24 15:33:18 -07:00
Louis Holbrook
b62d00180c Merge branch 'lash/cic-eth-seeding-fix' into 'master'
cic-eth data seeding rehab

See merge request grassrootseconomics/cic-internal-integration!254
2021-08-24 21:07:36 +00:00
Louis Holbrook
a49978cc36 cic-eth data seeding rehab 2021-08-24 21:07:36 +00:00
1b0ee269d0 add pre tag to contract-migration 2021-08-24 11:43:39 -07:00
aa2f363b27 fix em 2021-08-24 10:42:30 -07:00
2a24ce6938 remove mount 2021-08-24 10:33:53 -07:00
938a10b5c3 contract-migration ci parity 2021-08-24 10:26:09 -07:00
Louis Holbrook
76e33e578b Merge branch 'lash/verify-details' into 'master'
Add target count to verify

Closes #103

See merge request grassrootseconomics/cic-internal-integration!246
2021-08-24 15:56:41 +00:00
Louis Holbrook
2ec4262734 Add target count to verify 2021-08-24 15:56:41 +00:00
Louis Holbrook
7684fe3883 Merge branch 'lash/cic-eth-upgrade-more' into 'master'
Upgrade cic-ussd deps

See merge request grassrootseconomics/cic-internal-integration!247
2021-08-24 15:25:11 +00:00
Louis Holbrook
995a148c6a Upgrade cic-ussd deps 2021-08-24 15:25:11 +00:00
Louis Holbrook
511e099689 Merge branch 'lash/signer-update' into 'master'
Update signer

See merge request grassrootseconomics/cic-internal-integration!253
2021-08-24 11:35:52 +00:00
Louis Holbrook
f877218c55 Update signer 2021-08-24 11:35:52 +00:00
8ac9a1e99a reverting to fffb2bc3f4 2021-08-21 13:23:43 -04:00
c4cb095a29 Merge branch 'bvander/fix-cic-staff-client-docker' into 'master'
fix issues with build cicada, now it should do auto reload

See merge request grassrootseconomics/cic-internal-integration!250
2021-08-19 20:15:22 +00:00
05b8bbbbca fix issues with build cicada, now it should do auto reload 2021-08-19 20:15:22 +00:00
1ce32fbbe0 Merge branch 'fix-meta-docker-compose' into 'master'
fix a bug in the meta tag

See merge request grassrootseconomics/cic-internal-integration!249
2021-08-19 16:48:43 +00:00
3fd5e77e2c fix a bug in the meta tag 2021-08-19 12:46:32 -04:00
e27a49ef33 add docker swarm deployment configs and remove dependency on kaniko for ci builds 2021-08-19 12:29:41 -04:00
Louis Holbrook
fffb2bc3f4 Merge branch 'lash/faucet-workaround' into 'master'
Empty config dir in faucet setup

See merge request grassrootseconomics/cic-internal-integration!240
2021-08-18 06:34:08 +00:00
Louis Holbrook
8910fb0759 Empty config dir in faucet setup 2021-08-18 06:34:07 +00:00
Louis Holbrook
c84239c820 Merge branch 'lash/fix-configs' into 'master'
Fix configs after cic-base remove merges

See merge request grassrootseconomics/cic-internal-integration!245
2021-08-17 16:52:17 +00:00
Louis Holbrook
452047b900 Fix configs after cic-base remove merges 2021-08-17 16:52:17 +00:00
Louis Holbrook
b8be457c41 Merge branch 'lash/advanced-cache-queries' into 'master'
Finish cic-cache integration for transaction listings

Closes cic-eth#134 and #95

See merge request grassrootseconomics/cic-internal-integration!235
2021-08-17 08:03:14 +00:00
Louis Holbrook
0ec9813e5f Finish cic-cache integration for transaction listings 2021-08-17 08:03:14 +00:00
Louis Holbrook
defa7797dc Merge branch 'lash/remove-cic-base' into 'master'
Replace cic-base with chainlib cli utils

Closes #94

See merge request grassrootseconomics/cic-internal-integration!233
2021-08-17 06:46:51 +00:00
Louis Holbrook
bb3d38a1f9 Replace cic-base with chainlib cli utils 2021-08-17 06:46:51 +00:00
3be1c1b33d Merge branch 'spencer/meta-404' into 'master'
Meta returns 404 if resource is not found.

Closes #72

See merge request grassrootseconomics/cic-internal-integration!232
2021-08-09 19:13:05 +00:00
Spencer Ofwiti
d6c763f2d7 Meta returns 404 if resource is not found. 2021-08-09 19:13:05 +00:00
255 changed files with 5481 additions and 2642 deletions

View File

@@ -1,14 +1,43 @@
include:
- local: 'ci_templates/.cic-template.yml'
- local: 'apps/contract-migration/.gitlab-ci.yml'
#- local: 'ci_templates/.cic-template.yml' #kaniko build templates
# these includes are app specific unit tests
- local: 'apps/cic-eth/.gitlab-ci.yml'
- local: 'apps/cic-ussd/.gitlab-ci.yml'
- local: 'apps/cic-notify/.gitlab-ci.yml'
- local: 'apps/cic-meta/.gitlab-ci.yml'
- local: 'apps/cic-cache/.gitlab-ci.yml'
- local: 'apps/data-seeding/.gitlab-ci.yml'
#- local: 'apps/contract-migration/.gitlab-ci.yml'
#- local: 'apps/data-seeding/.gitlab-ci.yml'
stages:
- build
- test
- release
- deploy
image: registry.gitlab.com/grassrootseconomics/cic-internal-integration/docker-with-compose:latest
variables:
DOCKER_BUILDKIT: "1"
COMPOSE_DOCKER_CLI_BUILD: "1"
CI_DEBUG_TRACE: "true"
before_script:
- docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY
# runs on protected branches and pushes to repo
build-push:
stage: build
tags:
- integration
#script:
# - TAG=$CI_COMMIT_REF_SLUG-$CI_COMMIT_SHORT_SHA sh ./scripts/build-push.sh
script:
- TAG=latest sh ./scripts/build-push.sh
rules:
- if: $CI_COMMIT_REF_PROTECTED == "true"
when: always
deploy-dev:
stage: deploy
trigger: grassrootseconomics/devops
when: manual

View File

@@ -2,25 +2,21 @@
## Getting started
## Make some keys
This repo uses docker-compose and docker buildkit. Set the following environment variables to get started:
```
docker build -t bloxie . && docker run -v "$(pwd)/keys:/root/keys" --rm -it -t bloxie account new --chain /root/bloxberg.json --keys-path /root/keys
export COMPOSE_DOCKER_CLI_BUILD=1
export DOCKER_BUILDKIT=1
```
### Prepare the repo
This is stuff we need to put in makefile but for now...
File mounts and permisssions need to be set
start services, database, redis and local ethereum node
```
chmod -R 755 scripts/initdb apps/cic-meta/scripts/initdb
````
start cluster
docker-compose up -d
```
docker-compose up
Run app/contract-migration to deploy contracts
```
RUN_MASK=3 docker-compose up contract-migration
```
stop cluster
@@ -28,7 +24,7 @@ stop cluster
docker-compose down
```
delete data
stop cluster and delete data
```
docker-compose down -v
```
@@ -38,5 +34,4 @@ rebuild an images
docker-compose up --build <service_name>
```
Deployment variables are writtend to service-configs/.env after everthing is up.

View File

@@ -1,34 +0,0 @@
# The solc image messes up the alpine environment, so we have to go all over again
FROM python:3.8.6-slim-buster
LABEL authors="Louis Holbrook <dev@holbrook.no> 0826EDA1702D1E87C6E2875121D2E7BB88C2A746"
LABEL spdx-license-identifier="GPL-3.0-or-later"
LABEL description="Base layer for buiding development images for the cic component suite"
RUN apt-get update && \
apt-get install -y git gcc g++ libpq-dev && \
apt-get install -y vim gawk jq telnet openssl iputils-ping curl wget gnupg socat bash procps make python2 postgresql-client
RUN echo installing nodejs tooling
COPY ./dev/nvm.sh /root/
# Install nvm with node and npm
# https://stackoverflow.com/questions/25899912/how-to-install-nvm-in-docker
ENV NVM_DIR /root/.nvm
ENV NODE_VERSION 15.3.0
ENV BANCOR_NODE_VERSION 10.16.0
RUN wget -qO- https://raw.githubusercontent.com/nvm-sh/nvm/v0.37.2/install.sh | bash \
&& . $NVM_DIR/nvm.sh \
&& nvm install $NODE_VERSION \
&& nvm alias default $NODE_VERSION \
&& nvm use $NODE_VERSION \
# So many ridiculously stupid issues with node in docker that take oceans of absolutely wasted time to resolve
# owner of these files is "1001" by default - wtf
&& chown -R root:root "$NVM_DIR/versions/node/v$NODE_VERSION"
ENV NODE_PATH $NVM_DIR/versions/node//v$NODE_VERSION/lib/node_modules
ENV PATH $NVM_DIR/versions/node//v$NODE_VERSION/bin:$PATH

View File

@@ -1 +0,0 @@
## this is an example base image if we wanted one for all the other apps. Its just OS level things

View File

@@ -4,3 +4,4 @@ omit =
scripts/*
cic_cache/db/migrations/*
cic_cache/version.py
cic_cache/cli

View File

@@ -1,52 +1,17 @@
.cic_cache_variables:
variables:
APP_NAME: cic-cache
DOCKERFILE_PATH: docker/Dockerfile_ci
CONTEXT: apps/$APP_NAME
build-mr-cic-cache:
extends:
- .py_build_merge_request
- .cic_cache_variables
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/cic-cache/**/*
when: always
test-mr-cic-cache:
stage: test
extends:
- .cic_cache_variables
cache:
key:
files:
- test_requirements.txt
paths:
- /root/.cache/pip
image: $MR_IMAGE_TAG
script:
- cd apps/$APP_NAME/
- >
pip install --extra-index-url https://pip.grassrootseconomics.net:8433
--extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple
-r test_requirements.txt
- export PYTHONPATH=. && pytest -x --cov=cic_cache --cov-fail-under=90 --cov-report term-missing tests
needs: ["build-mr-cic-cache"]
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/$APP_NAME/**/*
when: always
build-push-cic-cache:
extends:
- .py_build_push
- .cic_cache_variables
rules:
- if: $CI_COMMIT_BRANCH == "master"
changes:
- apps/cic-cache/**/*
when: always
build-test-cic-cache:
stage: test
tags:
- integration
variables:
APP_NAME: cic-cache
MR_IMAGE_TAG: mr-$APP_NAME-$CI_COMMIT_REF_SLUG-$CI_COMMIT_SHORT_SHA
script:
- cd apps/cic-cache
- docker build -t $MR_IMAGE_TAG -f docker/Dockerfile .
- docker run $MR_IMAGE_TAG sh docker/run_tests.sh
allow_failure: true
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/$APP_NAME/**/*
when: always

View File

@@ -0,0 +1 @@
include *requirements.txt cic_cache/data/config/*

View File

@@ -0,0 +1 @@
# CIC-CACHE

View File

@@ -55,15 +55,37 @@ class Api:
queue=callback_queue,
)
def list(self, offset, limit, address=None):
def list(self, offset=0, limit=100, address=None, oldest=False):
s = celery.signature(
'cic_cache.tasks.tx.tx_filter',
[
0,
100,
offset,
limit,
address,
oldest,
],
queue=None
queue=self.queue,
)
if self.callback_param != None:
s.link(self.callback_success).on_error(self.callback_error)
t = s.apply_async()
return t
def list_content(self, offset=0, limit=100, address=None, block_offset=None, block_limit=None, oldest=False):
s = celery.signature(
'cic_cache.tasks.tx.tx_filter_content',
[
offset,
limit,
address,
block_offset,
block_limit,
oldest,
],
queue=self.queue,
)
if self.callback_param != None:
s.link(self.callback_success).on_error(self.callback_error)

View File

@@ -10,12 +10,16 @@ from cic_cache.db.list import (
list_transactions_mined,
list_transactions_account_mined,
list_transactions_mined_with_data,
list_transactions_mined_with_data_index,
list_transactions_account_mined_with_data_index,
list_transactions_account_mined_with_data,
)
logg = logging.getLogger()
DEFAULT_FILTER_SIZE = 8192 * 8
DEFAULT_LIMIT = 100
class Cache:
@@ -32,7 +36,7 @@ class BloomCache(Cache):
return n
def load_transactions(self, offset, limit):
def load_transactions(self, offset, limit, block_offset=None, block_limit=None, oldest=False):
"""Retrieves a list of transactions from cache and creates a bloom filter pointing to blocks and transactions.
Block and transaction numbers are serialized as 32-bit big-endian numbers. The input to the second bloom filter is the concatenation of the serialized block number and transaction index.
@@ -49,7 +53,7 @@ class BloomCache(Cache):
:return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx
:rtype: tuple
"""
rows = list_transactions_mined(self.session, offset, limit)
rows = list_transactions_mined(self.session, offset, limit, block_offset=block_offset, block_limit=block_limit, oldest=oldest)
f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
@@ -58,7 +62,12 @@ class BloomCache(Cache):
for r in rows:
if highest_block == -1:
highest_block = r[0]
lowest_block = r[0]
lowest_block = r[0]
else:
if oldest:
highest_block = r[0]
else:
lowest_block = r[0]
block = r[0].to_bytes(4, byteorder='big')
tx = r[1].to_bytes(4, byteorder='big')
f_block.add(block)
@@ -67,7 +76,7 @@ class BloomCache(Cache):
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)
def load_transactions_account(self, address, offset, limit):
def load_transactions_account(self, address, offset, limit, block_offset=None, block_limit=None, oldest=False):
"""Same as load_transactions(...), but only retrieves transactions where the specified account address is sender or recipient.
:param address: Address to retrieve transactions for.
@@ -79,7 +88,7 @@ class BloomCache(Cache):
:return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx
:rtype: tuple
"""
rows = list_transactions_account_mined(self.session, address, offset, limit)
rows = list_transactions_account_mined(self.session, address, offset, limit, block_offset=block_offset, block_limit=block_limit, oldest=oldest)
f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
@@ -88,7 +97,12 @@ class BloomCache(Cache):
for r in rows:
if highest_block == -1:
highest_block = r[0]
lowest_block = r[0]
lowest_block = r[0]
else:
if oldest:
highest_block = r[0]
else:
lowest_block = r[0]
block = r[0].to_bytes(4, byteorder='big')
tx = r[1].to_bytes(4, byteorder='big')
f_block.add(block)
@@ -99,8 +113,21 @@ class BloomCache(Cache):
class DataCache(Cache):
def load_transactions_with_data(self, offset, end):
rows = list_transactions_mined_with_data(self.session, offset, end)
def load_transactions_with_data(self, offset, limit, block_offset=None, block_limit=None, oldest=False):
if limit == 0:
limit = DEFAULT_LIMIT
rows = list_transactions_mined_with_data(self.session, offset, limit, block_offset, block_limit, oldest=oldest)
return self.__process_rows(rows, oldest)
def load_transactions_account_with_data(self, address, offset, limit, block_offset=None, block_limit=None, oldest=False):
if limit == 0:
limit = DEFAULT_LIMIT
rows = list_transactions_account_mined_with_data(self.session, address, offset, limit, block_offset, block_limit, oldest=oldest)
return self.__process_rows(rows, oldest)
def __process_rows(self, rows, oldest):
tx_cache = []
highest_block = -1;
lowest_block = -1;
@@ -108,7 +135,12 @@ class DataCache(Cache):
for r in rows:
if highest_block == -1:
highest_block = r['block_number']
lowest_block = r['block_number']
lowest_block = r['block_number']
else:
if oldest:
highest_block = r['block_number']
else:
lowest_block = r['block_number']
tx_type = 'unknown'
if r['value'] != None:

View File

@@ -0,0 +1,15 @@
# local imports
from .base import *
from .chain import (
EthChainInterface,
chain_interface,
)
from .rpc import RPC
from .arg import ArgumentParser
from .config import Config
from .celery import CeleryApp
from .registry import (
connect_registry,
connect_token_registry,
connect_declarator,
)

View File

@@ -0,0 +1,20 @@
# external imports
from chainlib.eth.cli import ArgumentParser as BaseArgumentParser
# local imports
from .base import (
CICFlag,
Flag,
)
class ArgumentParser(BaseArgumentParser):
def process_local_flags(self, local_arg_flags):
if local_arg_flags & CICFlag.CELERY:
self.add_argument('-q', '--celery-queue', dest='celery_queue', type=str, default='cic-cache', help='Task queue')
if local_arg_flags & CICFlag.SYNCER:
self.add_argument('--offset', type=int, default=0, help='Start block height for initial history sync')
self.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
if local_arg_flags & CICFlag.CHAIN:
self.add_argument('-r', '--registry-address', type=str, dest='registry_address', help='CIC registry contract address')

View File

@@ -0,0 +1,31 @@
# standard imports
import enum
# external imports
from chainlib.eth.cli import (
argflag_std_read,
argflag_std_write,
argflag_std_base,
Flag,
)
class CICFlag(enum.IntEnum):
# celery - nibble 1
CELERY = 1
# redis - nibble 2
# REDIS = 16
# REDIS_CALLBACK = 32
# chain - nibble 3
CHAIN = 256
# sync - nibble 4
SYNCER = 4096
argflag_local_task = CICFlag.CELERY
#argflag_local_taskcallback = argflag_local_task | CICFlag.REDIS | CICFlag.REDIS_CALLBACK
argflag_local_chain = CICFlag.CHAIN
argflag_local_sync = CICFlag.SYNCER | CICFlag.CHAIN

View File

@@ -0,0 +1,24 @@
# standard imports
import logging
# external imports
import celery
logg = logging.getLogger(__name__)
class CeleryApp:
@classmethod
def from_config(cls, config):
backend_url = config.get('CELERY_RESULT_URL')
broker_url = config.get('CELERY_BROKER_URL')
celery_app = None
if backend_url != None:
celery_app = celery.Celery(broker=broker_url, backend=backend_url)
logg.info('creating celery app on {} with backend on {}'.format(broker_url, backend_url))
else:
celery_app = celery.Celery(broker=broker_url)
logg.info('creating celery app without results backend on {}'.format(broker_url))
return celery_app

View File

@@ -0,0 +1,21 @@
# external imports
from chainlib.eth.block import (
block_by_number,
Block,
)
from chainlib.eth.tx import (
receipt,
Tx,
)
from chainlib.interface import ChainInterface
class EthChainInterface(ChainInterface):
def __init__(self):
self._tx_receipt = receipt
self._block_by_number = block_by_number
self._block_from_src = Block.from_src
self._src_normalize = Tx.src_normalize
chain_interface = EthChainInterface()

View File

@@ -0,0 +1,63 @@
# standard imports
import os
import logging
# external imports
from chainlib.eth.cli import (
Config as BaseConfig,
Flag,
)
# local imports
from .base import CICFlag
script_dir = os.path.dirname(os.path.realpath(__file__))
logg = logging.getLogger(__name__)
class Config(BaseConfig):
local_base_config_dir = os.path.join(script_dir, '..', 'data', 'config')
@classmethod
def from_args(cls, args, arg_flags, local_arg_flags, extra_args={}, default_config_dir=None, base_config_dir=None, default_fee_limit=None):
expanded_base_config_dir = [cls.local_base_config_dir]
if base_config_dir != None:
if isinstance(base_config_dir, str):
base_config_dir = [base_config_dir]
for d in base_config_dir:
expanded_base_config_dir.append(d)
config = BaseConfig.from_args(args, arg_flags, extra_args=extra_args, default_config_dir=default_config_dir, base_config_dir=expanded_base_config_dir, load_callback=None)
local_args_override = {}
# if local_arg_flags & CICFlag.REDIS:
# local_args_override['REDIS_HOST'] = getattr(args, 'redis_host')
# local_args_override['REDIS_PORT'] = getattr(args, 'redis_port')
# local_args_override['REDIS_DB'] = getattr(args, 'redis_db')
# local_args_override['REDIS_TIMEOUT'] = getattr(args, 'redis_timeout')
if local_arg_flags & CICFlag.CHAIN:
local_args_override['CIC_REGISTRY_ADDRESS'] = getattr(args, 'registry_address')
if local_arg_flags & CICFlag.CELERY:
local_args_override['CELERY_QUEUE'] = getattr(args, 'celery_queue')
if local_arg_flags & CICFlag.SYNCER:
local_args_override['SYNCER_OFFSET'] = getattr(args, 'offset')
local_args_override['SYNCER_NO_HISTORY'] = getattr(args, 'no_history')
config.dict_override(local_args_override, 'local cli args')
# if local_arg_flags & CICFlag.REDIS_CALLBACK:
# config.add(getattr(args, 'redis_host_callback'), '_REDIS_HOST_CALLBACK')
# config.add(getattr(args, 'redis_port_callback'), '_REDIS_PORT_CALLBACK')
if local_arg_flags & CICFlag.CELERY:
config.add(config.true('CELERY_DEBUG'), 'CELERY_DEBUG', exists_ok=True)
logg.debug('config loaded:\n{}'.format(config))
return config

View File

@@ -0,0 +1,33 @@
# standard imports
import logging
# external imports
from cic_eth_registry import CICRegistry
from cic_eth_registry.lookup.declarator import AddressDeclaratorLookup
from cic_eth_registry.lookup.tokenindex import TokenIndexLookup
from chainlib.eth.constant import ZERO_ADDRESS
logg = logging.getLogger()
def connect_token_registry(self, conn, chain_spec, sender_address=ZERO_ADDRESS):
registry = CICRegistry(chain_spec, conn)
token_registry_address = registry.by_name('TokenRegistry', sender_address=sender_address)
logg.debug('using token registry address {}'.format(token_registry_address))
lookup = TokenIndexLookup(chain_spec, token_registry_address)
CICRegistry.add_lookup(lookup)
def connect_declarator(self, conn, chain_spec, trusted_addresses, sender_address=ZERO_ADDRESS):
registry = CICRegistry(chain_spec, conn)
declarator_address = registry.by_name('AddressDeclarator', sender_address=sender_address)
logg.debug('using declarator address {}'.format(declarator_address))
lookup = AddressDeclaratorLookup(chain_spec, declarator_address, trusted_addresses)
CICRegistry.add_lookup(lookup)
def connect_registry(conn, chain_spec, registry_address, sender_address=ZERO_ADDRESS):
CICRegistry.address = registry_address
registry = CICRegistry(chain_spec, conn)
registry_address = registry.by_name('ContractRegistry', sender_address=sender_address)
return registry

View File

@@ -0,0 +1,43 @@
# standard imports
import logging
# external imports
from chainlib.connection import (
RPCConnection,
ConnType,
)
from chainlib.eth.connection import EthUnixSignerConnection
from chainlib.chain import ChainSpec
logg = logging.getLogger(__name__)
class RPC:
def __init__(self, chain_spec, rpc_provider, signer_provider=None):
self.chain_spec = chain_spec
self.rpc_provider = rpc_provider
self.signer_provider = signer_provider
def get_default(self):
return RPCConnection.connect(self.chain_spec, 'default')
@staticmethod
def from_config(config):
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
RPCConnection.register_location(config.get('RPC_HTTP_PROVIDER'), chain_spec, 'default')
if config.get('SIGNER_PROVIDER'):
RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, tag='signer')
RPCConnection.register_location(config.get('SIGNER_PROVIDER'), chain_spec, 'signer')
rpc = RPC(chain_spec, config.get('RPC_HTTP_PROVIDER'), signer_provider=config.get('SIGNER_PROVIDER'))
logg.info('set up rpc: {}'.format(rpc))
return rpc
def __str__(self):
return 'RPC factory, chain {}, rpc {}, signer {}'.format(self.chain_spec, self.rpc_provider, self.signer_provider)

View File

@@ -0,0 +1,5 @@
[celery]
broker_url = redis://localhost:6379
result_url =
queue = cic-cache
debug = 0

View File

@@ -0,0 +1,4 @@
[cic]
registry_address =
trust_address =
health_modules = cic_eth.check.db,cic_eth.check.redis,cic_eth.check.signer,cic_eth.check.gas

View File

@@ -0,0 +1,10 @@
[database]
engine =
driver =
host =
port =
name = cic-cache
user =
password =
debug = 0
pool_size = 0

View File

@@ -0,0 +1,2 @@
[signer]
provider =

View File

@@ -0,0 +1,4 @@
[syncer]
loop_interval = 1
offset = 0
no_history = 0

View File

@@ -13,6 +13,9 @@ def list_transactions_mined(
session,
offset,
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
@@ -23,15 +26,62 @@ def list_transactions_mined(
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(limit, offset)
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} and block_number <= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, order_by, order_by, limit, offset)
else:
s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, order_by, order_by, limit, offset)
else:
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(order_by, order_by, limit, offset)
r = session.execute(s)
return r
def list_transactions_mined_with_data(
session,
offset,
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
:param block_offset: First block to include in search
:type block_offset: int
:param block_limit: Last block to include in search
:type block_limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(order_by, order_by, limit, offset)
r = session.execute(s)
return r
def list_transactions_mined_with_data_index(
session,
offset,
end,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
@@ -42,7 +92,87 @@ def list_transactions_mined_with_data(
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number ASC, tx_index ASC".format(offset, end)
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} and block_number <= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, order_by, order_by, offset, end)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, order_by, order_by, offset, end)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(order_by, order_by, offset, end)
r = session.execute(s)
return r
def list_transactions_account_mined_with_data_index(
session,
address,
offset,
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit, filtered by address
:param offset: Offset in data set to return transactions from
:type offset: int
:param limit: Max number of transactions to retrieve
:type limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(address, address, order_by, order_by, limit, offset)
r = session.execute(s)
return r
def list_transactions_account_mined_with_data(
session,
address,
offset,
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
:param block_offset: First block to include in search
:type block_offset: int
:param block_limit: Last block to include in search
:type block_limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(address, address, order_by, order_by, limit, offset)
r = session.execute(s)
return r
@@ -53,6 +183,9 @@ def list_transactions_account_mined(
address,
offset,
limit,
block_offset,
block_limit,
oldest=False,
):
"""Same as list_transactions_mined(...), but only retrieves transaction where the specified account address is sender or recipient.
@@ -65,7 +198,20 @@ def list_transactions_account_mined(
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
s = "SELECT block_number, tx_index FROM tx WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(address, address, limit, offset)
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT block_number, tx_index FROM tx WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(address, address, order_by, order_by, limit, offset)
r = session.execute(s)
return r

View File

@@ -7,7 +7,7 @@ Create Date: 2021-04-01 08:10:29.156243
"""
from alembic import op
import sqlalchemy as sa
from chainsyncer.db.migrations.sqlalchemy import (
from chainsyncer.db.migrations.default.export import (
chainsyncer_upgrade,
chainsyncer_downgrade,
)

View File

@@ -91,13 +91,14 @@ def process_transactions_all_data(session, env):
if env.get('HTTP_X_CIC_CACHE_MODE') != 'all':
return None
offset = r[1]
end = r[2]
logg.debug('got data request {}'.format(env))
block_offset = r[1]
block_end = r[2]
if int(r[2]) < int(r[1]):
raise ValueError('cart before the horse, dude')
c = DataCache(session)
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, end)
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(0, 0, block_offset, block_end, oldest=True) # oldest needs to be settable
for r in tx_cache:
r['date_block'] = r['date_block'].timestamp()

View File

@@ -8,15 +8,7 @@ import sys
import re
# external imports
import confini
import celery
import sqlalchemy
import rlp
import cic_base.config
import cic_base.log
import cic_base.argparse
import cic_base.rpc
from cic_base.eth.syncer import chain_interface
from cic_eth_registry import CICRegistry
from cic_eth_registry.error import UnknownContractError
from chainlib.chain import ChainSpec
@@ -34,6 +26,7 @@ from chainsyncer.driver.history import HistorySyncer
from chainsyncer.db.models.base import SessionBase
# local imports
import cic_cache.cli
from cic_cache.db import (
dsn_from_config,
add_tag,
@@ -43,32 +36,36 @@ from cic_cache.runnable.daemons.filters import (
FaucetFilter,
)
script_dir = os.path.realpath(os.path.dirname(__file__))
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
def add_block_args(argparser):
argparser.add_argument('--history-start', type=int, default=0, dest='history_start', help='Start block height for initial history sync')
argparser.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
return argparser
# process args
arg_flags = cic_cache.cli.argflag_std_read
local_arg_flags = cic_cache.cli.argflag_local_sync
argparser = cic_cache.cli.ArgumentParser(arg_flags)
argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args()
# process config
config = cic_cache.cli.Config.from_args(args, arg_flags, local_arg_flags)
logg = cic_base.log.create()
argparser = cic_base.argparse.create(script_dir, cic_base.argparse.full_template)
argparser = cic_base.argparse.add(argparser, add_block_args, 'block')
args = cic_base.argparse.parse(argparser, logg)
config = cic_base.config.create(args.c, args, args.env_prefix)
config.add(args.history_start, 'SYNCER_HISTORY_START', True)
config.add(args.no_history, '_NO_HISTORY', True)
cic_base.config.log(config)
# connect to database
dsn = dsn_from_config(config)
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
# set up rpc
rpc = cic_cache.cli.RPC.from_config(config)
conn = rpc.get_default()
cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER'))
# set up chain provisions
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
registry = None
try:
registry = cic_cache.cli.connect_registry(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
except UnknownContractError as e:
logg.exception('Registry contract connection failed for {}: {}'.format(config.get('CIC_REGISTRY_ADDRESS'), e))
sys.exit(1)
logg.info('connected contract registry {}'.format(config.get('CIC_REGISTRY_ADDRESS')))
def register_filter_tags(filters, session):
@@ -95,14 +92,12 @@ def main():
syncers = []
#if SQLBackend.first(chain_spec):
# backend = SQLBackend.initial(chain_spec, block_offset)
syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
initial_block_start = config.get('SYNCER_HISTORY_START')
initial_block_start = config.get('SYNCER_OFFSET')
initial_block_offset = block_offset
if config.get('_NO_HISTORY'):
if config.get('SYNCER_NO_HISTORY'):
initial_block_start = block_offset
initial_block_offset += 1
syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start))
@@ -112,10 +107,10 @@ def main():
logg.info('resuming sync session {}'.format(syncer_backend))
for syncer_backend in syncer_backends:
syncers.append(HistorySyncer(syncer_backend, chain_interface))
syncers.append(HistorySyncer(syncer_backend, cic_cache.cli.chain_interface))
syncer_backend = SQLBackend.live(chain_spec, block_offset+1)
syncers.append(HeadSyncer(syncer_backend, chain_interface))
syncers.append(HeadSyncer(syncer_backend, cic_cache.cli.chain_interface))
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None:

View File

@@ -2,14 +2,17 @@
import celery
# local imports
from cic_cache.cache import BloomCache
from cic_cache.cache import (
BloomCache,
DataCache,
)
from cic_cache.db.models.base import SessionBase
celery_app = celery.current_app
@celery_app.task(bind=True)
def tx_filter(self, offset, limit, address=None, encoding='hex'):
def tx_filter(self, offset, limit, address=None, oldest=False, encoding='hex'):
queue = self.request.delivery_info.get('routing_key')
session = SessionBase.create_session()
@@ -17,9 +20,9 @@ def tx_filter(self, offset, limit, address=None, encoding='hex'):
c = BloomCache(session)
b = None
if address == None:
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit, oldest=oldest)
else:
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit, oldest=oldest)
session.close()
@@ -35,4 +38,17 @@ def tx_filter(self, offset, limit, address=None, encoding='hex'):
return o
@celery_app.task(bind=True)
def tx_filter_content(self, offset, limit, address=None, block_offset=None, block_limit=None, oldest=False, encoding='hex'):
session = SessionBase.create_session()
c = DataCache(session)
b = None
if address == None:
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, limit, block_offset=block_offset, block_limit=block_limit, oldest=oldest)
else:
(lowest_block, highest_block, tx_cache) = c.load_transactions_account_with_data_index(address, offset, limit, block_offset=block_offset, block_limit=block_limit)
session.close()
return (lowest_block, highest_block, tx_cache,)

View File

@@ -4,7 +4,7 @@ import semver
version = (
0,
2,
0,
1,
'alpha.2',
)

View File

@@ -1,2 +0,0 @@
[bancor]
dir =

View File

@@ -1,4 +1,3 @@
[cic]
registry_address =
chain_spec =
trust_address =

View File

@@ -1,3 +0,0 @@
[bancor]
registry_address =
dir = /usr/local/share/bancor

View File

@@ -1,4 +1,3 @@
[cic]
chain_spec =
registry_address =
trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C

View File

@@ -1,2 +0,0 @@
[eth]
provider = http://localhost:63545

View File

@@ -1,3 +1,4 @@
[syncer]
loop_interval = 1
history_start = 0
offset = 0
no_history = 0

View File

@@ -1,2 +0,0 @@
[eth]
provider = ws://localhost:8545

View File

@@ -1,3 +0,0 @@
[syncer]
loop_interval = 5
history_start = 0

View File

@@ -10,9 +10,10 @@ COPY requirements.txt .
ARG EXTRA_INDEX_URL="https://pip.grassrootseconomics.net:8433"
ARG GITLAB_PYTHON_REGISTRY="https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple"
ARG EXTRA_PIP_ARGS=""
RUN --mount=type=cache,mode=0755,target=/root/.cache/pip \
pip install --index-url https://pypi.org/simple \
--extra-index-url $GITLAB_PYTHON_REGISTRY --extra-index-url $EXTRA_INDEX_URL \
--extra-index-url $GITLAB_PYTHON_REGISTRY --extra-index-url $EXTRA_INDEX_URL $EXTRA_PIP_ARGS \
-r requirements.txt
COPY . .

View File

@@ -1,37 +0,0 @@
# syntax = docker/dockerfile:1.2
FROM registry.gitlab.com/grassrootseconomics/cic-base-images:python-3.8.6-dev-55da5f4e as dev
# RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2b9
COPY requirements.txt .
#RUN pip install $pip_extra_index_url_flag -r test_requirements.txt
#RUN pip install $pip_extra_index_url_flag .
#RUN pip install .[server]
ARG EXTRA_INDEX_URL="https://pip.grassrootseconomics.net:8433"
ARG GITLAB_PYTHON_REGISTRY="https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple"
RUN pip install --index-url https://pypi.org/simple \
--extra-index-url $GITLAB_PYTHON_REGISTRY --extra-index-url $EXTRA_INDEX_URL \
-r requirements.txt
COPY . .
RUN python setup.py install
# ini files in config directory defines the configurable parameters for the application
# they can all be overridden by environment variables
# to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
COPY config/ /usr/local/etc/cic-cache/
# for db migrations
RUN git clone https://github.com/vishnubob/wait-for-it.git /usr/local/bin/wait-for-it/
COPY cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/
COPY /docker/start_tracker.sh ./start_tracker.sh
COPY /docker/db.sh ./db.sh
RUN chmod 755 ./*.sh
# Tracker
# ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"]
# Server
# ENTRYPOINT [ "/usr/local/bin/uwsgi", "--wsgi-file", "/usr/local/lib/python3.8/site-packages/cic_cache/runnable/server.py", "--http", ":80", "--pyargv", "-vv" ]
ENTRYPOINT []

View File

@@ -0,0 +1,10 @@
#! /bin/bash
set -e
pip install --extra-index-url https://pip.grassrootseconomics.net:8433 \
--extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple \
-r test_requirements.txt
export PYTHONPATH=. && pytest -x --cov=cic_cache --cov-fail-under=90 --cov-report term-missing tests

View File

@@ -1,13 +1,15 @@
cic-base~=0.2.0a4
alembic==1.4.2
confini>=0.3.6rc3,<0.5.0
confini>=0.3.6rc4,<0.5.0
uwsgi==2.0.19.1
moolb~=0.1.0
cic-eth-registry~=0.5.6a2
moolb~=0.1.1b2
cic-eth-registry~=0.6.1a1
SQLAlchemy==1.3.20
semver==2.13.0
psycopg2==2.8.6
celery==4.4.7
redis==3.5.3
chainsyncer[sql]~=0.0.3a5
erc20-faucet~=0.2.2a2
chainsyncer[sql]>=0.0.6a3,<0.1.0
erc20-faucet>=0.3.2a1, <0.4.0
chainlib-eth>=0.0.9a7,<0.1.0
chainlib>=0.0.9a3,<0.1.0
eth-address-index>=0.2.3a1,<0.3.0

View File

@@ -23,11 +23,13 @@ licence_files =
[options]
python_requires = >= 3.6
include_package_data = True
packages =
cic_cache
cic_cache.tasks
cic_cache.db
cic_cache.db.models
cic_cache.cli
cic_cache.runnable
cic_cache.runnable.daemons
cic_cache.runnable.daemons.filters
@@ -39,3 +41,4 @@ console_scripts =
cic-cache-trackerd = cic_cache.runnable.daemons.tracker:main
cic-cache-serverd = cic_cache.runnable.daemons.server:main
cic-cache-taskerd = cic_cache.runnable.daemons.tasker:main
cic-cache-list = cic_cache.runable.list:main

View File

@@ -6,5 +6,5 @@ sqlparse==0.4.1
pytest-celery==0.0.0a1
eth_tester==0.5.0b3
py-evm==0.3.0a20
cic_base[full]==0.1.3a3+build.984b5cff
sarafu-faucet~=0.0.4a1
sarafu-faucet~=0.0.7a1
erc20-transfer-authorization>=0.3.5a1,<0.4.0

View File

@@ -0,0 +1,40 @@
# standard imports
import os
# external imports
import chainlib.cli
# local imports
import cic_cache.cli
script_dir = os.path.dirname(os.path.realpath(__file__))
config_dir = os.path.join(script_dir, '..', 'testdata', 'config')
def test_argumentparserto_config():
argparser = cic_cache.cli.ArgumentParser()
local_flags = 0xffff
argparser.process_local_flags(local_flags)
argparser.add_argument('--foo', type=str)
args = argparser.parse_args([
'-q', 'baz',
'--offset', '13',
'--no-history',
'-r','0xdeadbeef',
'-vv',
'--foo', 'bar',
])
extra_args = {
'foo': '_BARBARBAR',
}
config = cic_cache.cli.Config.from_args(args, chainlib.cli.argflag_std_base, local_flags, extra_args=extra_args, base_config_dir=config_dir)
assert config.get('_BARBARBAR') == 'bar'
assert config.get('CELERY_QUEUE') == 'baz'
assert config.get('SYNCER_NO_HISTORY') == True
assert config.get('SYNCER_OFFSET') == 13
assert config.get('CIC_REGISTRY_ADDRESS') == '0xdeadbeef'

View File

@@ -0,0 +1,17 @@
# standard imports
import tempfile
# local imports
import cic_cache.cli
def test_cli_celery():
cf = tempfile.mkdtemp()
config = {
'CELERY_RESULT_URL': 'filesystem://' + cf,
}
cic_cache.cli.CeleryApp.from_config(config)
config['CELERY_BROKER_URL'] = 'filesystem://' + cf
cic_cache.cli.CeleryApp.from_config(config)

View File

@@ -0,0 +1,68 @@
# external imports
import pytest
from chainlib.eth.gas import (
Gas,
RPCGasOracle,
)
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.block import (
block_latest,
Block,
)
from chainlib.eth.pytest.fixtures_chain import default_chain_spec
from chainlib.eth.pytest.fixtures_ethtester import *
from cic_eth_registry.pytest.fixtures_contracts import *
from hexathon import add_0x
# local imports
import cic_cache.cli
@pytest.mark.xfail()
def test_cli_rpc(
eth_rpc,
eth_signer,
default_chain_spec,
):
config = {
'CHAIN_SPEC': str(default_chain_spec),
'RPC_HTTP_PROVIDER': 'http://localhost:8545',
}
rpc = cic_cache.cli.RPC.from_config(config, default_label='foo')
conn = rpc.get_by_label('foo')
#o = block_latest()
#conn.do(o)
def test_cli_chain(
default_chain_spec,
eth_rpc,
eth_signer,
contract_roles,
):
ifc = cic_cache.cli.EthChainInterface()
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], conn=eth_rpc)
gas_oracle = RPCGasOracle(conn=eth_rpc)
c = Gas(default_chain_spec, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, signer=eth_signer)
recipient = add_0x(os.urandom(20).hex())
(tx_hash, o) = c.create(contract_roles['CONTRACT_DEPLOYER'], recipient, 1024)
r = eth_rpc.do(o)
o = ifc.tx_receipt(r)
r = eth_rpc.do(o)
assert r['status'] == 1
o = ifc.block_by_number(1)
block_src = eth_rpc.do(o)
block = ifc.block_from_src(block_src)
assert block.number == 1
with pytest.raises(KeyError):
assert block_src['gasUsed'] == 21000
assert block_src['gas_used'] == 21000
block_src = ifc.src_normalize(block_src)
assert block_src['gasUsed'] == 21000
assert block_src['gas_used'] == 21000

View File

@@ -64,7 +64,6 @@ def txs(
dt.timestamp(),
)
tx_number = 42
tx_hash_second = '0x' + os.urandom(32).hex()
tx_signed_second = '0x' + os.urandom(128).hex()
@@ -93,6 +92,44 @@ def txs(
]
@pytest.fixture(scope='function')
def more_txs(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
):
session = init_database
tx_number = 666
tx_hash = '0x' + os.urandom(32).hex()
tx_signed = '0x' + os.urandom(128).hex()
nonce = 3
dt = datetime.datetime.utcnow()
dt += datetime.timedelta(hours=1)
db.add_transaction(
session,
tx_hash,
list_defaults['block']+2,
tx_number,
list_actors['alice'],
list_actors['diane'],
list_tokens['bar'],
list_tokens['bar'],
2048,
4096,
False,
dt.timestamp(),
)
session.commit()
return [tx_hash] + txs
@pytest.fixture(scope='function')
def tag_txs(
init_database,

View File

@@ -8,6 +8,7 @@ import json
import pytest
# local imports
from cic_cache import db
from cic_cache import BloomCache
from cic_cache.cache import DataCache
@@ -18,7 +19,6 @@ def test_cache(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
):
@@ -37,9 +37,6 @@ def test_cache(
def test_cache_data(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
tag_txs,
):
@@ -47,10 +44,209 @@ def test_cache_data(
session = init_database
c = DataCache(session)
b = c.load_transactions_with_data(410000, 420000)
b = c.load_transactions_with_data(0, 3) #410000, 420000) #, 100, block_offset=410000, block_limit=420000, oldest=True)
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == txs[1]
assert b[2][1]['tx_type'] == 'unknown'
assert b[2][0]['tx_type'] == 'test.taag'
assert b[2][0]['tx_hash'] == txs[0]
assert b[2][0]['tx_type'] == 'unknown'
assert b[2][1]['tx_type'] == 'test.taag'
def test_cache_ranges(
init_database,
list_defaults,
list_actors,
list_tokens,
more_txs,
):
session = init_database
oldest = list_defaults['block'] - 1
mid = list_defaults['block']
newest = list_defaults['block'] + 2
c = BloomCache(session)
b = c.load_transactions(0, 100)
assert b[0] == oldest
assert b[1] == newest
b = c.load_transactions(1, 2)
assert b[0] == oldest
assert b[1] == mid
b = c.load_transactions(0, 2)
assert b[0] == mid
assert b[1] == newest
b = c.load_transactions(0, 1)
assert b[0] == newest
assert b[1] == newest
b = c.load_transactions(0, 100, oldest=True)
assert b[0] == oldest
assert b[1] == newest
b = c.load_transactions(0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
b = c.load_transactions(0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == oldest
assert b[1] == mid
b = c.load_transactions(0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'], oldest=True)
assert b[0] == oldest
assert b[1] == mid
# now check when supplying account
b = c.load_transactions_account(list_actors['alice'], 0, 100)
assert b[0] == oldest
assert b[1] == newest
b = c.load_transactions_account(list_actors['bob'], 0, 100)
assert b[0] == mid
assert b[1] == mid
b = c.load_transactions_account(list_actors['diane'], 0, 100)
assert b[0] == oldest
assert b[1] == newest
# add block filter to the mix
b = c.load_transactions_account(list_actors['alice'], 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
b = c.load_transactions_account(list_actors['alice'], 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
b = c.load_transactions_account(list_actors['bob'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == mid
assert b[1] == mid
b = c.load_transactions_account(list_actors['diane'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == oldest
assert b[1] == oldest
def test_cache_ranges_data(
init_database,
list_defaults,
list_actors,
list_tokens,
more_txs,
):
session = init_database
oldest = list_defaults['block'] - 1
mid = list_defaults['block']
newest = list_defaults['block'] + 2
c = DataCache(session)
b = c.load_transactions_with_data(0, 100)
assert b[0] == oldest
assert b[1] == newest
assert len(b[2]) == 3
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][2]['tx_hash'] == more_txs[2]
b = c.load_transactions_with_data(1, 2)
assert b[0] == oldest
assert b[1] == mid
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[1]
assert b[2][1]['tx_hash'] == more_txs[2]
b = c.load_transactions_with_data(0, 2)
assert b[0] == mid
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
b = c.load_transactions_with_data(0, 1)
assert b[0] == newest
assert b[1] == newest
assert len(b[2]) == 1
assert b[2][0]['tx_hash'] == more_txs[0]
b = c.load_transactions_with_data(0, 100, oldest=True)
assert b[0] == oldest
assert b[1] == newest
assert len(b[2]) == 3
assert b[2][0]['tx_hash'] == more_txs[2]
assert b[2][1]['tx_hash'] == more_txs[1]
assert b[2][2]['tx_hash'] == more_txs[0]
b = c.load_transactions_with_data(0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
b = c.load_transactions_with_data(0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == oldest
assert b[1] == mid
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[1]
assert b[2][1]['tx_hash'] == more_txs[2]
b = c.load_transactions_with_data(0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'], oldest=True)
assert b[0] == oldest
assert b[1] == mid
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[2]
assert b[2][1]['tx_hash'] == more_txs[1]
# now check when supplying account
b = c.load_transactions_account_with_data(list_actors['alice'], 0, 100)
assert b[0] == oldest
assert b[1] == newest
assert len(b[2]) == 3
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
assert b[2][2]['tx_hash'] == more_txs[2]
b = c.load_transactions_account_with_data(list_actors['bob'], 0, 100)
assert b[0] == mid
assert b[1] == mid
assert len(b[2]) == 1
assert b[2][0]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['diane'], 0, 100)
assert b[0] == oldest
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[2]
# add block filter to the mix
b = c.load_transactions_account_with_data(list_actors['alice'], 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['alice'], 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['bob'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == mid
assert b[1] == mid
assert len(b[2]) == 1
assert b[2][0]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['diane'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == oldest
assert b[1] == oldest
assert len(b[2]) == 1
assert b[2][0]['tx_hash'] == more_txs[2]

View File

@@ -0,0 +1,2 @@
[foo]
bar_baz = xyzzy

View File

@@ -46,7 +46,7 @@ def get_adjusted_balance(self, token_symbol, amount, timestamp):
def aux_setup(rpc, config, sender_address=ZERO_ADDRESS):
chain_spec_str = config.get('CIC_CHAIN_SPEC')
chain_spec_str = config.get('CHAIN_SPEC')
chain_spec = ChainSpec.from_chain_str(chain_spec_str)
token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL')

View File

@@ -1,5 +1,5 @@
celery==4.4.7
erc20-demurrage-token~=0.0.2a3
cic-eth-registry~=0.5.6a1
chainlib~=0.0.5a1
cic_eth~=0.12.0a2
erc20-demurrage-token~=0.0.3a1
cic-eth-registry~=0.5.8a1
chainlib~=0.0.7a1
cic_eth~=0.12.2a4

View File

@@ -1,6 +1,6 @@
[metadata]
name = cic-eth-aux-erc20-demurrage-token
version = 0.0.2a4
version = 0.0.2a6
description = cic-eth tasks supporting erc20 demurrage token
author = Louis Holbrook
author_email = dev@holbrook.no

View File

@@ -5,8 +5,7 @@ pytest-cov==2.10.1
eth-tester==0.5.0b3
py-evm==0.3.0a20
SQLAlchemy==1.3.20
cic-eth~=0.12.0a1
liveness~=0.0.1a7
eth-accounts-index==0.0.12a1
eth-contract-registry==0.5.6a1
eth-address-index==0.1.2a1
eth-accounts-index==0.1.1a1
eth-contract-registry==0.5.8a1
eth-address-index==0.2.1a1

View File

@@ -6,4 +6,5 @@ omit =
cic_eth/sync/head.py
cic_eth/sync/mempool.py
cic_eth/queue/state.py
cic_eth/cli
*redis*.py

View File

@@ -1,52 +1,16 @@
.cic_eth_variables:
variables:
APP_NAME: cic-eth
DOCKERFILE_PATH: docker/Dockerfile_ci
CONTEXT: apps/$APP_NAME
build-mr-cic-eth:
extends:
- .cic_eth_variables
- .py_build_target_dev
rules:
build-test-cic-eth:
stage: test
tags:
- integration
variables:
APP_NAME: cic-eth
MR_IMAGE_TAG: mr-$APP_NAME-$CI_COMMIT_REF_SLUG-$CI_COMMIT_SHORT_SHA
script:
- cd apps/cic-eth
- docker build -t $MR_IMAGE_TAG -f docker/Dockerfile .
- docker run $MR_IMAGE_TAG sh docker/run_tests.sh
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/cic-eth/**/*
when: always
test-mr-cic-eth:
stage: test
extends:
- .cic_eth_variables
cache:
key:
files:
- test_requirements.txt
paths:
- /root/.cache/pip
image: $MR_IMAGE_TAG
script:
- cd apps/$APP_NAME/
- >
pip install --extra-index-url https://pip.grassrootseconomics.net:8433
--extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple
-r admin_requirements.txt
-r services_requirements.txt
-r test_requirements.txt
- export PYTHONPATH=. && pytest -x --cov=cic_eth --cov-fail-under=90 --cov-report term-missing tests
needs: ["build-mr-cic-eth"]
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/cic-eth/**/*
when: always
build-push-cic-eth:
extends:
- .py_build_push
- .cic_eth_variables
rules:
- if: $CI_COMMIT_BRANCH == "master"
changes:
- apps/cic-eth/**/*
- apps/$APP_NAME/**/*
when: always

View File

@@ -1,2 +1,2 @@
include *requirements.txt config/test/*
include *requirements.txt config/test/* cic_eth/data/config/*

View File

@@ -1,5 +1,5 @@
SQLAlchemy==1.3.20
cic-eth-registry>=0.5.6a2,<0.6.0
hexathon~=0.0.1a7
chainqueue>=0.0.3a1,<0.1.0
eth-erc20>=0.0.10a3,<0.1.0
cic-eth-registry>=0.6.1a2,<0.7.0
hexathon~=0.0.1a8
chainqueue>=0.0.4a6,<0.1.0
eth-erc20>=0.1.2a2,<0.2.0

View File

@@ -4,7 +4,6 @@ import logging
# external imports
import celery
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
from hexathon import (
add_0x,
@@ -20,18 +19,17 @@ from cic_eth.task import (
CriticalSQLAlchemyTask,
)
from cic_eth.error import LockedError
from cic_eth.encode import (
tx_normalize,
ZERO_ADDRESS_NORMAL,
)
celery_app = celery.current_app
logg = logging.getLogger()
def normalize_address(a):
if a == None:
return None
return add_0x(hex_uniform(strip_0x(a)))
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.ALL, tx_hash=None):
def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, flags=LockEnum.ALL, tx_hash=None):
"""Task wrapper to set arbitrary locks
:param chain_str: Chain spec string representation
@@ -43,7 +41,7 @@ def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.AL
:returns: New lock state for address
:rtype: number
"""
address = normalize_address(address)
address = tx_normalize.wallet_address(address)
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
@@ -53,7 +51,7 @@ def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.AL
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.ALL):
def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, flags=LockEnum.ALL):
"""Task wrapper to reset arbitrary locks
:param chain_str: Chain spec string representation
@@ -65,7 +63,7 @@ def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.
:returns: New lock state for address
:rtype: number
"""
address = normalize_address(address)
address = tx_normalize.wallet_address(address)
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
@@ -75,7 +73,7 @@ def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=None):
def lock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, tx_hash=None):
"""Task wrapper to set send lock
:param chain_str: Chain spec string representation
@@ -85,7 +83,7 @@ def lock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=None
:returns: New lock state for address
:rtype: number
"""
address = normalize_address(address)
address = tx_normalize.wallet_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, LockEnum.SEND, address=address, tx_hash=tx_hash)
logg.debug('Send locked for {}, flag now {}'.format(address, r))
@@ -93,7 +91,7 @@ def lock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=None
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
def unlock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL):
"""Task wrapper to reset send lock
:param chain_str: Chain spec string representation
@@ -103,7 +101,7 @@ def unlock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
:returns: New lock state for address
:rtype: number
"""
address = normalize_address(address)
address = tx_normalize.wallet_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, LockEnum.SEND, address=address)
logg.debug('Send unlocked for {}, flag now {}'.format(address, r))
@@ -111,7 +109,7 @@ def unlock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=None):
def lock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL, tx_hash=None):
"""Task wrapper to set queue direct lock
:param chain_str: Chain spec string representation
@@ -121,7 +119,7 @@ def lock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=Non
:returns: New lock state for address
:rtype: number
"""
address = normalize_address(address)
address = tx_normalize.wallet_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, LockEnum.QUEUE, address=address, tx_hash=tx_hash)
logg.debug('Queue direct locked for {}, flag now {}'.format(address, r))
@@ -129,7 +127,7 @@ def lock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=Non
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS_NORMAL):
"""Task wrapper to reset queue direct lock
:param chain_str: Chain spec string representation
@@ -139,7 +137,7 @@ def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
:returns: New lock state for address
:rtype: number
"""
address = normalize_address(address)
address = tx_normalize.wallet_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, LockEnum.QUEUE, address=address)
logg.debug('Queue direct unlocked for {}, flag now {}'.format(address, r))
@@ -148,12 +146,13 @@ def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
@celery_app.task(base=CriticalSQLAlchemyTask)
def check_lock(chained_input, chain_spec_dict, lock_flags, address=None):
address = normalize_address(address)
if address != None:
address = tx_normalize.wallet_address(address)
chain_str = '::'
if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
session = SessionBase.create_session()
r = Lock.check(chain_str, lock_flags, address=ZERO_ADDRESS, session=session)
r = Lock.check(chain_str, lock_flags, address=ZERO_ADDRESS_NORMAL, session=session)
if address != None:
r |= Lock.check(chain_str, lock_flags, address=address, session=session)
if r > 0:

View File

@@ -33,6 +33,7 @@ from cic_eth.admin.ctrl import (
from cic_eth.queue.tx import queue_create
from cic_eth.eth.gas import create_check_gas_task
from cic_eth.task import BaseTask
from cic_eth.encode import tx_normalize
celery_app = celery.current_app
logg = logging.getLogger()
@@ -73,7 +74,7 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
set_cancel(chain_spec, strip_0x(tx['hash']), manual=True, session=session)
query_address = add_0x(hex_uniform(strip_0x(address))) # aaaaargh
query_address = tx_normalize.wallet_address(address)
q = session.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.sender==query_address)

View File

@@ -32,7 +32,6 @@ from chainqueue.db.enum import (
status_str,
)
from chainqueue.error import TxStateChangeError
from chainqueue.sql.query import get_tx
from eth_erc20 import ERC20
# local imports
@@ -40,6 +39,7 @@ from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.nonce import Nonce
from cic_eth.error import InitializationError
from cic_eth.queue.query import get_tx_local
app = celery.current_app
@@ -284,7 +284,7 @@ class AdminApi:
tx_hash_hex = None
session = SessionBase.create_session()
for k in txs.keys():
tx_dict = get_tx(chain_spec, k, session=session)
tx_dict = get_tx_local(chain_spec, k, session=session)
if tx_dict['nonce'] == nonce:
tx_hash_hex = k
session.close()

View File

@@ -520,9 +520,9 @@ class Api(ApiBase):
s_external_get = celery.signature(
external_task,
[
address,
offset,
limit,
address,
],
queue=external_queue,
)

View File

@@ -21,7 +21,7 @@ def health(*args, **kwargs):
session = SessionBase.create_session()
config = kwargs['config']
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
logg.debug('check gas balance of gas gifter for chain {}'.format(chain_spec))
try:

View File

@@ -15,7 +15,7 @@ logg = logging.getLogger().getChild(__name__)
def health(*args, **kwargs):
blocked = True
max_attempts = 5
conn = RPCConnection.connect(kwargs['config'].get('CIC_CHAIN_SPEC'), tag='signer')
conn = RPCConnection.connect(kwargs['config'].get('CHAIN_SPEC'), tag='signer')
for i in range(max_attempts):
idx = i + 1
logg.debug('attempt signer connection check {}/{}'.format(idx, max_attempts))

View File

@@ -0,0 +1,10 @@
# local imports
from .base import *
from .chain import (
EthChainInterface,
chain_interface,
)
from .rpc import RPC
from .arg import ArgumentParser
from .config import Config
from .celery import CeleryApp

View File

@@ -0,0 +1,31 @@
# external imports
from chainlib.eth.cli import ArgumentParser as BaseArgumentParser
# local imports
from .base import (
CICFlag,
Flag,
)
class ArgumentParser(BaseArgumentParser):
def process_local_flags(self, local_arg_flags):
if local_arg_flags & CICFlag.REDIS:
self.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
self.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
self.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use')
if local_arg_flags & CICFlag.REDIS_CALLBACK:
self.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback')
self.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback')
self.add_argument('--redis-timeout', default=20.0, type=float, help='Redis callback timeout')
if local_arg_flags & CICFlag.CELERY:
self.add_argument('-q', '--celery-queue', dest='celery_queue', type=str, default='cic-eth', help='Task queue')
if local_arg_flags & CICFlag.SYNCER:
self.add_argument('--offset', type=int, default=0, help='Start block height for initial history sync')
self.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
if local_arg_flags & CICFlag.CHAIN:
self.add_argument('-r', '--registry-address', type=str, dest='registry_address', help='CIC registry contract address')

View File

@@ -0,0 +1,31 @@
# standard imports
import enum
# external imports
from chainlib.eth.cli import (
argflag_std_read,
argflag_std_write,
argflag_std_base,
Flag,
)
class CICFlag(enum.IntEnum):
# celery - nibble 1
CELERY = 1
# redis - nibble 2
REDIS = 16
REDIS_CALLBACK = 32
# chain - nibble 3
CHAIN = 256
# sync - nibble 4
SYNCER = 4096
argflag_local_task = CICFlag.CELERY
argflag_local_taskcallback = argflag_local_task | CICFlag.REDIS | CICFlag.REDIS_CALLBACK
argflag_local_chain = CICFlag.CHAIN
argflag_local_sync = CICFlag.SYNCER | CICFlag.CHAIN

View File

@@ -0,0 +1,24 @@
# standard imports
import logging
# external imports
import celery
logg = logging.getLogger(__name__)
class CeleryApp:
@classmethod
def from_config(cls, config):
backend_url = config.get('CELERY_RESULT_URL')
broker_url = config.get('CELERY_BROKER_URL')
celery_app = None
if backend_url != None:
celery_app = celery.Celery(broker=broker_url, backend=backend_url)
logg.info('creating celery app on {} with backend on {}'.format(broker_url, backend_url))
else:
celery_app = celery.Celery(broker=broker_url)
logg.info('creating celery app without results backend on {}'.format(broker_url))
return celery_app

View File

@@ -0,0 +1,21 @@
# external imports
from chainlib.eth.block import (
block_by_number,
Block,
)
from chainlib.eth.tx import (
receipt,
Tx,
)
from chainlib.interface import ChainInterface
class EthChainInterface(ChainInterface):
def __init__(self):
self._tx_receipt = receipt
self._block_by_number = block_by_number
self._block_from_src = Block.from_src
self._src_normalize = Tx.src_normalize
chain_interface = EthChainInterface()

View File

@@ -0,0 +1,63 @@
# standard imports
import os
import logging
# external imports
from chainlib.eth.cli import (
Config as BaseConfig,
Flag,
)
# local imports
from .base import CICFlag
script_dir = os.path.dirname(os.path.realpath(__file__))
logg = logging.getLogger(__name__)
class Config(BaseConfig):
local_base_config_dir = os.path.join(script_dir, '..', 'data', 'config')
@classmethod
def from_args(cls, args, arg_flags, local_arg_flags, extra_args={}, default_config_dir=None, base_config_dir=None, default_fee_limit=None):
expanded_base_config_dir = [cls.local_base_config_dir]
if base_config_dir != None:
if isinstance(base_config_dir, str):
base_config_dir = [base_config_dir]
for d in base_config_dir:
expanded_base_config_dir.append(d)
config = BaseConfig.from_args(args, arg_flags, extra_args=extra_args, default_config_dir=default_config_dir, base_config_dir=expanded_base_config_dir, load_callback=None)
local_args_override = {}
if local_arg_flags & CICFlag.REDIS:
local_args_override['REDIS_HOST'] = getattr(args, 'redis_host')
local_args_override['REDIS_PORT'] = getattr(args, 'redis_port')
local_args_override['REDIS_DB'] = getattr(args, 'redis_db')
local_args_override['REDIS_TIMEOUT'] = getattr(args, 'redis_timeout')
if local_arg_flags & CICFlag.CHAIN:
local_args_override['CIC_REGISTRY_ADDRESS'] = getattr(args, 'registry_address')
if local_arg_flags & CICFlag.CELERY:
local_args_override['CELERY_QUEUE'] = getattr(args, 'celery_queue')
if local_arg_flags & CICFlag.SYNCER:
local_args_override['SYNCER_OFFSET'] = getattr(args, 'offset')
local_args_override['SYNCER_NO_HISTORY'] = getattr(args, 'no_history')
config.dict_override(local_args_override, 'local cli args')
if local_arg_flags & CICFlag.REDIS_CALLBACK:
config.add(getattr(args, 'redis_host_callback'), '_REDIS_HOST_CALLBACK')
config.add(getattr(args, 'redis_port_callback'), '_REDIS_PORT_CALLBACK')
if local_arg_flags & CICFlag.CELERY:
config.add(config.true('CELERY_DEBUG'), 'CELERY_DEBUG', exists_ok=True)
logg.debug('config loaded:\n{}'.format(config))
return config

View File

@@ -0,0 +1,90 @@
# standard imports
import logging
# external imports
from chainlib.connection import (
RPCConnection,
ConnType,
)
from chainlib.eth.connection import (
EthUnixSignerConnection,
EthHTTPSignerConnection,
)
from chainlib.chain import ChainSpec
logg = logging.getLogger(__name__)
class RPC:
def __init__(self, chain_spec, rpc_provider, signer_provider=None):
self.chain_spec = chain_spec
self.rpc_provider = rpc_provider
self.signer_provider = signer_provider
def get_default(self):
return self.get_by_label('default')
def get_by_label(self, label):
return RPCConnection.connect(self.chain_spec, label)
@staticmethod
def from_config(config, use_signer=False, default_label='default', signer_label='signer'):
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
RPCConnection.register_location(config.get('RPC_PROVIDER'), chain_spec, default_label)
if use_signer:
RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, signer_label)
RPCConnection.register_constructor(ConnType.HTTP, EthHTTPSignerConnection, signer_label)
RPCConnection.register_constructor(ConnType.HTTP_SSL, EthHTTPSignerConnection, signer_label)
RPCConnection.register_location(config.get('SIGNER_PROVIDER'), chain_spec, signer_label)
rpc = RPC(chain_spec, config.get('RPC_PROVIDER'), signer_provider=config.get('SIGNER_PROVIDER'))
logg.info('set up rpc: {}'.format(rpc))
return rpc
def __str__(self):
return 'RPC factory, chain {}, rpc {}, signer {}'.format(self.chain_spec, self.rpc_provider, self.signer_provider)
# TOOD: re-implement file backend option from omittec code:
#broker = config.get('CELERY_BROKER_URL')
#if broker[:4] == 'file':
# bq = tempfile.mkdtemp()
# bp = tempfile.mkdtemp()
# conf_update = {
# 'broker_url': broker,
# 'broker_transport_options': {
# 'data_folder_in': bq,
# 'data_folder_out': bq,
# 'data_folder_processed': bp,
# },
# }
# if config.true('CELERY_DEBUG'):
# conf_update['result_extended'] = True
# current_app.conf.update(conf_update)
# logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
#else:
# conf_update = {
# 'broker_url': broker,
# }
# if config.true('CELERY_DEBUG'):
# conf_update['result_extended'] = True
# current_app.conf.update(conf_update)
#
#result = config.get('CELERY_RESULT_URL')
#if result[:4] == 'file':
# rq = tempfile.mkdtemp()
# current_app.conf.update({
# 'result_backend': 'file://{}'.format(rq),
# })
# logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
#else:
# current_app.conf.update({
# 'result_backend': result,
# })
#

View File

@@ -0,0 +1,5 @@
[celery]
broker_url = redis://localhost:6379
result_url =
queue = cic-eth
debug = 0

View File

@@ -1,8 +1,6 @@
[cic]
registry_address =
chain_spec = evm:bloxberg:8996
tx_retry_delay =
trust_address =
default_token_symbol = GFT
default_token_symbol =
health_modules = cic_eth.check.db,cic_eth.check.redis,cic_eth.check.signer,cic_eth.check.gas
run_dir = /run

View File

@@ -0,0 +1,10 @@
[database]
engine =
driver =
host =
port =
name =
user =
password =
debug = 0
pool_size = 0

View File

@@ -1,2 +1,2 @@
[SYNCER]
[dispatcher]
loop_interval = 1

View File

@@ -1,3 +1,2 @@
[eth]
provider = http://localhost:8545
gas_gifter_minimum_balance = 10000000000000000000000

View File

@@ -1,4 +1,5 @@
[redis]
host = localhost
port = 63379
port = 6379
db = 0
timeout = 20.0

View File

@@ -0,0 +1,3 @@
[retry]
delay =
batch_size =

View File

@@ -0,0 +1,2 @@
[signer]
provider =

View File

@@ -0,0 +1,4 @@
[syncer]
loop_interval = 1
offset = 0
no_history = 0

View File

@@ -8,8 +8,8 @@ Create Date: 2021-04-02 18:41:20.864265
import datetime
from alembic import op
import sqlalchemy as sa
from chainlib.eth.constant import ZERO_ADDRESS
from cic_eth.db.enum import LockEnum
from cic_eth.encode import ZERO_ADDRESS_NORMAL
# revision identifiers, used by Alembic.
@@ -30,7 +30,7 @@ def upgrade():
sa.Column("otx_id", sa.Integer, sa.ForeignKey('otx.id'), nullable=True),
)
op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True)
op.execute("INSERT INTO lock (address, date_created, blockchain, flags) VALUES('{}', '{}', '::', {})".format(ZERO_ADDRESS, datetime.datetime.utcnow(), LockEnum.INIT | LockEnum.SEND | LockEnum.QUEUE))
op.execute("INSERT INTO lock (address, date_created, blockchain, flags) VALUES('{}', '{}', '::', {})".format(ZERO_ADDRESS_NORMAL, datetime.datetime.utcnow(), LockEnum.INIT | LockEnum.SEND | LockEnum.QUEUE))
def downgrade():

View File

@@ -4,12 +4,12 @@ import logging
# third-party imports
from sqlalchemy import Column, String, Integer, DateTime, ForeignKey
from chainlib.eth.constant import ZERO_ADDRESS
from chainqueue.db.models.tx import TxCache
from chainqueue.db.models.otx import Otx
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.encode import ZERO_ADDRESS_NORMAL
logg = logging.getLogger()
@@ -37,7 +37,7 @@ class Lock(SessionBase):
@staticmethod
def set(chain_str, flags, address=ZERO_ADDRESS, session=None, tx_hash=None):
def set(chain_str, flags, address=ZERO_ADDRESS_NORMAL, session=None, tx_hash=None):
"""Sets flags associated with the given address and chain.
If a flags entry does not exist it is created.
@@ -90,7 +90,7 @@ class Lock(SessionBase):
@staticmethod
def reset(chain_str, flags, address=ZERO_ADDRESS, session=None):
def reset(chain_str, flags, address=ZERO_ADDRESS_NORMAL, session=None):
"""Resets flags associated with the given address and chain.
If the resulting flags entry value is 0, the entry will be deleted.
@@ -134,7 +134,7 @@ class Lock(SessionBase):
@staticmethod
def check(chain_str, flags, address=ZERO_ADDRESS, session=None):
def check(chain_str, flags, address=ZERO_ADDRESS_NORMAL, session=None):
"""Checks whether all given flags are set for given address and chain.
Does not validate the address against any other tables or components.

View File

@@ -0,0 +1,16 @@
# external imports
from chainlib.eth.constant import ZERO_ADDRESS
from chainqueue.encode import TxHexNormalizer
from chainlib.eth.tx import unpack
tx_normalize = TxHexNormalizer()
ZERO_ADDRESS_NORMAL = tx_normalize.wallet_address(ZERO_ADDRESS)
def unpack_normal(signed_tx_bytes, chain_spec):
tx = unpack(signed_tx_bytes, chain_spec)
tx['hash'] = tx_normalize.tx_hash(tx['hash'])
tx['from'] = tx_normalize.wallet_address(tx['from'])
tx['to'] = tx_normalize.wallet_address(tx['to'])
return tx

View File

@@ -14,10 +14,7 @@ from chainlib.eth.sign import (
sign_message,
)
from chainlib.eth.address import to_checksum_address
from chainlib.eth.tx import (
TxFormat,
unpack,
)
from chainlib.eth.tx import TxFormat
from chainlib.chain import ChainSpec
from chainlib.error import JSONRPCException
from eth_accounts_index.registry import AccountRegistry
@@ -49,6 +46,10 @@ from cic_eth.eth.nonce import (
from cic_eth.queue.tx import (
register_tx,
)
from cic_eth.encode import (
unpack_normal,
ZERO_ADDRESS_NORMAL,
)
logg = logging.getLogger()
celery_app = celery.current_app
@@ -295,17 +296,17 @@ def cache_gift_data(
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx = unpack_normal(tx_signed_raw_bytes, chain_spec)
tx_data = Faucet.parse_give_to_request(tx['data'])
session = self.create_session()
tx_dict = {
'hash': tx_hash_hex,
'hash': tx['hash'],
'from': tx['from'],
'to': tx['to'],
'source_token': ZERO_ADDRESS,
'destination_token': ZERO_ADDRESS,
'source_token': ZERO_ADDRESS_NORMAL,
'destination_token': ZERO_ADDRESS_NORMAL,
'from_value': 0,
'to_value': 0,
}
@@ -334,17 +335,17 @@ def cache_account_data(
:rtype: tuple
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack_normal(tx_signed_raw_bytes, chain_spec)
tx_data = AccountsIndex.parse_add_request(tx['data'])
session = SessionBase.create_session()
tx_dict = {
'hash': tx_hash_hex,
'hash': tx['hash'],
'from': tx['from'],
'to': tx['to'],
'source_token': ZERO_ADDRESS,
'destination_token': ZERO_ADDRESS,
'source_token': ZERO_ADDRESS_NORMAL,
'destination_token': ZERO_ADDRESS_NORMAL,
'from_value': 0,
'to_value': 0,
}

View File

@@ -3,8 +3,11 @@ import logging
# external imports
import celery
from hexathon import strip_0x
from chainlib.eth.constant import ZERO_ADDRESS
from hexathon import (
strip_0x,
add_0x,
)
#from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
from chainlib.eth.address import is_checksum_address
from chainlib.connection import RPCConnection
@@ -21,7 +24,6 @@ from chainlib.eth.error import (
from chainlib.eth.tx import (
TxFactory,
TxFormat,
unpack,
)
from chainlib.eth.contract import (
abi_decode_single,
@@ -45,6 +47,7 @@ from cic_eth.eth.nonce import CustodialTaskNonceOracle
from cic_eth.queue.tx import (
queue_create,
register_tx,
unpack,
)
from cic_eth.queue.query import get_tx
from cic_eth.task import (
@@ -53,6 +56,11 @@ from cic_eth.task import (
CriticalSQLAlchemyAndSignerTask,
CriticalWeb3AndSignerTask,
)
from cic_eth.encode import (
tx_normalize,
ZERO_ADDRESS_NORMAL,
unpack_normal,
)
celery_app = celery.current_app
logg = logging.getLogger()
@@ -66,6 +74,7 @@ class MaxGasOracle:
return MAXIMUM_FEE_UNITS
#def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=None, tx_hashes_hex=None, queue=None):
def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=None, tx_hashes_hex=None, queue=None):
"""Creates a celery task signature for a check_gas task that adds the task to the outgoing queue to be processed by the dispatcher.
@@ -130,16 +139,16 @@ def cache_gas_data(
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx = unpack_normal(tx_signed_raw_bytes, chain_spec)
session = SessionBase.create_session()
tx_dict = {
'hash': tx_hash_hex,
'hash': tx['hash'],
'from': tx['from'],
'to': tx['to'],
'source_token': ZERO_ADDRESS,
'destination_token': ZERO_ADDRESS,
'source_token': ZERO_ADDRESS_NORMAL,
'destination_token': ZERO_ADDRESS_NORMAL,
'from_value': tx['value'],
'to_value': tx['value'],
}
@@ -150,7 +159,7 @@ def cache_gas_data(
@celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyAndWeb3Task)
def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_required=MAXIMUM_FEE_UNITS):
def check_gas(self, tx_hashes_hex, chain_spec_dict, txs_hex=[], address=None, gas_required=MAXIMUM_FEE_UNITS):
"""Check the gas level of the sender address of a transaction.
If the account balance is not sufficient for the required gas, gas refill is requested and OutOfGasError raiser.
@@ -170,6 +179,21 @@ def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_requir
:return: Signed raw transaction data list
:rtype: param txs, unchanged
"""
if address != None:
if not is_checksum_address(address):
raise ValueError('invalid address {}'.format(address))
address = tx_normalize.wallet_address(address)
address = add_0x(address)
tx_hashes = []
txs = []
for tx_hash in tx_hashes_hex:
tx_hash = tx_normalize.tx_hash(tx_hash)
tx_hashes.append(tx_hash)
for tx in txs_hex:
tx = tx_normalize.tx_wire(tx)
txs.append(tx)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
logg.debug('txs {} tx_hashes {}'.format(txs, tx_hashes))
@@ -187,9 +211,6 @@ def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_requir
raise ValueError('txs passed to check gas must all have same sender; had {} got {}'.format(address, tx['from']))
addresspass.append(address)
if not is_checksum_address(address):
raise ValueError('invalid address {}'.format(address))
queue = self.request.delivery_info.get('routing_key')
conn = RPCConnection.connect(chain_spec)
@@ -304,6 +325,7 @@ def refill_gas(self, recipient_address, chain_spec_dict):
# Determine value of gas tokens to send
# if an uncompleted gas refill for the same recipient already exists, we still need to spend the nonce
# however, we will perform a 0-value transaction instead
recipient_address = tx_normalize.wallet_address(recipient_address)
zero_amount = False
session = SessionBase.create_session()
status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
@@ -378,6 +400,7 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_spec_dict, gas=None, defa
:returns: Transaction hash
:rtype: str, 0x-hex
"""
txold_hash_hex = tx_normalize.tx_hash(txold_hash_hex)
session = SessionBase.create_session()
otx = Otx.load(txold_hash_hex, session)

View File

@@ -12,9 +12,11 @@ from chainlib.eth.tx import (
transaction_by_block,
receipt,
)
from chainlib.eth.error import RequestMismatchException
from chainlib.eth.block import block_by_number
from chainlib.eth.contract import abi_decode_single
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.tx import Tx
from hexathon import strip_0x
from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token
@@ -23,6 +25,8 @@ from chainqueue.db.models.otx import Otx
from chainqueue.db.enum import StatusEnum
from chainqueue.sql.query import get_tx_cache
from eth_erc20 import ERC20
from erc20_faucet import Faucet
from potaahto.symbols import snake_and_camel
# local imports
from cic_eth.queue.time import tx_times
@@ -35,6 +39,32 @@ logg = logging.getLogger()
MAX_BLOCK_TX = 250
def parse_transaction(chain_spec, rpc, tx, sender_address=None):
try:
transfer_data = ERC20.parse_transfer_request(tx['input'])
tx_address = transfer_data[0]
tx_token_value = transfer_data[1]
logg.debug('matched transfer transaction {} in block {} sender {} recipient {} value {}'.format(tx['hash'], tx['block_number'], tx['from'], tx_address, tx_token_value))
return (tx_address, tx_token_value)
except RequestMismatchException:
pass
try:
transfer_data = Faucet.parse_give_to_request(tx['input'])
tx_address = transfer_data[0]
c = Faucet(chain_spec)
o = c.token_amount(tx['to'], sender_address=sender_address, height=tx['block_number'])
r = rpc.do(o)
tx_token_value = Faucet.parse_token_amount(r)
logg.debug('matched giveto transaction {} in block {} sender {} recipient {} value {}'.format(tx['hash'], tx['block_number'], tx['from'], tx_address, tx_token_value))
return (tx_address, tx_token_value)
except RequestMismatchException:
pass
return None
# TODO: Make this method easier to read
@celery_app.task(bind=True, base=BaseTask)
def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict):
@@ -71,36 +101,39 @@ def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict):
tx_filter = moolb.Bloom(databitlen, bloomspec['filter_rounds'], default_data=tx_filter_data)
txs = {}
logg.debug('processing filter with span low {} to high {}'.format(bloomspec['low'], bloomspec['high']))
for block_height in range(bloomspec['low'], bloomspec['high']):
block_height_bytes = block_height.to_bytes(4, 'big')
if block_filter.check(block_height_bytes):
logg.debug('filter matched block {}'.format(block_height))
o = block_by_number(block_height)
block = rpc.do(o)
logg.debug('block {}'.format(block))
for tx_index in range(0, len(block['transactions'])):
composite = tx_index + block_height
tx_index_bytes = composite.to_bytes(4, 'big')
if tx_filter.check(tx_index_bytes):
tx_index_bytes = tx_index.to_bytes(4, 'big')
composite = block_height_bytes + tx_index_bytes
if tx_filter.check(composite):
logg.debug('filter matched block {} tx {}'.format(block_height, tx_index))
o = transaction_by_block(block['hash'], tx_index)
try:
#tx = c.w3.eth.getTransactionByBlock(block_height, tx_index)
o = transaction_by_block(block['hash'], tx_index)
tx = rpc.do(o)
except Exception as e:
logg.debug('false positive on block {} tx {} ({})'.format(block_height, tx_index, e))
continue
tx = Tx(tx).src()
logg.debug('got tx {}'.format(tx))
tx_address = None
tx_token_value = 0
try:
transfer_data = ERC20.parse_transfer_request(tx['data'])
tx_address = transfer_data[0]
tx_token_value = transfer_data[1]
except ValueError:
logg.debug('not a transfer transaction, skipping {}'.format(tx))
transfer_data = parse_transaction(chain_spec, rpc, tx, sender_address=BaseTask.call_address)
if transfer_data == None:
continue
tx_address = transfer_data[0]
tx_token_value = transfer_data[1]
if address == tx_address:
status = StatusEnum.SENT
try:
@@ -136,6 +169,7 @@ def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict):
return txs
# TODO: Surely it must be possible to optimize this
# TODO: DRY this with callback filter in cic_eth/runnable/manager
# TODO: Remove redundant fields from end representation (timestamp, tx_hash)

View File

@@ -15,6 +15,7 @@ from chainqueue.db.enum import (
# local imports
from cic_eth.db import SessionBase
from cic_eth.task import CriticalSQLAlchemyTask
from cic_eth.encode import tx_normalize
celery_app = celery.current_app
@@ -22,6 +23,9 @@ logg = logging.getLogger()
def __balance_outgoing_compatible(token_address, holder_address):
token_address = tx_normalize.executable_address(token_address)
holder_address = tx_normalize.wallet_address(holder_address)
session = SessionBase.create_session()
q = session.query(TxCache.from_value)
q = q.join(Otx)
@@ -58,6 +62,9 @@ def balance_outgoing(tokens, holder_address, chain_spec_dict):
def __balance_incoming_compatible(token_address, receiver_address):
token_address = tx_normalize.executable_address(token_address)
receiver_address = tx_normalize.wallet_address(receiver_address)
session = SessionBase.create_session()
q = session.query(TxCache.to_value)
q = q.join(Otx)
@@ -110,7 +117,7 @@ def assemble_balances(balances_collection):
logg.debug('received collection {}'.format(balances_collection))
for c in balances_collection:
for b in c:
address = b['address']
address = tx_normalize.executable_address(b['address'])
if tokens.get(address) == None:
tokens[address] = {
'address': address,

View File

@@ -6,6 +6,7 @@ import celery
from cic_eth.task import CriticalSQLAlchemyTask
from cic_eth.db import SessionBase
from cic_eth.db.models.lock import Lock
from cic_eth.encode import tx_normalize
celery_app = celery.current_app
@@ -21,6 +22,9 @@ def get_lock(address=None):
:returns: List of locks
:rtype: list of dicts
"""
if address != None:
address = tx_normalize.wallet_address(address)
session = SessionBase.create_session()
q = session.query(
Lock.date_created,

View File

@@ -4,8 +4,8 @@ import datetime
# external imports
import celery
from chainlib.chain import ChainSpec
from chainlib.eth.tx import unpack
import chainqueue.sql.query
from chainlib.eth.tx import unpack
from chainqueue.db.enum import (
StatusEnum,
is_alive,
@@ -20,6 +20,10 @@ from cic_eth.db.enum import LockEnum
from cic_eth.task import CriticalSQLAlchemyTask
from cic_eth.db.models.lock import Lock
from cic_eth.db.models.base import SessionBase
from cic_eth.encode import (
tx_normalize,
unpack_normal,
)
celery_app = celery.current_app
@@ -27,49 +31,76 @@ celery_app = celery.current_app
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_tx_cache(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
return get_tx_cache_local(chain_spec, tx_hash)
def get_tx_cache_local(chain_spec, tx_hash, session=None):
tx_hash = tx_normalize.tx_hash(tx_hash)
session = SessionBase.bind_session(session)
r = chainqueue.sql.query.get_tx_cache(chain_spec, tx_hash, session=session)
session.close()
SessionBase.release_session(session)
return r
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_tx(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
return get_tx_local(chain_spec, tx_hash)
def get_tx_local(chain_spec, tx_hash, session=None):
tx_hash = tx_normalize.tx_hash(tx_hash)
session = SessionBase.bind_session(session)
r = chainqueue.sql.query.get_tx(chain_spec, tx_hash, session=session)
session.close()
SessionBase.release_session(session)
return r
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_account_tx(chain_spec_dict, address, as_sender=True, as_recipient=True, counterpart=None):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
return get_account_tx_local(chain_spec, address, as_sender=as_sender, as_recipient=as_recipient, counterpart=counterpart)
def get_account_tx_local(chain_spec, address, as_sender=True, as_recipient=True, counterpart=None, session=None):
address = tx_normalize.wallet_address(address)
session = SessionBase.bind_session(session)
r = chainqueue.sql.query.get_account_tx(chain_spec, address, as_sender=True, as_recipient=True, counterpart=None, session=session)
session.close()
SessionBase.release_session(session)
return r
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_upcoming_tx_nolock(chain_spec_dict, status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, session=None):
def get_upcoming_tx_nolock(chain_spec_dict, status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
return get_upcoming_tx_nolock_local(chain_spec, status=status, not_status=not_status, recipient=recipient, before=before, limit=limit)
def get_upcoming_tx_nolock_local(chain_spec, status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, session=None):
recipient = tx_normalize.wallet_address(recipient)
session = SessionBase.create_session()
r = chainqueue.sql.query.get_upcoming_tx(chain_spec, status, not_status=not_status, recipient=recipient, before=before, limit=limit, session=session, decoder=unpack)
r = chainqueue.sql.query.get_upcoming_tx(chain_spec, status, not_status=not_status, recipient=recipient, before=before, limit=limit, session=session, decoder=unpack_normal)
session.close()
return r
def get_status_tx(chain_spec, status, not_status=None, before=None, exact=False, limit=0, session=None):
return chainqueue.sql.query.get_status_tx_cache(chain_spec, status, not_status=not_status, before=before, exact=exact, limit=limit, session=session, decoder=unpack)
return chainqueue.sql.query.get_status_tx_cache(chain_spec, status, not_status=not_status, before=before, exact=exact, limit=limit, session=session, decoder=unpack_normal)
def get_paused_tx(chain_spec, status=None, sender=None, session=None, decoder=None):
return chainqueue.sql.query.get_paused_tx_cache(chain_spec, status=status, sender=sender, session=session, decoder=unpack)
sender = tx_normalize.wallet_address(sender)
return chainqueue.sql.query.get_paused_tx_cache(chain_spec, status=status, sender=sender, session=session, decoder=unpack_normal)
def get_nonce_tx(chain_spec, nonce, sender):
return get_nonce_tx_cache(chain_spec, nonce, sender, decoder=unpack)
sender = tx_normalize.wallet_address(sender)
return get_nonce_tx_local(chain_spec, nonce, sender)
def get_nonce_tx_local(chain_spec, nonce, sender, session=None):
sender = tx_normalize.wallet_address(sender)
return chainqueue.sql.query.get_nonce_tx_cache(chain_spec, nonce, sender, decoder=unpack_normal, session=session)
def get_upcoming_tx(chain_spec, status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, session=None):
@@ -91,6 +122,8 @@ def get_upcoming_tx(chain_spec, status=StatusEnum.READYSEND, not_status=None, re
:returns: Transactions
:rtype: dict, with transaction hash as key, signed raw transaction as value
"""
if recipient != None:
recipient = tx_normalize.wallet_address(recipient)
session = SessionBase.bind_session(session)
q_outer = session.query(
TxCache.sender,

View File

@@ -6,12 +6,14 @@ import chainqueue.sql.state
import celery
from cic_eth.task import CriticalSQLAlchemyTask
from cic_eth.db.models.base import SessionBase
from cic_eth.encode import tx_normalize
celery_app = celery.current_app
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_sent(chain_spec_dict, tx_hash, fail=False):
tx_hash = tx_normalize.tx_hash(tx_hash)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_sent(chain_spec, tx_hash, fail, session=session)
@@ -21,6 +23,7 @@ def set_sent(chain_spec_dict, tx_hash, fail=False):
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_final(chain_spec_dict, tx_hash, block=None, tx_index=None, fail=False):
tx_hash = tx_normalize.tx_hash(tx_hash)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_final(chain_spec, tx_hash, block=block, tx_index=tx_index, fail=fail, session=session)
@@ -30,6 +33,7 @@ def set_final(chain_spec_dict, tx_hash, block=None, tx_index=None, fail=False):
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_cancel(chain_spec_dict, tx_hash, manual=False):
tx_hash = tx_normalize.tx_hash(tx_hash)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_cancel(chain_spec, tx_hash, manual, session=session)
@@ -39,6 +43,7 @@ def set_cancel(chain_spec_dict, tx_hash, manual=False):
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_rejected(chain_spec_dict, tx_hash):
tx_hash = tx_normalize.tx_hash(tx_hash)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_rejected(chain_spec, tx_hash, session=session)
@@ -48,6 +53,7 @@ def set_rejected(chain_spec_dict, tx_hash):
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_fubar(chain_spec_dict, tx_hash):
tx_hash = tx_normalize.tx_hash(tx_hash)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_fubar(chain_spec, tx_hash, session=session)
@@ -57,6 +63,7 @@ def set_fubar(chain_spec_dict, tx_hash):
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_manual(chain_spec_dict, tx_hash):
tx_hash = tx_normalize.tx_hash(tx_hash)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_manual(chain_spec, tx_hash, session=session)
@@ -66,6 +73,7 @@ def set_manual(chain_spec_dict, tx_hash):
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_ready(chain_spec_dict, tx_hash):
tx_hash = tx_normalize.tx_hash(tx_hash)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_ready(chain_spec, tx_hash, session=session)
@@ -75,6 +83,7 @@ def set_ready(chain_spec_dict, tx_hash):
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_reserved(chain_spec_dict, tx_hash):
tx_hash = tx_normalize.tx_hash(tx_hash)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_reserved(chain_spec, tx_hash, session=session)
@@ -84,6 +93,7 @@ def set_reserved(chain_spec_dict, tx_hash):
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_waitforgas(chain_spec_dict, tx_hash):
tx_hash = tx_normalize.tx_hash(tx_hash)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.set_waitforgas(chain_spec, tx_hash, session=session)
@@ -93,6 +103,7 @@ def set_waitforgas(chain_spec_dict, tx_hash):
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_state_log(chain_spec_dict, tx_hash):
tx_hash = tx_normalize.tx_hash(tx_hash)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.get_state_log(chain_spec, tx_hash, session=session)
@@ -102,6 +113,7 @@ def get_state_log(chain_spec_dict, tx_hash):
@celery_app.task(base=CriticalSQLAlchemyTask)
def obsolete(chain_spec_dict, tx_hash, final):
tx_hash = tx_normalize.tx_hash(tx_hash)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.sql.state.obsolete_by_cache(chain_spec, tx_hash, final, session=session)

View File

@@ -13,6 +13,7 @@ from chainqueue.error import NotLocalTxError
# local imports
from cic_eth.task import CriticalSQLAlchemyAndWeb3Task
from cic_eth.db.models.base import SessionBase
from cic_eth.encode import tx_normalize
celery_app = celery.current_app
@@ -20,6 +21,7 @@ logg = logging.getLogger()
def tx_times(tx_hash, chain_spec, session=None):
tx_hash = tx_normalize.tx_hash(tx_hash)
session = SessionBase.bind_session(session)

View File

@@ -32,12 +32,16 @@ from cic_eth.db import SessionBase
from cic_eth.db.enum import LockEnum
from cic_eth.task import CriticalSQLAlchemyTask
from cic_eth.error import LockedError
from cic_eth.encode import tx_normalize
celery_app = celery.current_app
logg = logging.getLogger()
def queue_create(chain_spec, nonce, holder_address, tx_hash, signed_tx, session=None):
tx_hash = tx_normalize.tx_hash(tx_hash)
signed_tx = tx_normalize.tx_hash(signed_tx)
holder_address = tx_normalize.wallet_address(holder_address)
session = SessionBase.bind_session(session)
lock = Lock.check_aggregate(str(chain_spec), LockEnum.QUEUE, holder_address, session=session)
@@ -67,6 +71,8 @@ def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=No
:returns: Tuple; Transaction hash, signed raw transaction data
:rtype: tuple
"""
tx_hash_hex = tx_normalize.tx_hash(tx_hash_hex)
tx_signed_raw_hex = tx_normalize.tx_hash(tx_signed_raw_hex)
logg.debug('adding queue tx {}:{} -> {}'.format(chain_spec, tx_hash_hex, tx_signed_raw_hex))
tx_signed_raw = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw, chain_spec)

View File

@@ -7,54 +7,30 @@ import json
import argparse
# external imports
import celery
import confini
import redis
from xdg.BaseDirectory import xdg_config_home
from chainlib.chain import ChainSpec
# local imports
import cic_eth.cli
from cic_eth.api import Api
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger('create_account_script')
logging.getLogger('confini').setLevel(logging.WARNING)
logging.getLogger('gnupg').setLevel(logging.WARNING)
logg = logging.getLogger()
default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic')
argparser = argparse.ArgumentParser()
arg_flags = cic_eth.cli.argflag_std_base
local_arg_flags = cic_eth.cli.argflag_local_taskcallback
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.add_argument('--no-register', dest='no_register', action='store_true', help='Do not register new account in on-chain accounts index')
argparser.add_argument('-c', type=str, default=default_config_dir, help='config file')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback')
argparser.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback')
argparser.add_argument('--timeout', default=20.0, type=float, help='Callback timeout')
argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args()
if args.vv:
logg.setLevel(logging.DEBUG)
if args.v:
logg.setLevel(logging.INFO)
config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'REDIS_HOST': getattr(args, 'redis_host'),
'REDIS_PORT': getattr(args, 'redis_port'),
'REDIS_DB': getattr(args, 'redis_db'),
extra_args = {
'no_register': None,
}
config.dict_override(args_override, 'cli')
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags, extra_args=extra_args)
celery_app = cic_eth.cli.CeleryApp.from_config(config)
def main():
redis_host = config.get('REDIS_HOST')
@@ -68,20 +44,20 @@ def main():
ps.get_message()
api = Api(
config.get('CIC_CHAIN_SPEC'),
queue=args.q,
callback_param='{}:{}:{}:{}'.format(args.redis_host_callback, args.redis_port_callback, redis_db, redis_channel),
config.get('CHAIN_SPEC'),
queue=config.get('CELERY_QUEUE'),
callback_param='{}:{}:{}:{}'.format(config.get('_REDIS_HOST_CALLBACK'), config.get('_REDIS_PORT_CALLBACK'), config.get('REDIS_DB'), redis_channel),
callback_task='cic_eth.callbacks.redis.redis',
callback_queue=args.q,
callback_queue=config.get('CELERY_QUEUE'),
)
register = not args.no_register
register = not config.get('_NO_REGISTER')
logg.debug('register {}'.format(register))
t = api.create_account(register=register)
ps.get_message()
try:
o = ps.get_message(timeout=args.timeout)
o = ps.get_message(timeout=config.get('REDIS_TIMEOUT'))
except TimeoutError as e:
sys.stderr.write('got no new address from cic-eth before timeout: {}\n'.format(e))
sys.exit(1)

View File

@@ -12,64 +12,38 @@ from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.address import is_checksum_address
# local imports
import cic_eth.cli
from cic_eth.api.admin import AdminApi
from cic_eth.db.enum import LockEnum
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_format = 'terminal'
default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic')
argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', default='http://localhost:8545', type=str, help='Web3 provider url (http only)')
argparser.add_argument('-f', '--format', dest='f', default=default_format, type=str, help='Output format')
argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
arg_flags = cic_eth.cli.argflag_std_read
local_arg_flags = cic_eth.cli.argflag_local_task | cic_eth.cli.argflag_local_chain
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.add_argument('--no-register', dest='no_register', action='store_true', help='Do not register new account in on-chain accounts index')
argparser.process_local_flags(local_arg_flags)
def process_lock_args(argparser):
argparser.add_argument('flags', type=str, help='Flags to manipulate')
argparser.add_argument('address', default=ZERO_ADDRESS, nargs='?', type=str, help='Ethereum address to unlock,')
sub = argparser.add_subparsers()
sub = argparser.add_subparsers(help='')
sub.dest = "command"
sub_lock = sub.add_parser('lock', help='Set or reset locks')
sub_unlock = sub.add_parser('unlock', help='Set or reset locks')
process_lock_args(sub_lock)
process_lock_args(sub_unlock)
args = argparser.parse_args()
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags)
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process()
args_override = {
'ETH_PROVIDER': getattr(args, 'p'),
'CIC_CHAIN_SPEC': getattr(args, 'i'),
}
# override args
config.dict_override(args_override, 'cli')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
celery_app = cic_eth.cli.CeleryApp.from_config(config)
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
queue = args.q
chain_spec = None
if config.get('CIC_CHAIN_SPEC') != None and config.get('CIC_CHAIN_SPEC') != '::':
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
admin_api = AdminApi(None)
@@ -100,7 +74,7 @@ def main():
args.address,
flags,
],
queue=queue,
queue=config.get('CELERY_QUEUE'),
)
t = s.apply_async()
logg.debug('unlock {} on {} task {}'.format(flags, args.address, t))
@@ -119,7 +93,7 @@ def main():
args.address,
flags,
],
queue=queue,
queue=config.get('CELERY_QUEUE'),
)
t = s.apply_async()
logg.debug('lock {} on {} task {}'.format(flags, args.address, t))

View File

@@ -8,8 +8,7 @@ import sys
import re
import datetime
# third-party imports
import confini
# external imports
import celery
from cic_eth_registry import CICRegistry
from chainlib.chain import ChainSpec
@@ -24,7 +23,7 @@ from chainqueue.error import NotLocalTxError
from chainqueue.sql.state import set_reserved
# local imports
import cic_eth
import cic_eth.cli
from cic_eth.db import SessionBase
from cic_eth.db.enum import LockEnum
from cic_eth.db import dsn_from_config
@@ -39,51 +38,30 @@ from cic_eth.error import (
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
arg_flags = cic_eth.cli.argflag_std_read
local_arg_flags = cic_eth.cli.argflag_local_sync | cic_eth.cli.argflag_local_task
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args()
config_dir = os.path.join('/usr/local/etc/cic-eth')
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags)
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', default='http://localhost:8545', dest='p', type=str, help='rpc provider')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
args = argparser.parse_args(sys.argv[1:])
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process()
# override args
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'ETH_PROVIDER': getattr(args, 'p'),
}
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
queue = args.q
# connect to celery
celery_app = cic_eth.cli.CeleryApp.from_config(config)
# connect to database
dsn = dsn_from_config(config)
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default')
# set up rpc
rpc = cic_eth.cli.RPC.from_config(config)
conn = rpc.get_default()
run = True
class DispatchSyncer:
yield_delay = 0.0005
@@ -123,14 +101,14 @@ class DispatchSyncer:
LockEnum.QUEUE,
tx['from'],
],
queue=queue,
queue=config.get('CELERY_QUEUE'),
)
s_send = celery.signature(
'cic_eth.eth.tx.send',
[
self.chain_spec.asdict(),
],
queue=queue,
queue=config.get('CELERY_QUEUE'),
)
s_check.link(s_send)
t = s_check.apply_async()

View File

@@ -10,15 +10,14 @@ from chainlib.eth.tx import unpack
from chainqueue.db.enum import StatusBits
from chainqueue.db.models.tx import TxCache
from chainqueue.db.models.otx import Otx
from chainqueue.sql.query import get_paused_tx_cache as get_paused_tx
from chainlib.eth.address import to_checksum_address
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.eth.gas import create_check_gas_task
from cic_eth.queue.query import get_paused_tx
from .base import SyncFilter
#logg = logging.getLogger().getChild(__name__)
logg = logging.getLogger()

View File

@@ -6,7 +6,6 @@ import argparse
import re
# external imports
import confini
import celery
from cic_eth_registry import CICRegistry
from chainlib.chain import ChainSpec
@@ -14,6 +13,7 @@ from chainlib.connection import RPCConnection
from chainsyncer.filter import SyncFilter
# local imports
import cic_eth.cli
from cic_eth.db import dsn_from_config
from cic_eth.db import SessionBase
from cic_eth.admin.ctrl import lock_send
@@ -25,66 +25,41 @@ from cic_eth.stat import init_chain_stat
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = os.path.join('/usr/local/etc/cic-eth')
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
arg_flags = cic_eth.cli.argflag_std_read
local_arg_flags = cic_eth.cli.argflag_local_sync | cic_eth.cli.argflag_local_task
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.add_argument('--batch-size', dest='batch_size', type=int, default=50, help='max amount of txs to resend per iteration')
argparser.add_argument('--retry-delay', dest='retry_delay', type=int, help='seconds to wait for retrying a transaction that is marked as sent')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
args = argparser.parse_args(sys.argv[1:])
argparser.add_argument('--retry-delay', dest='retry_delay', type=int, default=20, help='seconds to wait for retrying a transaction that is marked as sent')
argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args()
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process()
# override args
args_override = {
'ETH_PROVIDER': getattr(args, 'p'),
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'CIC_TX_RETRY_DELAY': getattr(args, 'retry_delay'),
extra_args = {
'retry_delay': 'RETRY_DELAY',
'batch_size': 'RETRY_BATCH_SIZE',
}
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
config.add(args.batch_size, '_BATCH_SIZE', True)
app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
queue = args.q
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default')
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags, extra_args=extra_args)
# connect to database
dsn = dsn_from_config(config)
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
# set up rpc
rpc = cic_eth.cli.RPC.from_config(config)
conn = rpc.get_default()
def main():
conn = RPCConnection.connect(chain_spec, 'default')
straggler_delay = int(config.get('CIC_TX_RETRY_DELAY'))
straggler_delay = int(config.get('RETRY_DELAY'))
loop_interval = config.get('SYNCER_LOOP_INTERVAL')
if loop_interval == None:
stat = init_chain_stat(conn)
loop_interval = stat.block_average()
syncer = RetrySyncer(conn, chain_spec, straggler_delay, batch_size=config.get('_BATCH_SIZE'))
syncer = RetrySyncer(conn, chain_spec, cic_eth.cli.chain_interface, straggler_delay, batch_size=config.get('RETRY_BATCH_SIZE'))
syncer.backend.set(0, 0)
fltr = StragglerFilter(chain_spec, queue=queue)
fltr = StragglerFilter(chain_spec, queue=config.get('CELERY_QUEUE'))
syncer.add_filter(fltr)
syncer.loop(int(loop_interval), conn)

View File

@@ -21,14 +21,17 @@ from chainlib.eth.connection import (
EthUnixSignerConnection,
EthHTTPSignerConnection,
)
from chainlib.eth.address import to_checksum_address
from chainlib.chain import ChainSpec
from chainqueue.db.models.otx import Otx
from cic_eth_registry.error import UnknownContractError
from cic_eth_registry.erc20 import ERC20Token
from hexathon import add_0x
import liveness.linux
# local imports
import cic_eth.cli
from cic_eth.eth import (
erc20,
tx,
@@ -66,118 +69,62 @@ from cic_eth.registry import (
)
from cic_eth.task import BaseTask
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
script_dir = os.path.dirname(os.path.realpath(__file__))
env = list(os.environ.keys())
env.sort()
for k in env:
logg.debug("env {} {}".format(k, os.environ[k]))
config_dir = os.path.join('/usr/local/etc/cic-eth')
argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('-q', type=str, default='cic-eth', help='queue name for worker tasks')
argparser.add_argument('-r', type=str, help='CIC registry address')
arg_flags = cic_eth.cli.argflag_std_read
local_arg_flags = cic_eth.cli.argflag_local_task
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.process_local_flags(local_arg_flags)
argparser.add_argument('--default-token-symbol', dest='default_token_symbol', type=str, help='Symbol of default token to use')
argparser.add_argument('--trace-queue-status', default=None, dest='trace_queue_status', action='store_true', help='set to perist all queue entry status changes to storage')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--aux-all', action='store_true', help='include tasks from all submodules from the aux module path')
argparser.add_argument('--aux', action='append', type=str, default=[], help='add single submodule from the aux module path')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
# override args
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'CIC_REGISTRY_ADDRESS': getattr(args, 'r'),
'CIC_DEFAULT_TOKEN_SYMBOL': getattr(args, 'default_token_symbol'),
'ETH_PROVIDER': getattr(args, 'p'),
'TASKS_TRACE_QUEUE_STATUS': getattr(args, 'trace_queue_status'),
# process config
extra_args = {
'default_token_symbol': 'CIC_DEFAULT_TOKEN_SYMBOL',
'aux_all': None,
'aux': None,
'trace_queue_status': 'TASKS_TRACE_QUEUE_STATUS',
}
config.add(args.q, '_CELERY_QUEUE', True)
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags)
health_modules = config.get('CIC_HEALTH_MODULES', [])
if len(health_modules) != 0:
health_modules = health_modules.split(',')
logg.debug('health mods {}'.format(health_modules))
# connect to celery
celery_app = cic_eth.cli.CeleryApp.from_config(config)
# set up rpc
rpc = cic_eth.cli.RPC.from_config(config, use_signer=True)
conn = rpc.get_default()
# connect to database
dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG'))
# set up celery
current_app = celery.Celery(__name__)
broker = config.get('CELERY_BROKER_URL')
if broker[:4] == 'file':
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
conf_update = {
'broker_url': broker,
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
}
if config.true('CELERY_DEBUG'):
conf_update['result_extended'] = True
current_app.conf.update(conf_update)
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
conf_update = {
'broker_url': broker,
}
if config.true('CELERY_DEBUG'):
conf_update['result_extended'] = True
current_app.conf.update(conf_update)
result = config.get('CELERY_RESULT_URL')
if result[:4] == 'file':
rq = tempfile.mkdtemp()
current_app.conf.update({
'result_backend': 'file://{}'.format(rq),
})
logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
else:
current_app.conf.update({
'result_backend': result,
})
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, 'signer')
RPCConnection.register_constructor(ConnType.HTTP, EthHTTPSignerConnection, 'signer')
RPCConnection.register_constructor(ConnType.HTTP_SSL, EthHTTPSignerConnection, 'signer')
RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default')
RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 'signer')
Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS')
#import cic_eth.checks.gas
#if not cic_eth.checks.gas.health(config=config):
# raise RuntimeError()
# execute health checks
# TODO: health should be separate service with endpoint that can be queried
health_modules = config.get('CIC_HEALTH_MODULES', [])
if len(health_modules) != 0:
health_modules = health_modules.split(',')
logg.debug('health mods {}'.format(health_modules))
liveness.linux.load(health_modules, rundir=config.get('CIC_RUN_DIR'), config=config, unit='cic-eth-tasker')
rpc = RPCConnection.connect(chain_spec, 'default')
# set up chain provisions
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
registry = None
try:
registry = connect_registry(rpc, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
registry = connect_registry(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
except UnknownContractError as e:
logg.exception('Registry contract connection failed for {}: {}'.format(config.get('CIC_REGISTRY_ADDRESS'), e))
sys.exit(1)
@@ -188,15 +135,15 @@ if trusted_addresses_src == None:
logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
sys.exit(1)
trusted_addresses = trusted_addresses_src.split(',')
for address in trusted_addresses:
for i, address in enumerate(trusted_addresses):
if config.get('_UNSAFE'):
trusted_addresses[i] = to_checksum_address(address)
logg.info('using trusted address {}'.format(address))
connect_declarator(conn, chain_spec, trusted_addresses)
connect_token_registry(conn, chain_spec)
connect_declarator(rpc, chain_spec, trusted_addresses)
connect_token_registry(rpc, chain_spec)
# detect aux
# detect auxiliary task modules (plugins)
# TODO: move to separate file
#aux_dir = os.path.join(script_dir, '..', '..', 'aux')
aux = []
if args.aux_all:
if len(args.aux) > 0:
@@ -249,36 +196,24 @@ elif len(args.aux) > 0:
for v in aux:
mname = 'cic_eth_aux.' + v
mod = importlib.import_module(mname)
mod.aux_setup(rpc, config)
mod.aux_setup(conn, config)
logg.info('loaded aux module {}'.format(mname))
def main():
argv = ['worker']
if args.vv:
argv.append('--loglevel=DEBUG')
elif args.v:
argv.append('--loglevel=INFO')
log_level = logg.getEffectiveLevel()
log_level_name = logging.getLevelName(log_level)
argv.append('--loglevel=' + log_level_name)
argv.append('-Q')
argv.append(args.q)
argv.append(config.get('CELERY_QUEUE'))
argv.append('-n')
argv.append(args.q)
# if config.true('SSL_ENABLE_CLIENT'):
# Callback.ssl = True
# Callback.ssl_cert_file = config.get('SSL_CERT_FILE')
# Callback.ssl_key_file = config.get('SSL_KEY_FILE')
# Callback.ssl_password = config.get('SSL_PASSWORD')
#
# if config.get('SSL_CA_FILE') != '':
# Callback.ssl_ca_file = config.get('SSL_CA_FILE')
rpc = RPCConnection.connect(chain_spec, 'default')
argv.append(config.get('CELERY_QUEUE'))
BaseTask.default_token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL')
BaseTask.default_token_address = registry.by_name(BaseTask.default_token_symbol)
default_token = ERC20Token(chain_spec, rpc, BaseTask.default_token_address)
default_token.load(rpc)
default_token = ERC20Token(chain_spec, conn, BaseTask.default_token_address)
default_token.load(conn)
BaseTask.default_token_decimals = default_token.decimals
BaseTask.default_token_name = default_token.name
@@ -286,13 +221,13 @@ def main():
logg.info('default token set to {} {}'.format(BaseTask.default_token_symbol, BaseTask.default_token_address))
liveness.linux.set(rundir=config.get('CIC_RUN_DIR'))
current_app.worker_main(argv)
celery_app.worker_main(argv)
liveness.linux.reset(rundir=config.get('CIC_RUN_DIR'))
@celery.signals.eventlet_pool_postshutdown.connect
def shutdown(sender=None, headers=None, body=None, **kwargs):
logg.warning('in shudown event hook')
logg.warning('in shutdown event hook')
if __name__ == '__main__':

View File

@@ -8,14 +8,6 @@ import sys
import re
# external imports
import confini
import celery
import rlp
import cic_base.config
import cic_base.log
import cic_base.argparse
import cic_base.rpc
from cic_base.eth.syncer import chain_interface
from cic_eth_registry.error import UnknownContractError
from chainlib.chain import ChainSpec
from chainlib.eth.constant import ZERO_ADDRESS
@@ -30,8 +22,13 @@ from chainsyncer.backend.sql import SQLBackend
from chainsyncer.driver.head import HeadSyncer
from chainsyncer.driver.history import HistorySyncer
from chainsyncer.db.models.base import SessionBase
from chainlib.eth.address import (
is_checksum_address,
to_checksum_address,
)
# local imports
import cic_eth.cli
from cic_eth.db import dsn_from_config
from cic_eth.runnable.daemons.filters import (
CallbackFilter,
@@ -47,61 +44,50 @@ from cic_eth.registry import (
connect_token_registry,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
script_dir = os.path.realpath(os.path.dirname(__file__))
arg_flags = cic_eth.cli.argflag_std_read
local_arg_flags = cic_eth.cli.argflag_local_sync
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args()
def add_block_args(argparser):
argparser.add_argument('--history-start', type=int, default=0, dest='history_start', help='Start block height for initial history sync')
argparser.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
return argparser
# process config
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags)
# connect to celery
cic_eth.cli.CeleryApp.from_config(config)
logg = cic_base.log.create()
argparser = cic_base.argparse.create(script_dir, cic_base.argparse.full_template)
argparser = cic_base.argparse.add(argparser, add_block_args, 'block')
args = cic_base.argparse.parse(argparser, logg)
config = cic_base.config.create(args.c, args, args.env_prefix)
config.add(args.y, '_KEYSTORE_FILE', True)
config.add(args.q, '_CELERY_QUEUE', True)
config.add(args.history_start, 'SYNCER_HISTORY_START', True)
config.add(args.no_history, '_NO_HISTORY', True)
cic_base.config.log(config)
# set up database
dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=16, debug=config.true('DATABASE_DEBUG'))
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
# set up rpc
rpc = cic_eth.cli.RPC.from_config(config)
conn = rpc.get_default()
cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER'))
rpc = RPCConnection.connect(chain_spec, 'default')
# set up chain provisions
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
registry = None
try:
registry = connect_registry(rpc, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
registry = connect_registry(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
except UnknownContractError as e:
logg.exception('Registry contract connection failed for {}: {}'.format(config.get('CIC_REGISTRY_ADDRESS'), e))
sys.exit(1)
logg.info('connected contract registry {}'.format(config.get('CIC_REGISTRY_ADDRESS')))
def main():
# connect to celery
celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
# Connect to blockchain with chainlib
o = block_latest()
r = rpc.do(o)
r = conn.do(o)
block_current = int(r, 16)
block_offset = block_current + 1
loop_interval = config.get('SYNCER_LOOP_INTERVAL')
if loop_interval == None:
stat = init_chain_stat(rpc, block_start=block_current)
stat = init_chain_stat(conn, block_start=block_current)
loop_interval = stat.block_average()
logg.debug('current block height {}'.format(block_offset))
@@ -113,9 +99,9 @@ def main():
syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
initial_block_start = config.get('SYNCER_HISTORY_START')
initial_block_start = config.get('SYNCER_OFFSET')
initial_block_offset = block_offset
if config.get('_NO_HISTORY'):
if config.true('SYNCER_NO_HISTORY'):
initial_block_start = block_offset
initial_block_offset += 1
syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start))
@@ -128,40 +114,45 @@ def main():
for syncer_backend in syncer_backends:
try:
syncers.append(HistorySyncer(syncer_backend, chain_interface))
syncers.append(HistorySyncer(syncer_backend, cic_eth.cli.chain_interface))
logg.info('Initializing HISTORY syncer on backend {}'.format(syncer_backend))
except AttributeError:
logg.info('Initializing HEAD syncer on backend {}'.format(syncer_backend))
syncers.append(HeadSyncer(syncer_backend, chain_interface))
syncers.append(HeadSyncer(syncer_backend, cic_eth.cli.chain_interface))
connect_registry(rpc, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
connect_registry(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None:
logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
sys.exit(1)
trusted_addresses = trusted_addresses_src.split(',')
for address in trusted_addresses:
for i, address in enumerate(trusted_addresses):
if not config.get('_UNSAFE'):
if not is_checksum_address(address):
raise ValueError('address {} is not a valid checksum address'.format(address))
else:
trusted_addresses[i] = to_checksum_address(address)
logg.info('using trusted address {}'.format(address))
connect_declarator(rpc, chain_spec, trusted_addresses)
connect_token_registry(rpc, chain_spec)
connect_declarator(conn, chain_spec, trusted_addresses)
connect_token_registry(conn, chain_spec)
CallbackFilter.trusted_addresses = trusted_addresses
callback_filters = []
for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','):
task_split = cb.split(':')
task_queue = config.get('_CELERY_QUEUE')
task_queue = config.get('CELERY_QUEUE')
if len(task_split) > 1:
task_queue = task_split[0]
callback_filter = CallbackFilter(chain_spec, task_split[1], task_queue)
callback_filters.append(callback_filter)
tx_filter = TxFilter(chain_spec, config.get('_CELERY_QUEUE'))
tx_filter = TxFilter(chain_spec, config.get('CELERY_QUEUE'))
account_registry_address = registry.by_name('AccountRegistry')
registration_filter = RegistrationFilter(chain_spec, account_registry_address, queue=config.get('_CELERY_QUEUE'))
registration_filter = RegistrationFilter(chain_spec, account_registry_address, queue=config.get('CELERY_QUEUE'))
gas_filter = GasFilter(chain_spec, config.get('_CELERY_QUEUE'))
gas_filter = GasFilter(chain_spec, config.get('CELERY_QUEUE'))
#transfer_auth_filter = TransferAuthFilter(registry, chain_spec, config.get('_CELERY_QUEUE'))
@@ -176,7 +167,7 @@ def main():
for cf in callback_filters:
syncer.add_filter(cf)
r = syncer.loop(int(loop_interval), rpc)
r = syncer.loop(int(loop_interval), conn)
sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
i += 1

View File

@@ -12,50 +12,27 @@ import confini
import celery
# local imports
import cic_eth.cli
from cic_eth.api import Api
from cic_eth.api.admin import AdminApi
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_format = 'terminal'
default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic')
argparser = argparse.ArgumentParser()
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
arg_flags = cic_eth.cli.argflag_std_base
local_arg_flags = cic_eth.cli.argflag_local_taskcallback
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args()
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags)
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process()
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
}
config.dict_override(args_override, 'cli args')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
celery_app = cic_eth.cli.CeleryApp.from_config(config)
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
queue = args.q
api = Api(config.get('CIC_CHAIN_SPEC'), queue=queue)
api = Api(config.get('CHAIN_SPEC'), queue=config.get('CELERY_QUEUE'))
admin_api = AdminApi(None)
def main():
t = admin_api.registry()
registry_address = t.get()

Some files were not shown because too many files have changed in this diff Show More