Compare commits

...

28 Commits

Author SHA1 Message Date
3dcfbe59fe Merge branch 'philip/translations-v1' into 'master'
Adds updated translations.

See merge request grassrootseconomics/cic-internal-integration!328
2022-01-10 07:31:04 +00:00
0dd21f3970 Adds updated translations. 2022-01-10 07:31:03 +00:00
6ca8632cde Merge branch 'sohail/help-menu-fix' into 'master'
fix: ussd help menu

See merge request grassrootseconomics/cic-internal-integration!327
2022-01-10 06:56:52 +00:00
b7dc290992 fix: ussd help menu 2022-01-10 06:56:52 +00:00
e69801ea08 Merge branch 'philip/ussd-poler' into 'master'
Philip/ussd poler

See merge request grassrootseconomics/cic-internal-integration!326
2022-01-10 04:44:51 +00:00
50a596e707 Philip/ussd poler 2022-01-10 04:44:50 +00:00
cb61e45e4c Merge branch 'philip/ussd-data-files' into 'master'
Move cic-ussd datafiles to apps/cic-ussd/cic_ussd/data/

See merge request grassrootseconomics/cic-internal-integration!325
2022-01-07 10:50:49 +00:00
e5b06b18d7 Move cic-ussd datafiles to apps/cic-ussd/cic_ussd/data/ 2022-01-07 10:50:49 +00:00
William Luke
5d1a30021a Merge branch 'fix/docs-link' into 'master'
fix: link to docs

See merge request grassrootseconomics/cic-internal-integration!324
2022-01-05 12:22:15 +00:00
William Luke
ade3f4e917 fix: link to docs 2022-01-05 15:19:11 +03:00
c3e924ae8f Merge branch 'sohail/alt-build' into 'master'
feat (devops): add local image build for releases

See merge request grassrootseconomics/cic-internal-integration!322
2022-01-05 08:11:38 +00:00
ff4c42dc24 feat (devops): add local image build for releases 2022-01-05 08:11:38 +00:00
b8cd7eec56 Merge branch 'philip/bump-test-coverage' into 'master'
Rehabilitate test coverage in ussd and cic-notify

See merge request grassrootseconomics/cic-internal-integration!323
2022-01-04 16:51:02 +00:00
837e8da650 Rehabilitate test coverage in ussd and cic-notify 2022-01-04 16:51:02 +00:00
fe3f2c2549 Merge branch 'philip/cleanup-hardening' into 'master'
USSD Hardening and Cleanups

See merge request grassrootseconomics/cic-internal-integration!320
2022-01-04 16:16:01 +00:00
46f25e5678 USSD Hardening and Cleanups 2022-01-04 16:16:00 +00:00
03c7c1ddbc Merge branch 'lash/improve-cache' into 'master'
refactor: Improve cic-cache

See merge request grassrootseconomics/cic-internal-integration!303
2022-01-04 16:01:03 +00:00
Louis Holbrook
104ff8a76a refactor: Improve cic-cache 2022-01-04 16:01:01 +00:00
b5653a704c Merge branch 'lash/rules-of-engagement' into 'master'
Add updated version of old rules of engagement for core team

See merge request grassrootseconomics/cic-internal-integration!321
2022-01-03 06:36:52 +00:00
Louis Holbrook
3b6ea6db77 Add updated version of old rules of engagement for core team 2022-01-03 06:36:52 +00:00
7d3ff89fe5 Merge branch 'lash/token-checksum-address-fix' into 'master'
bug: Normalize token addresses in db for erc20 operations

See merge request grassrootseconomics/cic-internal-integration!304
2021-12-22 18:25:29 +00:00
Louis Holbrook
dceae4e5d8 bug: Normalize token addresses in db for erc20 operations 2021-12-22 18:25:29 +00:00
46a4ccfd9c Merge branch 'lash/bloxberg-seeding' into 'master'
migrations: Enable deployment and data seeding to Bloxberg

See merge request grassrootseconomics/cic-internal-integration!301
2021-12-22 18:24:05 +00:00
Louis Holbrook
d7c4cb71eb migrations: Enable deployment and data seeding to Bloxberg 2021-12-22 18:24:05 +00:00
b15cfee1c9 Merge branch 'sohail/pip-url-fix' into 'master'
fix: update default pip to new url

Closes #171

See merge request grassrootseconomics/cic-internal-integration!316
2021-12-10 14:55:50 +00:00
efb1967f46 fix: update default pip to new url 2021-12-10 14:55:50 +00:00
019824d1f4 Merge branch 'bvander/docs-updates' into 'master'
documentation: updated the docs with new links and getting started

See merge request grassrootseconomics/cic-internal-integration!314
2021-12-09 09:20:03 +00:00
d4c7fd3d7e documentation: updated the docs with new links and getting started 2021-12-09 09:20:03 +00:00
235 changed files with 6107 additions and 2278 deletions

117
CONTRIBUTING_CORE.md Normal file
View File

@ -0,0 +1,117 @@
# CORE TEAM CONTRIBUTION GUIDE
# 1. Transparency
1.1 Use work logs for reflection of work done, aswell as telling your peers about changes that may affect their own tasks
1.2 A work log SHOULD be submitted after a "unit of work" is complete.
1.2.1 A "unit of work" should not span more than one full day's worth of work.
1.2.2 A "unit of work" should be small enough that the log entries give useful insight.
1.3 Individual logs are reviewed in weekly meetings
<!--1.4 Bullet point list of topics and one or more sub-points describing each item in short sentences, eg;
```
- Core
* fixed foo
* fixed bar
- Frontend
* connected bar to baz
```-->
1.4 Work log format is defined in []()
1.5 Link to issue/MR in bullet point where appropriate
1.6
# 2. Code hygiene
2.1 Keep function names and variable names short
2.2 Keep code files, functions and test fixtures short
2.3 The less magic the better. Recombinable and replaceable is king
2.4 Group imports by `standard`, `external`, `local`, `test` - in that order
2.5 Only auto-import when necessary, and always with a minimum of side-effects
2.6 Use custom errors. Let them bubble up
2.7 No logs in tight loops
2.8 Keep executable main routine minimal. Pass variables (do not use globals) in main business logic function
2.9 Test coverage MUST be kept higher than 90% after changes
2.10 Docstrings. Always. Always!
# 3. Versioning
3.1 Use [Semantic Versioning](https://semver.org/)
3.2 When merging code, explicit dependencies SHOULD NOT use pre-release version
# 4. Issues
4.1 Issue title should use [Convention Commit structure](https://www.conventionalcommits.org/en/v1.0.0-beta.2/)
4.2 Issues need proper problem statement
4.2.1. What is the current state
4.2.2. If current state is not behaving as expected, what was the expected state
4.2.3. What is the desired new state.
4.3 Issues need proper resolution statement
4.3.1. Bullet point list of short sentences describing practical steps to reach desired state
4.3.2. Builet point list of external resources informing the issue and resolution
4.4 Tasks needs to be appropriately labelled using GROUP labels.
# 5. Code submission
5.1 A branch and new MR is always created BEFORE THE WORK STARTS
5.2 An MR should solve ONE SINGLE PART of a problem
5.3 Every MR should have at least ONE ISSUE associated with it. Ideally issue can be closed when MR is merged
5.4 MRs should not be open for more than one week (during normal operation periods)
5.5 MR should ideally not be longer than 400 lines of changes of logic
5.6 MRs that MOVE or DELETE code should not CHANGE that same code in a single MR. Scope MOVEs and DELETEs in separate commits (or even better, separate MRs) for transparency
# 6. Code reviews
6.1 At least one peer review before merge
6.2 If MR is too long, evaluate whether this affects the quality of the review negatively. If it does, expect to be asked to split it up
6.3 Evaluate changes against associated issues' problem statement and proposed resolution steps. If there is a mismatch, either MR needs to change or issue needs to be amended accordingly
6.4 Make sure all technical debt introduced by MR is documented in issues. Add them according to criteria in section ISSUES if not
6.5 If CI is not working, reviewer MUST make sure code builds and runs
6.6 Behave!
6.6.1 Don't be a jerk
6.6.2 Don't block needlessly
6.6.3 Say please

View File

@ -1,41 +1,19 @@
# cic-internal-integration # Community Inclusion Currency Stack (CIC Stack)
A custodial evm wallet for executing transactions via USSD
## Getting started ## Getting started
This repo uses docker-compose and docker buildkit. Set the following environment variables to get started: This repo uses docker-compose and docker buildkit. Set the following environment variables to get started:
``` ```
export COMPOSE_DOCKER_CLI_BUILD=1 export COMPOSE_DOCKER_CLI_BUILD=1
export DOCKER_BUILDKIT=1 export DOCKER_BUILDKIT=1
``` ```
start services, database, redis and local ethereum node To get started see [./apps/contract-migration/README.md](./apps/contract-migration/README.md)
```
docker-compose up -d
```
Run app/contract-migration to deploy contracts ## Documentation
```
RUN_MASK=3 docker-compose up contract-migration
```
stop cluster [https://docs.grassecon.org/software/](https://docs.grassecon.org/software/)
```
docker-compose down
```
stop cluster and delete data
```
docker-compose down -v --remove-orphans
```
rebuild an images
```
docker-compose up --build <service_name>
```
to delete the buildkit cache
```
docker builder prune --filter type=exec.cachemount
```

View File

@ -1 +1 @@
include *requirements.txt cic_cache/data/config/* include *requirements.txt cic_cache/data/config/* cic_cache/db/migrations/default/* cic_cache/db/migrations/default/versions/*

View File

@ -14,7 +14,7 @@ class ArgumentParser(BaseArgumentParser):
if local_arg_flags & CICFlag.CELERY: if local_arg_flags & CICFlag.CELERY:
self.add_argument('-q', '--celery-queue', dest='celery_queue', type=str, default='cic-cache', help='Task queue') self.add_argument('-q', '--celery-queue', dest='celery_queue', type=str, default='cic-cache', help='Task queue')
if local_arg_flags & CICFlag.SYNCER: if local_arg_flags & CICFlag.SYNCER:
self.add_argument('--offset', type=int, default=0, help='Start block height for initial history sync') self.add_argument('--offset', type=int, help='Start block height for initial history sync')
self.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync') self.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
if local_arg_flags & CICFlag.CHAIN: if local_arg_flags & CICFlag.CHAIN:
self.add_argument('-r', '--registry-address', type=str, dest='registry_address', help='CIC registry contract address') self.add_argument('-r', '--registry-address', type=str, dest='registry_address', help='CIC registry contract address')

View File

@ -1,4 +1,4 @@
[cic] [cic]
registry_address = registry_address =
trust_address = trust_address =
health_modules = cic_eth.check.db,cic_eth.check.redis,cic_eth.check.signer,cic_eth.check.gas health_modules =

View File

@ -3,7 +3,8 @@ engine =
driver = driver =
host = host =
port = port =
name = cic-cache #name = cic-cache
prefix =
user = user =
password = password =
debug = 0 debug = 0

View File

@ -9,21 +9,26 @@ from .list import (
tag_transaction, tag_transaction,
add_tag, add_tag,
) )
from cic_cache.db.models.base import SessionBase
logg = logging.getLogger() logg = logging.getLogger()
def dsn_from_config(config): def dsn_from_config(config, name):
scheme = config.get('DATABASE_ENGINE') scheme = config.get('DATABASE_ENGINE')
if config.get('DATABASE_DRIVER') != None: if config.get('DATABASE_DRIVER') != None:
scheme += '+{}'.format(config.get('DATABASE_DRIVER')) scheme += '+{}'.format(config.get('DATABASE_DRIVER'))
database_name = name
if config.get('DATABASE_PREFIX'):
database_name = '{}_{}'.format(config.get('DATABASE_PREFIX'), database_name)
dsn = '' dsn = ''
if config.get('DATABASE_ENGINE') == 'sqlite': if config.get('DATABASE_ENGINE') == 'sqlite':
SessionBase.poolable = False
dsn = '{}:///{}'.format( dsn = '{}:///{}'.format(
scheme, scheme,
config.get('DATABASE_NAME'), database_name,
) )
else: else:
@ -33,7 +38,7 @@ def dsn_from_config(config):
config.get('DATABASE_PASSWORD'), config.get('DATABASE_PASSWORD'),
config.get('DATABASE_HOST'), config.get('DATABASE_HOST'),
config.get('DATABASE_PORT'), config.get('DATABASE_PORT'),
config.get('DATABASE_NAME'), database_name,
) )
logg.debug('parsed dsn from config: {}'.format(dsn)) logg.debug('parsed dsn from config: {}'.format(dsn))
return dsn return dsn

View File

@ -5,7 +5,11 @@ import re
import base64 import base64
# external imports # external imports
from hexathon import add_0x from hexathon import (
add_0x,
strip_0x,
)
from chainlib.encode import TxHexNormalizer
# local imports # local imports
from cic_cache.cache import ( from cic_cache.cache import (
@ -16,27 +20,72 @@ from cic_cache.cache import (
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
#logg = logging.getLogger() #logg = logging.getLogger()
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?' re_transactions_all_bloom = r'/tx/?(\d+)?/?(\d+)?/?(\d+)?/?(\d+)?/?'
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)(/(\d+)(/(\d+))?)?/?' re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)(/(\d+)(/(\d+))?)?/?'
re_transactions_all_data = r'/txa/(\d+)?/?(\d+)/?' re_transactions_all_data = r'/txa/?(\d+)?/?(\d+)?/?(\d+)?/?(\d+)?/?'
re_transactions_account_data = r'/txa/user/((0x)?[a-fA-F0-9]+)(/(\d+)(/(\d+))?)?/?'
re_default_limit = r'/defaultlimit/?'
DEFAULT_LIMIT = 100 DEFAULT_LIMIT = 100
tx_normalize = TxHexNormalizer()
def parse_query_account(r):
address = strip_0x(r[1])
#address = tx_normalize.wallet_address(address)
limit = DEFAULT_LIMIT
g = r.groups()
if len(g) > 3:
limit = int(r[4])
if limit == 0:
limit = DEFAULT_LIMIT
offset = 0
if len(g) > 4:
offset = int(r[6])
logg.debug('account query is address {} offset {} limit {}'.format(address, offset, limit))
return (address, offset, limit,)
# r is an re.Match
def parse_query_any(r):
limit = DEFAULT_LIMIT
offset = 0
block_offset = None
block_end = None
if r.lastindex != None:
if r.lastindex > 0:
limit = int(r[1])
if r.lastindex > 1:
offset = int(r[2])
if r.lastindex > 2:
block_offset = int(r[3])
if r.lastindex > 3:
block_end = int(r[4])
if block_end < block_offset:
raise ValueError('cart before the horse, dude')
logg.debug('data query is offset {} limit {} block_offset {} block_end {}'.format(offset, limit, block_offset, block_end))
return (offset, limit, block_offset, block_end,)
def process_default_limit(session, env):
r = re.match(re_default_limit, env.get('PATH_INFO'))
if not r:
return None
return ('application/json', str(DEFAULT_LIMIT).encode('utf-8'),)
def process_transactions_account_bloom(session, env): def process_transactions_account_bloom(session, env):
r = re.match(re_transactions_account_bloom, env.get('PATH_INFO')) r = re.match(re_transactions_account_bloom, env.get('PATH_INFO'))
if not r: if not r:
return None return None
logg.debug('match account bloom')
address = r[1] (address, offset, limit,) = parse_query_account(r)
if r[2] == None:
address = add_0x(address)
offset = 0
if r.lastindex > 2:
offset = r[4]
limit = DEFAULT_LIMIT
if r.lastindex > 4:
limit = r[6]
c = BloomCache(session) c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit) (lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
@ -59,13 +108,9 @@ def process_transactions_all_bloom(session, env):
r = re.match(re_transactions_all_bloom, env.get('PATH_INFO')) r = re.match(re_transactions_all_bloom, env.get('PATH_INFO'))
if not r: if not r:
return None return None
logg.debug('match all bloom')
offset = DEFAULT_LIMIT (limit, offset, block_offset, block_end,) = parse_query_any(r)
if r.lastindex > 0:
offset = r[1]
limit = 0
if r.lastindex > 1:
limit = r[2]
c = BloomCache(session) c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit) (lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
@ -88,17 +133,16 @@ def process_transactions_all_data(session, env):
r = re.match(re_transactions_all_data, env.get('PATH_INFO')) r = re.match(re_transactions_all_data, env.get('PATH_INFO'))
if not r: if not r:
return None return None
if env.get('HTTP_X_CIC_CACHE_MODE') != 'all': #if env.get('HTTP_X_CIC_CACHE_MODE') != 'all':
return None # return None
logg.debug('match all data')
logg.debug('got data request {}'.format(env)) logg.debug('got data request {}'.format(env))
block_offset = r[1]
block_end = r[2] (offset, limit, block_offset, block_end) = parse_query_any(r)
if int(r[2]) < int(r[1]):
raise ValueError('cart before the horse, dude')
c = DataCache(session) c = DataCache(session)
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(0, 0, block_offset, block_end, oldest=True) # oldest needs to be settable (lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, limit, block_offset, block_end, oldest=True) # oldest needs to be settable
for r in tx_cache: for r in tx_cache:
r['date_block'] = r['date_block'].timestamp() r['date_block'] = r['date_block'].timestamp()
@ -113,3 +157,30 @@ def process_transactions_all_data(session, env):
j = json.dumps(o) j = json.dumps(o)
return ('application/json', j.encode('utf-8'),) return ('application/json', j.encode('utf-8'),)
def process_transactions_account_data(session, env):
r = re.match(re_transactions_account_data, env.get('PATH_INFO'))
if not r:
return None
logg.debug('match account data')
#if env.get('HTTP_X_CIC_CACHE_MODE') != 'all':
# return None
(address, offset, limit,) = parse_query_account(r)
c = DataCache(session)
(lowest_block, highest_block, tx_cache) = c.load_transactions_account_with_data(address, offset, limit)
for r in tx_cache:
r['date_block'] = r['date_block'].timestamp()
o = {
'low': lowest_block,
'high': highest_block,
'data': tx_cache,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)

View File

@ -12,21 +12,20 @@ import cic_cache.cli
from cic_cache.db import dsn_from_config from cic_cache.db import dsn_from_config
from cic_cache.db.models.base import SessionBase from cic_cache.db.models.base import SessionBase
from cic_cache.runnable.daemons.query import ( from cic_cache.runnable.daemons.query import (
process_default_limit,
process_transactions_account_bloom, process_transactions_account_bloom,
process_transactions_account_data,
process_transactions_all_bloom, process_transactions_all_bloom,
process_transactions_all_data, process_transactions_all_data,
) )
import cic_cache.cli
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
rootdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
dbdir = os.path.join(rootdir, 'cic_cache', 'db')
migrationsdir = os.path.join(dbdir, 'migrations')
# process args arg_flags = cic_cache.cli.argflag_std_read
arg_flags = cic_cache.cli.argflag_std_base local_arg_flags = cic_cache.cli.argflag_local_sync | cic_cache.cli.argflag_local_task
local_arg_flags = cic_cache.cli.argflag_local_task
argparser = cic_cache.cli.ArgumentParser(arg_flags) argparser = cic_cache.cli.ArgumentParser(arg_flags)
argparser.process_local_flags(local_arg_flags) argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args() args = argparser.parse_args()
@ -35,7 +34,7 @@ args = argparser.parse_args()
config = cic_cache.cli.Config.from_args(args, arg_flags, local_arg_flags) config = cic_cache.cli.Config.from_args(args, arg_flags, local_arg_flags)
# connect to database # connect to database
dsn = dsn_from_config(config) dsn = dsn_from_config(config, 'cic_cache')
SessionBase.connect(dsn, config.true('DATABASE_DEBUG')) SessionBase.connect(dsn, config.true('DATABASE_DEBUG'))
@ -47,9 +46,11 @@ def application(env, start_response):
session = SessionBase.create_session() session = SessionBase.create_session()
for handler in [ for handler in [
process_transactions_account_data,
process_transactions_account_bloom,
process_transactions_all_data, process_transactions_all_data,
process_transactions_all_bloom, process_transactions_all_bloom,
process_transactions_account_bloom, process_default_limit,
]: ]:
r = None r = None
try: try:

View File

@ -3,6 +3,7 @@ import logging
import os import os
import sys import sys
import argparse import argparse
import tempfile
# third-party imports # third-party imports
import celery import celery
@ -28,7 +29,7 @@ args = argparser.parse_args()
config = cic_cache.cli.Config.from_args(args, arg_flags, local_arg_flags) config = cic_cache.cli.Config.from_args(args, arg_flags, local_arg_flags)
# connect to database # connect to database
dsn = dsn_from_config(config) dsn = dsn_from_config(config, 'cic_cache')
SessionBase.connect(dsn) SessionBase.connect(dsn)
# set up celery # set up celery

View File

@ -50,7 +50,7 @@ args = argparser.parse_args()
config = cic_cache.cli.Config.from_args(args, arg_flags, local_arg_flags) config = cic_cache.cli.Config.from_args(args, arg_flags, local_arg_flags)
# connect to database # connect to database
dsn = dsn_from_config(config) dsn = dsn_from_config(config, 'cic_cache')
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG')) SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
# set up rpc # set up rpc
@ -95,10 +95,10 @@ def main():
syncer_backends = SQLBackend.resume(chain_spec, block_offset) syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0: if len(syncer_backends) == 0:
initial_block_start = config.get('SYNCER_OFFSET') initial_block_start = int(config.get('SYNCER_OFFSET'))
initial_block_offset = block_offset initial_block_offset = int(block_offset)
if config.get('SYNCER_NO_HISTORY'): if config.get('SYNCER_NO_HISTORY'):
initial_block_start = block_offset initial_block_start = initial_block_offset
initial_block_offset += 1 initial_block_offset += 1
syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start)) syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start))
logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset)) logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset))

View File

@ -5,7 +5,7 @@ version = (
0, 0,
2, 2,
1, 1,
'alpha.2', 'alpha.3',
) )
version_object = semver.VersionInfo( version_object = semver.VersionInfo(

View File

@ -1,3 +0,0 @@
[celery]
broker_url = redis:///
result_url = redis:///

View File

@ -1,3 +0,0 @@
[cic]
registry_address =
trust_address =

View File

View File

@ -1,9 +0,0 @@
[database]
NAME=cic_cache
USER=postgres
PASSWORD=
HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=0

View File

@ -1,3 +0,0 @@
[celery]
broker_url = redis://localhost:63379
result_url = redis://localhost:63379

View File

@ -1,3 +0,0 @@
[cic]
registry_address =
trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C

View File

@ -1,9 +0,0 @@
[database]
NAME=cic_cache
USER=grassroots
PASSWORD=
HOST=localhost
PORT=63432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=0

View File

@ -1,4 +0,0 @@
[syncer]
loop_interval = 1
offset = 0
no_history = 0

View File

@ -1,2 +0,0 @@
[bancor]
dir =

View File

@ -1,4 +1,3 @@
[cic] [cic]
registry_address = registry_address =
chain_spec =
trust_address = trust_address =

View File

@ -1,5 +1,5 @@
[database] [database]
NAME=cic-cache-test PREFIX=cic-cache-test
USER=postgres USER=postgres
PASSWORD= PASSWORD=
HOST=localhost HOST=localhost

View File

@ -1,5 +0,0 @@
[eth]
#ws_provider = ws://localhost:8546
#ttp_provider = http://localhost:8545
provider = http://localhost:8545
#chain_id =

View File

@ -1,4 +1,4 @@
openapi: "3.0.3" openapi: "3.0.2"
info: info:
title: Grassroots Economics CIC Cache title: Grassroots Economics CIC Cache
description: Cache of processed transaction data from Ethereum blockchain and worker queues description: Cache of processed transaction data from Ethereum blockchain and worker queues
@ -9,17 +9,34 @@ info:
email: will@grassecon.org email: will@grassecon.org
license: license:
name: GPLv3 name: GPLv3
version: 0.1.0 version: 0.2.0
paths: paths:
/tx/{offset}/{limit}: /defaultlimit:
description: Bloom filter for batch of latest transactions summary: The default limit value of result sets.
get:
tags:
- transactions
description:
Retrieve default limit
operationId: limit.default
responses:
200:
description: Limit query successful
content:
application/json:
schema:
$ref: "#/components/schemas/Limit"
/tx:
summary: Bloom filter for batch of latest transactions
description: Generate a bloom filter of the latest transactions in the cache. The number of maximum number of transactions returned is returned by the `/defaultlimit` API call.
get: get:
tags: tags:
- transactions - transactions
description: description:
Retrieve transactions Retrieve transactions
operationId: tx.get operationId: tx.get.latest
responses: responses:
200: 200:
description: Transaction query successful. description: Transaction query successful.
@ -29,27 +46,109 @@ paths:
$ref: "#/components/schemas/BlocksBloom" $ref: "#/components/schemas/BlocksBloom"
/tx/{limit}:
summary: Bloom filter for batch of latest transactions
description: Generate a bloom filter of the latest transactions in the cache. If `limit` is 0, the number of maximum number of transactions returned is returned by the `/defaultlimit` API call.
get:
tags:
- transactions
description:
Retrieve transactions
operationId: tx.get.latest.limit
responses:
200:
description: Transaction query successful. Results are ordered from newest to oldest.
content:
application/json:
schema:
$ref: "#/components/schemas/BlocksBloom"
parameters: parameters:
- name: offset
in: path
schema:
type: integer
format: int32
- name: limit - name: limit
in: path in: path
required: true
schema: schema:
type: integer type: integer
format: int32 format: int32
/tx/{address}/{offset}/{limit}: /tx/{limit}/{offset}:
description: Bloom filter for batch of latest transactions by account summary: Bloom filter for batch of latest transactions
description: Generate a bloom filter of the latest transactions in the cache. If `limit` is 0, the number of maximum number of transactions returned is returned by the `/defaultlimit` API call.
get: get:
tags: tags:
- transactions - transactions
description: description:
Retrieve transactions Retrieve transactions
operationId: tx.get operationId: tx.get.latest.range
responses:
200:
description: Transaction query successful. Results are ordered from newest to oldest.
content:
application/json:
schema:
$ref: "#/components/schemas/BlocksBloom"
parameters:
- name: limit
in: path
required: true
schema:
type: integer
format: int32
- name: offset
in: path
required: true
schema:
type: integer
format: int32
/tx/{limit}/{offset}/{block_offset}:
summary: Bloom filter for batch of transactions since a particular block.
description: Generate a bloom filter of the latest transactions since a particular block in the cache. The block parameter is inclusive. If `limit` is 0, the number of maximum number of transactions returned is returned by the `/defaultlimit` API call.
get:
tags:
- transactions
description:
Retrieve transactions
operationId: tx.get.latest.range.block.offset
responses:
200:
description: Transaction query successful. Results are ordered from oldest to newest.
content:
application/json:
schema:
$ref: "#/components/schemas/BlocksBloom"
parameters:
- name: limit
in: path
required: true
schema:
type: integer
format: int32
- name: offset
in: path
required: true
schema:
type: integer
format: int32
- name: block_offset
in: path
required: true
schema:
type: integer
format: int32
/tx/{limit}/{offset}/{block_offset}/{block_end}:
summary: Bloom filter for batch of transactions within a particular block range.
description: Generate a bloom filter of the latest transactions within a particular block range in the cache. The block parameters are inclusive. If `limit` is 0, the number of maximum number of transactions returned is returned by the `/defaultlimit` API call.
get:
tags:
- transactions
description:
Retrieve transactions
operationId: tx.get.latest.range.block.range
responses: responses:
200: 200:
description: Transaction query successful. description: Transaction query successful.
@ -58,6 +157,49 @@ paths:
schema: schema:
$ref: "#/components/schemas/BlocksBloom" $ref: "#/components/schemas/BlocksBloom"
parameters:
- name: limit
in: path
required: true
schema:
type: integer
format: int32
- name: offset
in: path
required: true
schema:
type: integer
format: int32
- name: block_offset
in: path
required: true
schema:
type: integer
format: int32
- name: block_end
in: path
required: true
schema:
type: integer
format: int32
/tx/{address}:
summary: Bloom filter for batch of latest transactions by account.
description: Generate a bloom filter of the latest transactions where a specific account is the spender or beneficiary.
get:
tags:
- transactions
description:
Retrieve transactions
operationId: tx.get.user
responses:
200:
description: Transaction query successful.
content:
application/json:
schema:
$ref: "#/components/schemas/BlocksBloom"
parameters: parameters:
- name: address - name: address
@ -65,26 +207,342 @@ paths:
required: true required: true
schema: schema:
type: string type: string
- name: offset
/tx/{address}/{limit}:
summary: Bloom filter for batch of latest transactions by account.
description: Generate a bloom filter of the latest transactions where a specific account is the spender or beneficiary. If `limit` is 0, the number of maximum number of transactions returned is returned by the `/defaultlimit` API call.
get:
tags:
- transactions
description:
Retrieve transactions
operationId: tx.get.user.limit
responses:
200:
description: Transaction query successful.
content:
application/json:
schema:
$ref: "#/components/schemas/BlocksBloom"
parameters:
- name: address
in: path in: path
required: true
schema: schema:
type: integer type: string
format: int32
- name: limit - name: limit
in: path in: path
required: true
schema: schema:
type: integer type: integer
format: int32 format: int32
/tx/{address}/{limit}/{offset}:
summary: Bloom filter for batch of latest transactions by account
description: Generate a bloom filter of the latest transactions where a specific account is the spender or beneficiary. If `limit` is 0, the number of maximum number of transactions returned is returned by the `/defaultlimit` API call.
get:
tags:
- transactions
description:
Retrieve transactions
operationId: tx.get.user.range
responses:
200:
description: Transaction query successful.
content:
application/json:
schema:
$ref: "#/components/schemas/BlocksBloom"
parameters:
- name: address
in: path
required: true
schema:
type: string
- name: limit
in: path
required: true
schema:
type: integer
format: int32
- name: offset
in: path
required: true
schema:
type: integer
format: int32
/txa:
summary: Cached data for latest transactions.
description: Return data entries of the latest transactions in the cache. The number of maximum number of transactions returned is returned by the `/defaultlimit` API call.
get:
tags:
- transactions
description:
Retrieve transactions
operationId: txa.get.latest
responses:
200:
description: Transaction query successful.
content:
application/json:
schema:
$ref: "#/components/schemas/TransactionList"
/txa/{limit}:
summary: Cached data for latest transactions.
description: Return data entries of the latest transactions in the cache. If `limit` is 0, the number of maximum number of transactions returned is returned by the `/defaultlimit` API call.
get:
tags:
- transactions
description:
Retrieve transactions
operationId: txa.get.latest.limit
responses:
200:
description: Transaction query successful.
content:
application/json:
schema:
$ref: "#/components/schemas/TransactionList"
parameters:
- name: limit
in: path
required: true
schema:
type: integer
format: int32
/txa/{limit}/{offset}:
summary: Cached data for latest transactions.
description: Return data entries of the latest transactions in the cache. If `limit` is 0, the number of maximum number of transactions returned is returned by the `/defaultlimit` API call.
get:
tags:
- transactions
description:
Retrieve transactions
operationId: txa.get.latest.range
responses:
200:
description: Transaction query successful.
content:
application/json:
schema:
$ref: "#/components/schemas/TransactionList"
parameters:
- name: limit
in: path
required: true
schema:
type: integer
format: int32
- name: offset
in: path
required: true
schema:
type: integer
format: int32
/txa/{limit}/{offset}/{block_offset}:
summary: Cached data for transactions since a particular block.
description: Return cached data entries of transactions since a particular block. The block parameter is inclusive. If `limit` is 0, the number of maximum number of transactions returned is returned by the `/defaultlimit` API call.
get:
tags:
- transactions
description:
Retrieve transactions
operationId: txa.get.latest.range.block.offset
responses:
200:
description: Transaction query successful.
content:
application/json:
schema:
$ref: "#/components/schemas/TransactionList"
parameters:
- name: limit
in: path
required: true
schema:
type: integer
format: int32
- name: offset
in: path
required: true
schema:
type: integer
format: int32
- name: block_offset
in: path
required: true
schema:
type: integer
format: int32
/txa/{limit}/{offset}/{block_offset}/{block_end}:
summary: Cached data for transactions within a particular block range.
description: Return cached data entries of transactions within a particular block range in the cache. The block parameters are inclusive. If `limit` is 0, the number of maximum number of transactions returned is returned by the `/defaultlimit` API call.
get:
tags:
- transactions
description:
Retrieve transactions
operationId: txa.get.latest.range.block.range
responses:
200:
description: Transaction query successful.
content:
application/json:
schema:
$ref: "#/components/schemas/TransactionList"
parameters:
- name: limit
in: path
required: true
schema:
type: integer
format: int32
- name: offset
in: path
required: true
schema:
type: integer
format: int32
- name: block_offset
in: path
required: true
schema:
type: integer
format: int32
- name: block_end
in: path
required: true
schema:
type: integer
format: int32
/txa/{address}:
summary: Cached data for batch of latest transactions by account.
description: Return cached data of the latest transactions where a specific account is the spender or beneficiary.
get:
tags:
- transactions
description:
Retrieve transactions
operationId: txa.get.user
responses:
200:
description: Transaction query successful.
content:
application/json:
schema:
$ref: "#/components/schemas/TransactionList"
parameters:
- name: address
in: path
required: true
schema:
type: string
/txa/{address}/{limit}:
summary: Cached data for batch of latest transactions by account.
description: Return cached data of the latest transactions where a specific account is the spender or beneficiary. If `limit` is 0, the number of maximum number of transactions returned is returned by the `/defaultlimit` API call.
get:
tags:
- transactions
description:
Retrieve transactions
operationId: txa.get.user.limit
responses:
200:
description: Transaction query successful.
content:
application/json:
schema:
$ref: "#/components/schemas/TransactionList"
parameters:
- name: address
in: path
required: true
schema:
type: string
- name: limit
in: path
required: true
schema:
type: integer
format: int32
/txa/{address}/{limit}/{offset}:
summary: Cached data for batch of latest transactions by account.
description: Return cached data of the latest transactions where a specific account is the spender or beneficiary. If `limit` is 0, the number of maximum number of transactions returned is returned by the `/defaultlimit` API call.
get:
tags:
- transactions
description:
Retrieve transactions
operationId: txa.get.user.range
responses:
200:
description: Transaction query successful.
content:
application/json:
schema:
$ref: "#/components/schemas/TransactionList"
parameters:
- name: address
in: path
required: true
schema:
type: string
- name: limit
in: path
required: true
schema:
type: integer
format: int32
- name: offset
in: path
required: true
schema:
type: integer
format: int32
components: components:
schemas: schemas:
Limit:
type: integer
format: int32
BlocksBloom: BlocksBloom:
type: object type: object
properties: properties:
low: low:
type: int type: integer
format: int32 format: int32
description: The lowest block number included in the filter description: The lowest block number included in the filter
high:
type: integer
format: int32
description: The highest block number included in the filter
block_filter: block_filter:
type: string type: string
format: byte format: byte
@ -97,6 +555,89 @@ components:
type: string type: string
description: Hashing algorithm (currently only using sha256) description: Hashing algorithm (currently only using sha256)
filter_rounds: filter_rounds:
type: int type: integer
format: int32 format: int32
description: Number of hash rounds used to create the filter description: Number of hash rounds used to create the filter
TransactionList:
type: object
properties:
low:
type: integer
format: int32
description: The lowest block number included in the result set
high:
type: integer
format: int32
description: The highest block number included in the filter
data:
type: array
description: Cached transaction data
items:
$ref: "#/components/schemas/Transaction"
Transaction:
type: object
properties:
block_number:
type: integer
format: int64
description: Block number transaction was included in.
tx_hash:
type: string
description: Transaction hash, in hex.
date_block:
type: integer
format: int32
description: Block timestamp.
sender:
type: string
description: Spender address, in hex.
recipient:
type: string
description: Beneficiary address, in hex.
from_value:
type: integer
format: int64
description: Value deducted from spender's balance.
to_value:
type: integer
format: int64
description: Value added to beneficiary's balance.
source_token:
type: string
description: Network address of token in which `from_value` is denominated.
destination_token:
type: string
description: Network address of token in which `to_value` is denominated.
success:
type: boolean
description: Network consensus state on whether the transaction was successful or not.
tx_type:
type: string
enum:
- erc20.faucet
- faucet.give_to
examples:
data_last:
summary: Get the latest cached transactions, using the server's default limit.
value: "/txa"
data_limit:
summary: Get the last 42 cached transactions.
value: "/txa/42"
data_range:
summary: Get the next 42 cached transactions, starting from the 13th (zero-indexed).
value: "/txa/42/13"
data_range_block_offset:
summary: Get the next 42 cached transactions, starting from block 1337 (inclusive).
value: "/txa/42/0/1337"
data_range_block_offset:
summary: Get the next 42 cached transactions within blocks 1337 and 1453 (inclusive).
value: "/txa/42/0/1337/1453"
data_range_block_range:
summary: Get the next 42 cached transactions after the 13th, within blocks 1337 and 1453 (inclusive).
value: "/txa/42/13/1337/1453"

View File

@ -4,9 +4,9 @@ FROM $DOCKER_REGISTRY/cic-base-images:python-3.8.6-dev-e8eb2ee2
COPY requirements.txt . COPY requirements.txt .
ARG EXTRA_PIP_INDEX_URL="https://pip.grassrootseconomics.net:8433" ARG EXTRA_PIP_INDEX_URL=https://pip.grassrootseconomics.net
ARG EXTRA_PIP_ARGS="" ARG EXTRA_PIP_ARGS=""
ARG PIP_INDEX_URL="https://pypi.org/simple" ARG PIP_INDEX_URL=https://pypi.org/simple
RUN --mount=type=cache,mode=0755,target=/root/.cache/pip \ RUN --mount=type=cache,mode=0755,target=/root/.cache/pip \
pip install --index-url $PIP_INDEX_URL \ pip install --index-url $PIP_INDEX_URL \
@ -14,14 +14,9 @@ RUN --mount=type=cache,mode=0755,target=/root/.cache/pip \
--extra-index-url $EXTRA_PIP_INDEX_URL $EXTRA_PIP_ARGS \ --extra-index-url $EXTRA_PIP_INDEX_URL $EXTRA_PIP_ARGS \
-r requirements.txt -r requirements.txt
COPY . . COPY . .
RUN pip install . --extra-index-url $EXTRA_PIP_INDEX_URL
RUN python setup.py install
# ini files in config directory defines the configurable parameters for the application
# they can all be overridden by environment variables
# to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
#COPY config/ /usr/local/etc/cic-cache/
# for db migrations # for db migrations
COPY ./aux/wait-for-it/wait-for-it.sh ./ COPY ./aux/wait-for-it/wait-for-it.sh ./

View File

@ -2,5 +2,5 @@
set -e set -e
>&2 echo executing database migration >&2 echo executing database migration
python scripts/migrate.py -c /usr/local/etc/cic-cache --migrations-dir /usr/local/share/cic-cache/alembic -vv python scripts/migrate_cic_cache.py --migrations-dir /usr/local/share/cic-cache/alembic -vv
set +e set +e

View File

@ -2,7 +2,7 @@
set -e set -e
pip install --extra-index-url https://pip.grassrootseconomics.net:8433 \ pip install --extra-index-url https://pip.grassrootseconomics.net \
--extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple \ --extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple \
-r test_requirements.txt -r test_requirements.txt

View File

@ -1,14 +1,15 @@
alembic==1.4.2 alembic==1.4.2
confini>=0.3.6rc4,<0.5.0 confini~=0.5.3
uwsgi==2.0.19.1 uwsgi==2.0.19.1
moolb~=0.1.1b2 moolb~=0.2.0
cic-eth-registry~=0.6.1a1 cic-eth-registry~=0.6.6
SQLAlchemy==1.3.20 SQLAlchemy==1.3.20
semver==2.13.0 semver==2.13.0
psycopg2==2.8.6 psycopg2==2.8.6
celery==4.4.7 celery==4.4.7
redis==3.5.3 redis==3.5.3
chainsyncer[sql]>=0.0.6a3,<0.1.0 chainsyncer[sql]~=0.0.7
erc20-faucet>=0.3.2a2, <0.4.0 erc20-faucet~=0.3.2
chainlib-eth>=0.0.9a14,<0.1.0 chainlib-eth~=0.0.15
eth-address-index>=0.2.3a4,<0.3.0 eth-address-index~=0.2.4
okota~=0.2.5

View File

@ -1,54 +1,55 @@
#!/usr/bin/python #!/usr/bin/python3
# standard imports
import os import os
import argparse import argparse
import logging import logging
import re import re
# external imports
import alembic import alembic
from alembic.config import Config as AlembicConfig from alembic.config import Config as AlembicConfig
import confini import confini
# local imports
from cic_cache.db import dsn_from_config from cic_cache.db import dsn_from_config
import cic_cache.cli
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
# BUG: the dbdir doesn't work after script install # BUG: the dbdir doesn't work after script install
rootdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) rootdir = os.path.dirname(os.path.dirname(os.path.realpath(cic_cache.__file__)))
dbdir = os.path.join(rootdir, 'cic_cache', 'db') dbdir = os.path.join(rootdir, 'cic_cache', 'db')
migrationsdir = os.path.join(dbdir, 'migrations') default_migrations_dir = os.path.join(dbdir, 'migrations')
configdir = os.path.join(rootdir, 'cic_cache', 'data', 'config') configdir = os.path.join(rootdir, 'cic_cache', 'data', 'config')
#config_dir = os.path.join('/usr/local/etc/cic-cache') #config_dir = os.path.join('/usr/local/etc/cic-cache')
argparser = argparse.ArgumentParser() arg_flags = cic_cache.cli.argflag_std_base
argparser.add_argument('-c', type=str, help='config file') local_arg_flags = cic_cache.cli.argflag_local_sync
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser = cic_cache.cli.ArgumentParser(arg_flags)
argparser.add_argument('--migrations-dir', dest='migrations_dir', default=migrationsdir, type=str, help='path to alembic migrations directory') argparser.process_local_flags(local_arg_flags)
argparser.add_argument('--reset', action='store_true', help='downgrade before upgrading') argparser.add_argument('--reset', action='store_true', help='downgrade before upgrading')
argparser.add_argument('-f', action='store_true', help='force action') argparser.add_argument('-f', '--force', action='store_true', help='force action')
argparser.add_argument('-v', action='store_true', help='be verbose') argparser.add_argument('--migrations-dir', dest='migrations_dir', default=default_migrations_dir, type=str, help='migrations directory')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args() args = argparser.parse_args()
if args.vv: extra_args = {
logging.getLogger().setLevel(logging.DEBUG) 'reset': None,
elif args.v: 'force': None,
logging.getLogger().setLevel(logging.INFO) 'migrations_dir': None,
}
# process config
config = cic_cache.cli.Config.from_args(args, arg_flags, local_arg_flags, extra_args=extra_args)
config = confini.Config(configdir, args.env_prefix) migrations_dir = os.path.join(config.get('_MIGRATIONS_DIR'), config.get('DATABASE_ENGINE', 'default'))
config.process()
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config:\n{}'.format(config))
migrations_dir = os.path.join(args.migrations_dir, config.get('DATABASE_ENGINE'))
if not os.path.isdir(migrations_dir): if not os.path.isdir(migrations_dir):
logg.debug('migrations dir for engine {} not found, reverting to default'.format(config.get('DATABASE_ENGINE'))) logg.debug('migrations dir for engine {} not found, reverting to default'.format(config.get('DATABASE_ENGINE')))
migrations_dir = os.path.join(args.migrations_dir, 'default') migrations_dir = os.path.join(args.migrations_dir, 'default')
# connect to database # connect to database
dsn = dsn_from_config(config) dsn = dsn_from_config(config, 'cic_cache')
logg.info('using migrations dir {}'.format(migrations_dir)) logg.info('using migrations dir {}'.format(migrations_dir))

View File

@ -1,6 +1,7 @@
[metadata] [metadata]
name = cic-cache name = cic-cache
description = CIC Cache API and server description = CIC Cache API and server
version = 0.3.0a2
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no
url = https://gitlab.com/grassrootseconomics/cic-eth url = https://gitlab.com/grassrootseconomics/cic-eth
@ -34,7 +35,7 @@ packages =
cic_cache.runnable.daemons cic_cache.runnable.daemons
cic_cache.runnable.daemons.filters cic_cache.runnable.daemons.filters
scripts = scripts =
./scripts/migrate.py ./scripts/migrate_cic_cache.py
[options.entry_points] [options.entry_points]
console_scripts = console_scripts =

View File

@ -1,38 +1,39 @@
from setuptools import setup from setuptools import setup
import configparser # import configparser
import os import os
import time
from cic_cache.version import ( # import time
version_object,
version_string
)
class PleaseCommitFirstError(Exception): # from cic_cache.version import (
pass # version_object,
# version_string
def git_hash(): # )
import subprocess #
git_diff = subprocess.run(['git', 'diff'], capture_output=True) # class PleaseCommitFirstError(Exception):
if len(git_diff.stdout) > 0: # pass
raise PleaseCommitFirstError() #
git_hash = subprocess.run(['git', 'rev-parse', 'HEAD'], capture_output=True) # def git_hash():
git_hash_brief = git_hash.stdout.decode('utf-8')[:8] # import subprocess
return git_hash_brief # git_diff = subprocess.run(['git', 'diff'], capture_output=True)
# if len(git_diff.stdout) > 0:
version_string = str(version_object) # raise PleaseCommitFirstError()
# git_hash = subprocess.run(['git', 'rev-parse', 'HEAD'], capture_output=True)
try: # git_hash_brief = git_hash.stdout.decode('utf-8')[:8]
version_git = git_hash() # return git_hash_brief
version_string += '+build.{}'.format(version_git) #
except FileNotFoundError: # version_string = str(version_object)
time_string_pair = str(time.time()).split('.') #
version_string += '+build.{}{:<09d}'.format( # try:
time_string_pair[0], # version_git = git_hash()
int(time_string_pair[1]), # version_string += '+build.{}'.format(version_git)
) # except FileNotFoundError:
print('final version string will be {}'.format(version_string)) # time_string_pair = str(time.time()).split('.')
# version_string += '+build.{}{:<09d}'.format(
# time_string_pair[0],
# int(time_string_pair[1]),
# )
# print('final version string will be {}'.format(version_string))
requirements = [] requirements = []
f = open('requirements.txt', 'r') f = open('requirements.txt', 'r')
@ -52,9 +53,8 @@ while True:
test_requirements.append(l.rstrip()) test_requirements.append(l.rstrip())
f.close() f.close()
setup( setup(
version=version_string, # version=version_string,
install_requires=requirements, install_requires=requirements,
tests_require=test_requirements, tests_require=test_requirements,
) )

View File

@ -7,4 +7,4 @@ pytest-celery==0.0.0a1
eth_tester==0.5.0b3 eth_tester==0.5.0b3
py-evm==0.3.0a20 py-evm==0.3.0a20
sarafu-faucet~=0.0.7a1 sarafu-faucet~=0.0.7a1
erc20-transfer-authorization>=0.3.5a1,<0.4.0 erc20-transfer-authorization~=0.3.6

View File

@ -6,6 +6,7 @@ import datetime
# external imports # external imports
import pytest import pytest
import moolb import moolb
from chainlib.encode import TxHexNormalizer
# local imports # local imports
from cic_cache import db from cic_cache import db
@ -42,6 +43,8 @@ def txs(
list_tokens, list_tokens,
): ):
tx_normalize = TxHexNormalizer()
session = init_database session = init_database
tx_number = 13 tx_number = 13
@ -54,10 +57,10 @@ def txs(
tx_hash_first, tx_hash_first,
list_defaults['block'], list_defaults['block'],
tx_number, tx_number,
list_actors['alice'], tx_normalize.wallet_address(list_actors['alice']),
list_actors['bob'], tx_normalize.wallet_address(list_actors['bob']),
list_tokens['foo'], tx_normalize.executable_address(list_tokens['foo']),
list_tokens['foo'], tx_normalize.executable_address(list_tokens['foo']),
1024, 1024,
2048, 2048,
True, True,
@ -74,10 +77,10 @@ def txs(
tx_hash_second, tx_hash_second,
list_defaults['block']-1, list_defaults['block']-1,
tx_number, tx_number,
list_actors['diane'], tx_normalize.wallet_address(list_actors['diane']),
list_actors['alice'], tx_normalize.wallet_address(list_actors['alice']),
list_tokens['foo'], tx_normalize.executable_address(list_tokens['foo']),
list_tokens['foo'], tx_normalize.wallet_address(list_tokens['foo']),
1024, 1024,
2048, 2048,
False, False,
@ -103,6 +106,8 @@ def more_txs(
session = init_database session = init_database
tx_normalize = TxHexNormalizer()
tx_number = 666 tx_number = 666
tx_hash = '0x' + os.urandom(32).hex() tx_hash = '0x' + os.urandom(32).hex()
tx_signed = '0x' + os.urandom(128).hex() tx_signed = '0x' + os.urandom(128).hex()
@ -115,10 +120,10 @@ def more_txs(
tx_hash, tx_hash,
list_defaults['block']+2, list_defaults['block']+2,
tx_number, tx_number,
list_actors['alice'], tx_normalize.wallet_address(list_actors['alice']),
list_actors['diane'], tx_normalize.wallet_address(list_actors['diane']),
list_tokens['bar'], tx_normalize.executable_address(list_tokens['bar']),
list_tokens['bar'], tx_normalize.executable_address(list_tokens['bar']),
2048, 2048,
4096, 4096,
False, False,

View File

@ -14,7 +14,8 @@ logg = logging.getLogger(__file__)
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def load_config(): def load_config():
config_dir = os.path.join(root_dir, 'config/test') config_dir = os.path.join(root_dir, 'config/test')
conf = confini.Config(config_dir, 'CICTEST') schema_config_dir = os.path.join(root_dir, 'cic_cache', 'data', 'config')
conf = confini.Config(schema_config_dir, 'CICTEST', override_dirs=config_dir)
conf.process() conf.process()
logg.debug('config {}'.format(conf)) logg.debug('config {}'.format(conf))
return conf return conf

View File

@ -24,11 +24,15 @@ def database_engine(
if load_config.get('DATABASE_ENGINE') == 'sqlite': if load_config.get('DATABASE_ENGINE') == 'sqlite':
SessionBase.transactional = False SessionBase.transactional = False
SessionBase.poolable = False SessionBase.poolable = False
name = 'cic_cache'
database_name = name
if load_config.get('DATABASE_PREFIX'):
database_name = '{}_{}'.format(load_config.get('DATABASE_PREFIX'), database_name)
try: try:
os.unlink(load_config.get('DATABASE_NAME')) os.unlink(database_name)
except FileNotFoundError: except FileNotFoundError:
pass pass
dsn = dsn_from_config(load_config) dsn = dsn_from_config(load_config, name)
SessionBase.connect(dsn, debug=load_config.true('DATABASE_DEBUG')) SessionBase.connect(dsn, debug=load_config.true('DATABASE_DEBUG'))
return dsn return dsn

View File

@ -14,7 +14,7 @@ def test_api_all_data(
): ):
env = { env = {
'PATH_INFO': '/txa/410000/420000', 'PATH_INFO': '/txa/100/0/410000/420000',
'HTTP_X_CIC_CACHE_MODE': 'all', 'HTTP_X_CIC_CACHE_MODE': 'all',
} }
j = process_transactions_all_data(init_database, env) j = process_transactions_all_data(init_database, env)
@ -23,7 +23,7 @@ def test_api_all_data(
assert len(o['data']) == 2 assert len(o['data']) == 2
env = { env = {
'PATH_INFO': '/txa/420000/410000', 'PATH_INFO': '/txa/100/0/420000/410000',
'HTTP_X_CIC_CACHE_MODE': 'all', 'HTTP_X_CIC_CACHE_MODE': 'all',
} }

View File

@ -6,6 +6,7 @@ import json
# external imports # external imports
import pytest import pytest
from chainlib.encode import TxHexNormalizer
# local imports # local imports
from cic_cache import db from cic_cache import db
@ -62,6 +63,8 @@ def test_cache_ranges(
session = init_database session = init_database
tx_normalize = TxHexNormalizer()
oldest = list_defaults['block'] - 1 oldest = list_defaults['block'] - 1
mid = list_defaults['block'] mid = list_defaults['block']
newest = list_defaults['block'] + 2 newest = list_defaults['block'] + 2
@ -100,32 +103,39 @@ def test_cache_ranges(
assert b[1] == mid assert b[1] == mid
# now check when supplying account # now check when supplying account
b = c.load_transactions_account(list_actors['alice'], 0, 100) account = tx_normalize.wallet_address(list_actors['alice'])
b = c.load_transactions_account(account, 0, 100)
assert b[0] == oldest assert b[0] == oldest
assert b[1] == newest assert b[1] == newest
b = c.load_transactions_account(list_actors['bob'], 0, 100) account = tx_normalize.wallet_address(list_actors['bob'])
b = c.load_transactions_account(account, 0, 100)
assert b[0] == mid assert b[0] == mid
assert b[1] == mid assert b[1] == mid
b = c.load_transactions_account(list_actors['diane'], 0, 100) account = tx_normalize.wallet_address(list_actors['diane'])
b = c.load_transactions_account(account, 0, 100)
assert b[0] == oldest assert b[0] == oldest
assert b[1] == newest assert b[1] == newest
# add block filter to the mix # add block filter to the mix
b = c.load_transactions_account(list_actors['alice'], 0, 100, block_offset=list_defaults['block']) account = tx_normalize.wallet_address(list_actors['alice'])
b = c.load_transactions_account(account, 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid assert b[0] == mid
assert b[1] == newest assert b[1] == newest
b = c.load_transactions_account(list_actors['alice'], 0, 100, block_offset=list_defaults['block']) account = tx_normalize.wallet_address(list_actors['alice'])
b = c.load_transactions_account(account, 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid assert b[0] == mid
assert b[1] == newest assert b[1] == newest
b = c.load_transactions_account(list_actors['bob'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block']) account = tx_normalize.wallet_address(list_actors['bob'])
b = c.load_transactions_account(account, 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == mid assert b[0] == mid
assert b[1] == mid assert b[1] == mid
b = c.load_transactions_account(list_actors['diane'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block']) account = tx_normalize.wallet_address(list_actors['diane'])
b = c.load_transactions_account(account, 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == oldest assert b[0] == oldest
assert b[1] == oldest assert b[1] == oldest
@ -140,6 +150,8 @@ def test_cache_ranges_data(
session = init_database session = init_database
tx_normalize = TxHexNormalizer()
oldest = list_defaults['block'] - 1 oldest = list_defaults['block'] - 1
mid = list_defaults['block'] mid = list_defaults['block']
newest = list_defaults['block'] + 2 newest = list_defaults['block'] + 2
@ -203,7 +215,8 @@ def test_cache_ranges_data(
assert b[2][1]['tx_hash'] == more_txs[1] assert b[2][1]['tx_hash'] == more_txs[1]
# now check when supplying account # now check when supplying account
b = c.load_transactions_account_with_data(list_actors['alice'], 0, 100) account = tx_normalize.wallet_address(list_actors['alice'])
b = c.load_transactions_account_with_data(account, 0, 100)
assert b[0] == oldest assert b[0] == oldest
assert b[1] == newest assert b[1] == newest
assert len(b[2]) == 3 assert len(b[2]) == 3
@ -211,13 +224,15 @@ def test_cache_ranges_data(
assert b[2][1]['tx_hash'] == more_txs[1] assert b[2][1]['tx_hash'] == more_txs[1]
assert b[2][2]['tx_hash'] == more_txs[2] assert b[2][2]['tx_hash'] == more_txs[2]
b = c.load_transactions_account_with_data(list_actors['bob'], 0, 100) account = tx_normalize.wallet_address(list_actors['bob'])
b = c.load_transactions_account_with_data(account, 0, 100)
assert b[0] == mid assert b[0] == mid
assert b[1] == mid assert b[1] == mid
assert len(b[2]) == 1 assert len(b[2]) == 1
assert b[2][0]['tx_hash'] == more_txs[1] assert b[2][0]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['diane'], 0, 100) account = tx_normalize.wallet_address(list_actors['diane'])
b = c.load_transactions_account_with_data(account, 0, 100)
assert b[0] == oldest assert b[0] == oldest
assert b[1] == newest assert b[1] == newest
assert len(b[2]) == 2 assert len(b[2]) == 2
@ -225,27 +240,31 @@ def test_cache_ranges_data(
assert b[2][1]['tx_hash'] == more_txs[2] assert b[2][1]['tx_hash'] == more_txs[2]
# add block filter to the mix # add block filter to the mix
b = c.load_transactions_account_with_data(list_actors['alice'], 0, 100, block_offset=list_defaults['block']) account = tx_normalize.wallet_address(list_actors['alice'])
b = c.load_transactions_account_with_data(account, 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid assert b[0] == mid
assert b[1] == newest assert b[1] == newest
assert len(b[2]) == 2 assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0] assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1] assert b[2][1]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['alice'], 0, 100, block_offset=list_defaults['block']) account = tx_normalize.wallet_address(list_actors['alice'])
b = c.load_transactions_account_with_data(account, 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid assert b[0] == mid
assert b[1] == newest assert b[1] == newest
assert len(b[2]) == 2 assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0] assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1] assert b[2][1]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['bob'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block']) account = tx_normalize.wallet_address(list_actors['bob'])
b = c.load_transactions_account_with_data(account, 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == mid assert b[0] == mid
assert b[1] == mid assert b[1] == mid
assert len(b[2]) == 1 assert len(b[2]) == 1
assert b[2][0]['tx_hash'] == more_txs[1] assert b[2][0]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['diane'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block']) account = tx_normalize.wallet_address(list_actors['diane'])
b = c.load_transactions_account_with_data(account, 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == oldest assert b[0] == oldest
assert b[1] == oldest assert b[1] == oldest
assert len(b[2]) == 1 assert len(b[2]) == 1

View File

@ -82,7 +82,7 @@ def test_query_regex(
[ [
('alice', None, None, [(420000, 13), (419999, 42)]), ('alice', None, None, [(420000, 13), (419999, 42)]),
('alice', None, 1, [(420000, 13)]), ('alice', None, 1, [(420000, 13)]),
('alice', 1, None, [(419999, 42)]), # 420000 == list_defaults['block'] ('alice', 1, 1, [(419999, 42)]), # 420000 == list_defaults['block']
('alice', 2, None, []), # 420000 == list_defaults['block'] ('alice', 2, None, []), # 420000 == list_defaults['block']
], ],
) )
@ -107,10 +107,11 @@ def test_query_process_txs_account(
path_info = '/tx/user/0x' + strip_0x(actor) path_info = '/tx/user/0x' + strip_0x(actor)
if query_offset != None: if query_offset != None:
path_info += '/' + str(query_offset) path_info += '/' + str(query_offset)
if query_limit != None: if query_limit == None:
if query_offset == None: query_limit = 100
path_info += '/0' path_info += '/' + str(query_limit)
path_info += '/' + str(query_limit) if query_offset == None:
path_info += '/0'
env = { env = {
'PATH_INFO': path_info, 'PATH_INFO': path_info,
} }
@ -192,7 +193,7 @@ def test_query_process_txs_bloom(
@pytest.mark.parametrize( @pytest.mark.parametrize(
'query_block_start, query_block_end, query_match_count', 'query_block_start, query_block_end, query_match_count',
[ [
(None, 42, 0), (1, 42, 0),
(420000, 420001, 1), (420000, 420001, 1),
(419999, 419999, 1), # matches are inclusive (419999, 419999, 1), # matches are inclusive
(419999, 420000, 2), (419999, 420000, 2),
@ -211,7 +212,7 @@ def test_query_process_txs_data(
query_match_count, query_match_count,
): ):
path_info = '/txa' path_info = '/txa/100/0'
if query_block_start != None: if query_block_start != None:
path_info += '/' + str(query_block_start) path_info += '/' + str(query_block_start)
if query_block_end != None: if query_block_end != None:
@ -227,4 +228,5 @@ def test_query_process_txs_data(
assert r != None assert r != None
o = json.loads(r[1]) o = json.loads(r[1])
logg.debug('oo {}'.format(o))
assert len(o['data']) == query_match_count assert len(o['data']) == query_match_count

View File

@ -1,5 +1,5 @@
celery==4.4.7 celery==4.4.7
erc20-demurrage-token~=0.0.5a3 erc20-demurrage-token~=0.0.6
cic-eth-registry~=0.6.1a6 cic-eth-registry~=0.6.3
chainlib~=0.0.9rc1 chainlib~=0.0.14
cic_eth~=0.12.4a11 cic_eth~=0.12.6

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = cic-eth-aux-erc20-demurrage-token name = cic-eth-aux-erc20-demurrage-token
version = 0.0.2a7 version = 0.0.3
description = cic-eth tasks supporting erc20 demurrage token description = cic-eth tasks supporting erc20 demurrage token
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no

View File

@ -1,5 +1,4 @@
SQLAlchemy==1.3.20 SQLAlchemy==1.3.20
cic-eth-registry>=0.6.1a6,<0.7.0 hexathon~=0.1.0
hexathon~=0.0.1a8 chainqueue~=0.0.6a4
chainqueue>=0.0.4a6,<0.1.0 eth-erc20~=0.1.5
eth-erc20>=0.1.2a2,<0.2.0

View File

@ -123,7 +123,7 @@ class AdminApi:
return s_lock.apply_async() return s_lock.apply_async()
def tag_account(self, tag, address_hex, chain_spec): def tag_account(self, chain_spec, tag, address):
"""Persistently associate an address with a plaintext tag. """Persistently associate an address with a plaintext tag.
Some tags are known by the system and is used to resolve addresses to use for certain transactions. Some tags are known by the system and is used to resolve addresses to use for certain transactions.
@ -138,7 +138,7 @@ class AdminApi:
'cic_eth.eth.account.set_role', 'cic_eth.eth.account.set_role',
[ [
tag, tag,
address_hex, address,
chain_spec.asdict(), chain_spec.asdict(),
], ],
queue=self.queue, queue=self.queue,
@ -146,6 +146,30 @@ class AdminApi:
return s_tag.apply_async() return s_tag.apply_async()
def get_tag_account(self, chain_spec, tag=None, address=None):
if address != None:
s_tag = celery.signature(
'cic_eth.eth.account.role',
[
address,
chain_spec.asdict(),
],
queue=self.queue,
)
else:
s_tag = celery.signature(
'cic_eth.eth.account.role_account',
[
tag,
chain_spec.asdict(),
],
queue=self.queue,
)
return s_tag.apply_async()
def have_account(self, address_hex, chain_spec): def have_account(self, address_hex, chain_spec):
s_have = celery.signature( s_have = celery.signature(
'cic_eth.eth.account.have', 'cic_eth.eth.account.have',
@ -503,7 +527,7 @@ class AdminApi:
queue=self.queue, queue=self.queue,
) )
t = s.apply_async() t = s.apply_async()
role = t.get() role = t.get()[0][1]
if role != None: if role != None:
tx['sender_description'] = role tx['sender_description'] = role
@ -556,7 +580,7 @@ class AdminApi:
queue=self.queue, queue=self.queue,
) )
t = s.apply_async() t = s.apply_async()
role = t.get() role = t.get()[0][1]
if role != None: if role != None:
tx['recipient_description'] = role tx['recipient_description'] = role

View File

@ -515,7 +515,7 @@ class Api(ApiBase):
:param password: Password to encode the password with in the backend (careful, you will have to remember it) :param password: Password to encode the password with in the backend (careful, you will have to remember it)
:type password: str :type password: str
:param register: Register the new account in accounts index backend :param register: Register the new account in accounts index backend
:type password: bool :type register: bool
:returns: uuid of root task :returns: uuid of root task
:rtype: celery.Task :rtype: celery.Task
""" """

View File

@ -12,8 +12,9 @@ from cic_eth.db.models.base import SessionBase
from cic_eth.db.enum import LockEnum from cic_eth.db.enum import LockEnum
from cic_eth.error import LockedError from cic_eth.error import LockedError
from cic_eth.admin.ctrl import check_lock from cic_eth.admin.ctrl import check_lock
from cic_eth.eth.gas import have_gas_minimum
logg = logging.getLogger().getChild(__name__) logg = logging.getLogger(__name__)
def health(*args, **kwargs): def health(*args, **kwargs):
@ -31,18 +32,15 @@ def health(*args, **kwargs):
return True return True
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session) gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
min_gas = int(config.get('ETH_GAS_HOLDER_MINIMUM_UNITS')) * int(config.get('ETH_GAS_GIFTER_REFILL_BUFFER'))
if config.get('ETH_MIN_FEE_PRICE'):
min_gas *= int(config.get('ETH_MIN_FEE_PRICE'))
r = have_gas_minimum(chain_spec, gas_provider, min_gas, session=session)
session.close() session.close()
rpc = RPCConnection.connect(chain_spec, 'default') if not r:
o = balance(gas_provider) logg.error('EEK! gas gifter has balance {}, below minimum {}'.format(r, min_gas))
r = rpc.do(o)
try:
r = int(r, 16)
except TypeError:
r = int(r)
gas_min = int(config.get('ETH_GAS_GIFTER_MINIMUM_BALANCE'))
if r < gas_min:
logg.error('EEK! gas gifter has balance {}, below minimum {}'.format(r, gas_min))
return False
return True return r

View File

@ -0,0 +1,18 @@
# external imports
from chainlib.chain import ChainSpec
# local imports
from cic_eth.admin.ctrl import check_lock
from cic_eth.enum import LockEnum
from cic_eth.error import LockedError
def health(*args, **kwargs):
config = kwargs['config']
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
try:
check_lock(None, chain_spec.asdict(), LockEnum.START)
except LockedError as e:
return False
return True

View File

@ -16,16 +16,22 @@ class ArgumentParser(BaseArgumentParser):
self.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission') self.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
self.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use') self.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use')
if local_arg_flags & CICFlag.REDIS_CALLBACK: if local_arg_flags & CICFlag.REDIS_CALLBACK:
self.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback') self.add_argument('--redis-host-callback', dest='redis_host_callback', type=str, help='redis host to use for callback (defaults to redis host)')
self.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback') self.add_argument('--redis-port-callback', dest='redis_port_callback', type=int, help='redis port to use for callback (defaults to redis port)')
self.add_argument('--redis-timeout', default=20.0, type=float, help='Redis callback timeout') self.add_argument('--redis-timeout', default=20.0, type=float, help='Redis callback timeout')
if local_arg_flags & CICFlag.CELERY: if local_arg_flags & CICFlag.CELERY:
self.add_argument('--celery-scheme', type=str, help='Celery broker scheme (defaults to "redis")')
self.add_argument('--celery-host', type=str, help='Celery broker host (defaults to redis host)')
self.add_argument('--celery-port', type=str, help='Celery broker port (defaults to redis port)')
self.add_argument('--celery-db', type=int, help='Celery broker db (defaults to redis db)')
self.add_argument('--celery-result-scheme', type=str, help='Celery result backend scheme (defaults to celery broker scheme)')
self.add_argument('--celery-result-host', type=str, help='Celery result backend host (defaults to celery broker host)')
self.add_argument('--celery-result-port', type=str, help='Celery result backend port (defaults to celery broker port)')
self.add_argument('--celery-result-db', type=int, help='Celery result backend db (defaults to celery broker db)')
self.add_argument('--celery-no-result', action='store_true', help='Disable the Celery results backend')
self.add_argument('-q', '--celery-queue', dest='celery_queue', type=str, default='cic-eth', help='Task queue') self.add_argument('-q', '--celery-queue', dest='celery_queue', type=str, default='cic-eth', help='Task queue')
if local_arg_flags & CICFlag.SYNCER: if local_arg_flags & CICFlag.SYNCER:
self.add_argument('--offset', type=int, default=0, help='Start block height for initial history sync') self.add_argument('--offset', type=int, help='Start block height for initial history sync')
self.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync') self.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
if local_arg_flags & CICFlag.CHAIN: if local_arg_flags & CICFlag.CHAIN:
self.add_argument('-r', '--registry-address', type=str, dest='registry_address', help='CIC registry contract address') self.add_argument('-r', '--registry-address', type=str, dest='registry_address', help='CIC registry contract address')

View File

@ -24,7 +24,7 @@ class CICFlag(enum.IntEnum):
# sync - nibble 4 # sync - nibble 4
SYNCER = 4096 SYNCER = 4096
argflag_local_base = argflag_std_base | Flag.CHAIN_SPEC
argflag_local_task = CICFlag.CELERY argflag_local_task = CICFlag.CELERY
argflag_local_taskcallback = argflag_local_task | CICFlag.REDIS | CICFlag.REDIS_CALLBACK argflag_local_taskcallback = argflag_local_task | CICFlag.REDIS | CICFlag.REDIS_CALLBACK
argflag_local_chain = CICFlag.CHAIN argflag_local_chain = CICFlag.CHAIN

View File

@ -1,12 +1,18 @@
# standard imports # standard imports
import os import os
import logging import logging
import urllib.parse
import copy
# external imports # external imports
from chainlib.eth.cli import ( from chainlib.eth.cli import (
Config as BaseConfig, Config as BaseConfig,
Flag, Flag,
) )
from urlybird.merge import (
urlhostmerge,
urlmerge,
)
# local imports # local imports
from .base import CICFlag from .base import CICFlag
@ -40,6 +46,7 @@ class Config(BaseConfig):
if local_arg_flags & CICFlag.CHAIN: if local_arg_flags & CICFlag.CHAIN:
local_args_override['CIC_REGISTRY_ADDRESS'] = getattr(args, 'registry_address') local_args_override['CIC_REGISTRY_ADDRESS'] = getattr(args, 'registry_address')
if local_arg_flags & CICFlag.CELERY: if local_arg_flags & CICFlag.CELERY:
local_args_override['CELERY_QUEUE'] = getattr(args, 'celery_queue') local_args_override['CELERY_QUEUE'] = getattr(args, 'celery_queue')
@ -49,15 +56,71 @@ class Config(BaseConfig):
config.dict_override(local_args_override, 'local cli args') config.dict_override(local_args_override, 'local cli args')
if local_arg_flags & CICFlag.REDIS_CALLBACK: local_celery_args_override = {}
config.add(getattr(args, 'redis_host_callback'), '_REDIS_HOST_CALLBACK')
config.add(getattr(args, 'redis_port_callback'), '_REDIS_PORT_CALLBACK')
if local_arg_flags & CICFlag.CELERY: if local_arg_flags & CICFlag.CELERY:
hostport = urlhostmerge(
None,
config.get('REDIS_HOST'),
config.get('REDIS_PORT'),
)
db = getattr(args, 'redis_db', None)
if db != None:
db = str(db)
redis_url = (
'redis',
hostport,
db,
)
celery_config_url = urllib.parse.urlsplit(config.get('CELERY_BROKER_URL'))
hostport = urlhostmerge(
celery_config_url[1],
getattr(args, 'celery_host', None),
getattr(args, 'celery_port', None),
)
db = getattr(args, 'redis_db', None)
if db != None:
db = str(db)
celery_arg_url = (
getattr(args, 'celery_scheme', None),
hostport,
db,
)
celery_url = urlmerge(redis_url, celery_config_url, celery_arg_url)
celery_url_string = urllib.parse.urlunsplit(celery_url)
local_celery_args_override['CELERY_BROKER_URL'] = celery_url_string
if not getattr(args, 'celery_no_result'):
local_celery_args_override['CELERY_RESULT_URL'] = config.get('CELERY_RESULT_URL')
if local_celery_args_override['CELERY_RESULT_URL'] == None:
local_celery_args_override['CELERY_RESULT_URL'] = local_celery_args_override['CELERY_BROKER_URL']
celery_config_url = urllib.parse.urlsplit(local_celery_args_override['CELERY_RESULT_URL'])
hostport = urlhostmerge(
celery_config_url[1],
getattr(args, 'celery_result_host', None),
getattr(args, 'celery_result_port', None),
)
celery_arg_url = (
getattr(args, 'celery_result_scheme', None),
hostport,
getattr(args, 'celery_result_db', None),
)
celery_url = urlmerge(celery_config_url, celery_arg_url)
logg.debug('celery url {} {}'.format(celery_config_url, celery_url))
celery_url_string = urllib.parse.urlunsplit(celery_url)
local_celery_args_override['CELERY_RESULT_URL'] = celery_url_string
config.add(config.true('CELERY_DEBUG'), 'CELERY_DEBUG', exists_ok=True) config.add(config.true('CELERY_DEBUG'), 'CELERY_DEBUG', exists_ok=True)
config.dict_override(local_celery_args_override, 'local celery cli args')
if local_arg_flags & CICFlag.REDIS_CALLBACK:
redis_host_callback = getattr(args, 'redis_host_callback', config.get('REDIS_HOST'))
redis_port_callback = getattr(args, 'redis_port_callback', config.get('REDIS_PORT'))
config.add(redis_host_callback, '_REDIS_HOST_CALLBACK')
config.add(redis_port_callback, '_REDIS_PORT_CALLBACK')
logg.debug('config loaded:\n{}'.format(config)) logg.debug('config loaded:\n{}'.format(config))
return config return config

View File

@ -1,5 +1,5 @@
[celery] [celery]
broker_url = redis://localhost:6379 broker_url =
result_url = result_url =
queue = cic-eth queue = cic-eth
debug = 0 debug = 0

View File

@ -2,5 +2,5 @@
registry_address = registry_address =
trust_address = trust_address =
default_token_symbol = default_token_symbol =
health_modules = cic_eth.check.db,cic_eth.check.redis,cic_eth.check.signer,cic_eth.check.gas health_modules = cic_eth.check.db,cic_eth.check.redis,cic_eth.check.signer,cic_eth.check.gas,cic_eth.check.start
run_dir = /run run_dir = /run

View File

@ -1,2 +1,6 @@
[eth] [eth]
gas_gifter_minimum_balance = 10000000000000000000000 gas_holder_minimum_units = 180000
gas_holder_refill_units = 15
gas_holder_refill_threshold = 3
gas_gifter_refill_buffer = 3
min_fee_price = 1

View File

@ -23,7 +23,7 @@ def upgrade():
op.create_table( op.create_table(
'lock', 'lock',
sa.Column('id', sa.Integer, primary_key=True), sa.Column('id', sa.Integer, primary_key=True),
sa.Column("address", sa.String(42), nullable=True), sa.Column("address", sa.String, nullable=True),
sa.Column('blockchain', sa.String), sa.Column('blockchain', sa.String),
sa.Column("flags", sa.BIGINT(), nullable=False, default=0), sa.Column("flags", sa.BIGINT(), nullable=False, default=0),
sa.Column("date_created", sa.DateTime, nullable=False, default=datetime.datetime.utcnow), sa.Column("date_created", sa.DateTime, nullable=False, default=datetime.datetime.utcnow),

View File

@ -0,0 +1,31 @@
"""Add gas cache
Revision ID: c91cafc3e0c1
Revises: aee12aeb47ec
Create Date: 2021-10-28 20:45:34.239865
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'c91cafc3e0c1'
down_revision = 'aee12aeb47ec'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'gas_cache',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column("address", sa.String, nullable=False),
sa.Column("tx_hash", sa.String, nullable=True),
sa.Column("method", sa.String, nullable=True),
sa.Column("value", sa.BIGINT(), nullable=False),
)
def downgrade():
op.drop_table('gas_cache')

View File

@ -0,0 +1,27 @@
# standard imports
import logging
# external imports
from sqlalchemy import Column, String, NUMERIC
# local imports
from .base import SessionBase
logg = logging.getLogger(__name__)
class GasCache(SessionBase):
"""Provides gas budget cache for token operations
"""
__tablename__ = 'gas_cache'
address = Column(String())
tx_hash = Column(String())
method = Column(String())
value = Column(NUMERIC())
def __init__(self, address, method, value, tx_hash):
self.address = address
self.tx_hash = tx_hash
self.method = method
self.value = value

View File

@ -12,7 +12,7 @@ from cic_eth.error import (
IntegrityError, IntegrityError,
) )
logg = logging.getLogger() logg = logging.getLogger(__name__)
class Nonce(SessionBase): class Nonce(SessionBase):
@ -21,7 +21,7 @@ class Nonce(SessionBase):
__tablename__ = 'nonce' __tablename__ = 'nonce'
nonce = Column(Integer) nonce = Column(Integer)
address_hex = Column(String(42)) address_hex = Column(String(40))
@staticmethod @staticmethod

View File

@ -25,7 +25,21 @@ class AccountRole(SessionBase):
address_hex = Column(String(42)) address_hex = Column(String(42))
# TODO: @staticmethod
def all(session=None):
session = SessionBase.bind_session(session)
pairs = []
q = session.query(AccountRole.tag, AccountRole.address_hex)
for r in q.all():
pairs.append((r[1], r[0]),)
SessionBase.release_session(session)
return pairs
@staticmethod @staticmethod
def get_address(tag, session): def get_address(tag, session):
"""Get Ethereum address matching the given tag """Get Ethereum address matching the given tag

View File

@ -69,9 +69,12 @@ class StatusEnum(enum.IntEnum):
class LockEnum(enum.IntEnum): class LockEnum(enum.IntEnum):
""" """
STICKY: When set, reset is not possible STICKY: When set, reset is not possible
INIT: When set, startup is possible without second level sanity checks (e.g. gas gifter balance)
START: When set, startup is not possible, regardless of state
CREATE: Disable creation of accounts CREATE: Disable creation of accounts
SEND: Disable sending to network SEND: Disable sending to network
QUEUE: Disable queueing new or modified transactions QUEUE: Disable queueing new or modified transactions
QUERY: Disable all queue state and transaction queries
""" """
STICKY=1 STICKY=1
INIT=2 INIT=2
@ -79,7 +82,8 @@ class LockEnum(enum.IntEnum):
SEND=8 SEND=8
QUEUE=16 QUEUE=16
QUERY=32 QUERY=32
ALL=int(0xfffffffffffffffe) START=int(0x80000000)
ALL=int(0x7ffffffe)
def status_str(v, bits_only=False): def status_str(v, bits_only=False):

View File

@ -64,8 +64,10 @@ class LockedError(Exception):
class SeppukuError(Exception): class SeppukuError(Exception):
"""Exception base class for all errors that should cause system shutdown """Exception base class for all errors that should cause system shutdown
""" """
def __init__(self, message, lockdown=False):
self.message = message
self.lockdown = lockdown
class SignerError(SeppukuError): class SignerError(SeppukuError):

View File

@ -136,7 +136,7 @@ def register(self, account_address, chain_spec_dict, writer_address=None):
# Generate and sign transaction # Generate and sign transaction
rpc_signer = RPCConnection.connect(chain_spec, 'signer') rpc_signer = RPCConnection.connect(chain_spec, 'signer')
nonce_oracle = CustodialTaskNonceOracle(writer_address, self.request.root_id, session=session) #, default_nonce) nonce_oracle = CustodialTaskNonceOracle(writer_address, self.request.root_id, session=session) #, default_nonce)
gas_oracle = self.create_gas_oracle(rpc, AccountRegistry.gas) gas_oracle = self.create_gas_oracle(rpc, code_callback=AccountRegistry.gas)
account_registry = AccountsIndex(chain_spec, signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle) account_registry = AccountsIndex(chain_spec, signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = account_registry.add(account_registry_address, writer_address, account_address, tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = account_registry.add(account_registry_address, writer_address, account_address, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect() rpc_signer.disconnect()
@ -192,7 +192,7 @@ def gift(self, account_address, chain_spec_dict):
# Generate and sign transaction # Generate and sign transaction
rpc_signer = RPCConnection.connect(chain_spec, 'signer') rpc_signer = RPCConnection.connect(chain_spec, 'signer')
nonce_oracle = CustodialTaskNonceOracle(account_address, self.request.root_id, session=session) #, default_nonce) nonce_oracle = CustodialTaskNonceOracle(account_address, self.request.root_id, session=session) #, default_nonce)
gas_oracle = self.create_gas_oracle(rpc, MinterFaucet.gas) gas_oracle = self.create_gas_oracle(rpc, code_callback=MinterFaucet.gas)
faucet = Faucet(chain_spec, signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle) faucet = Faucet(chain_spec, signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = faucet.give_to(faucet_address, account_address, account_address, tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = faucet.give_to(faucet_address, account_address, account_address, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect() rpc_signer.disconnect()
@ -266,19 +266,46 @@ def set_role(self, tag, address, chain_spec_dict):
@celery_app.task(bind=True, base=BaseTask) @celery_app.task(bind=True, base=BaseTask)
def role(self, address, chain_spec_dict): def role(self, address, chain_spec_dict):
"""Return account role for address """Return account role for address and/or role
:param account: Account to check :param account: Account to check
:type account: str, 0x-hex :type account: str, 0x-hex
:param chain_str: Chain spec string representation :param chain_spec_dict: Chain spec dict representation
:type chain_str: str :type chain_spec_dict: dict
:returns: Account, or None if not exists :returns: Account, or None if not exists
:rtype: Varies :rtype: Varies
""" """
session = self.create_session() session = self.create_session()
role_tag = AccountRole.role_for(address, session=session) role_tag = AccountRole.role_for(address, session=session)
session.close() session.close()
return role_tag return [(address, role_tag,)]
@celery_app.task(bind=True, base=BaseTask)
def role_account(self, role_tag, chain_spec_dict):
"""Return address for role.
If the role parameter is None, will return addresses for all roles.
:param role_tag: Role to match
:type role_tag: str
:param chain_spec_dict: Chain spec dict representation
:type chain_spec_dict: dict
:returns: List with a single account/tag pair for a single tag, or a list of account and tag pairs for all tags
:rtype: list
"""
session = self.create_session()
pairs = None
if role_tag != None:
addr = AccountRole.get_address(role_tag, session=session)
pairs = [(addr, role_tag,)]
else:
pairs = AccountRole.all(session=session)
session.close()
return pairs
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask) @celery_app.task(bind=True, base=CriticalSQLAlchemyTask)

View File

@ -10,6 +10,9 @@ from chainlib.eth.tx import (
TxFormat, TxFormat,
unpack, unpack,
) )
from chainlib.eth.contract import (
ABIContractEncoder,
)
from cic_eth_registry import CICRegistry from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token from cic_eth_registry.erc20 import ERC20Token
from hexathon import ( from hexathon import (
@ -19,7 +22,7 @@ from hexathon import (
from chainqueue.error import NotLocalTxError from chainqueue.error import NotLocalTxError
from eth_erc20 import ERC20 from eth_erc20 import ERC20
from chainqueue.sql.tx import cache_tx_dict from chainqueue.sql.tx import cache_tx_dict
from okota.token_index import to_identifier from okota.token_index.index import to_identifier
# local imports # local imports
from cic_eth.db.models.base import SessionBase from cic_eth.db.models.base import SessionBase
@ -31,10 +34,8 @@ from cic_eth.error import (
YouAreBrokeError, YouAreBrokeError,
) )
from cic_eth.queue.tx import register_tx from cic_eth.queue.tx import register_tx
from cic_eth.eth.gas import ( from cic_eth.eth.gas import create_check_gas_task
create_check_gas_task, from cic_eth.eth.util import CacheGasOracle
MaxGasOracle,
)
from cic_eth.ext.address import translate_address from cic_eth.ext.address import translate_address
from cic_eth.task import ( from cic_eth.task import (
CriticalSQLAlchemyTask, CriticalSQLAlchemyTask,
@ -45,13 +46,14 @@ from cic_eth.task import (
from cic_eth.eth.nonce import CustodialTaskNonceOracle from cic_eth.eth.nonce import CustodialTaskNonceOracle
from cic_eth.encode import tx_normalize from cic_eth.encode import tx_normalize
from cic_eth.eth.trust import verify_proofs from cic_eth.eth.trust import verify_proofs
from cic_eth.error import SignerError
celery_app = celery.current_app celery_app = celery.current_app
logg = logging.getLogger() logg = logging.getLogger()
@celery_app.task(base=CriticalWeb3Task) @celery_app.task(bind=True, base=CriticalWeb3Task)
def balance(tokens, holder_address, chain_spec_dict): def balance(self, tokens, holder_address, chain_spec_dict):
"""Return token balances for a list of tokens for given address """Return token balances for a list of tokens for given address
:param tokens: Token addresses :param tokens: Token addresses
@ -70,8 +72,9 @@ def balance(tokens, holder_address, chain_spec_dict):
for t in tokens: for t in tokens:
address = t['address'] address = t['address']
logg.debug('address {} {}'.format(address, holder_address)) logg.debug('address {} {}'.format(address, holder_address))
gas_oracle = self.create_gas_oracle(rpc, min_price=self.min_fee_price)
token = ERC20Token(chain_spec, rpc, add_0x(address)) token = ERC20Token(chain_spec, rpc, add_0x(address))
c = ERC20(chain_spec) c = ERC20(chain_spec, gas_oracle=gas_oracle)
o = c.balance_of(address, holder_address, sender_address=caller_address) o = c.balance_of(address, holder_address, sender_address=caller_address)
r = rpc.do(o) r = rpc.do(o)
t['balance_network'] = c.parse_balance(r) t['balance_network'] = c.parse_balance(r)
@ -154,8 +157,12 @@ def transfer_from(self, tokens, holder_address, receiver_address, value, chain_s
rpc_signer = RPCConnection.connect(chain_spec, 'signer') rpc_signer = RPCConnection.connect(chain_spec, 'signer')
session = self.create_session() session = self.create_session()
nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session) nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas) enc = ABIContractEncoder()
enc.method('transferFrom')
method = enc.get()
gas_oracle = self.create_gas_oracle(rpc, t['address'], method=method, session=session, min_price=self.min_fee_price)
c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle) c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle)
try: try:
(tx_hash_hex, tx_signed_raw_hex) = c.transfer_from(t['address'], spender_address, holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = c.transfer_from(t['address'], spender_address, holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED)
@ -225,8 +232,12 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_spec_d
rpc_signer = RPCConnection.connect(chain_spec, 'signer') rpc_signer = RPCConnection.connect(chain_spec, 'signer')
session = self.create_session() session = self.create_session()
enc = ABIContractEncoder()
enc.method('transfer')
method = enc.get()
gas_oracle = self.create_gas_oracle(rpc, t['address'], method=method, session=session, min_price=self.min_fee_price)
nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session) nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas)
c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle) c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle)
try: try:
(tx_hash_hex, tx_signed_raw_hex) = c.transfer(t['address'], holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = c.transfer(t['address'], holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED)
@ -294,8 +305,12 @@ def approve(self, tokens, holder_address, spender_address, value, chain_spec_dic
rpc_signer = RPCConnection.connect(chain_spec, 'signer') rpc_signer = RPCConnection.connect(chain_spec, 'signer')
session = self.create_session() session = self.create_session()
nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session) nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas) enc = ABIContractEncoder()
enc.method('approve')
method = enc.get()
gas_oracle = self.create_gas_oracle(rpc, t['address'], method=method, session=session)
c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle) c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle)
try: try:
(tx_hash_hex, tx_signed_raw_hex) = c.approve(t['address'], holder_address, spender_address, value, tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = c.approve(t['address'], holder_address, spender_address, value, tx_format=TxFormat.RLP_SIGNED)
@ -382,6 +397,8 @@ def cache_transfer_data(
sender_address = tx_normalize.wallet_address(tx['from']) sender_address = tx_normalize.wallet_address(tx['from'])
recipient_address = tx_normalize.wallet_address(tx_data[0]) recipient_address = tx_normalize.wallet_address(tx_data[0])
token_value = tx_data[1] token_value = tx_data[1]
source_token_address = tx_normalize.executable_address(tx['to'])
destination_token_address = source_token_address
session = SessionBase.create_session() session = SessionBase.create_session()
@ -389,8 +406,8 @@ def cache_transfer_data(
'hash': tx_hash_hex, 'hash': tx_hash_hex,
'from': sender_address, 'from': sender_address,
'to': recipient_address, 'to': recipient_address,
'source_token': tx['to'], 'source_token': source_token_address,
'destination_token': tx['to'], 'destination_token': destination_token_address,
'from_value': token_value, 'from_value': token_value,
'to_value': token_value, 'to_value': token_value,
} }
@ -422,14 +439,16 @@ def cache_transfer_from_data(
spender_address = tx_data[0] spender_address = tx_data[0]
recipient_address = tx_data[1] recipient_address = tx_data[1]
token_value = tx_data[2] token_value = tx_data[2]
source_token_address = tx_normalize.executable_address(tx['to'])
destination_token_address = source_token_address
session = SessionBase.create_session() session = SessionBase.create_session()
tx_dict = { tx_dict = {
'hash': tx_hash_hex, 'hash': tx_hash_hex,
'from': tx['from'], 'from': tx['from'],
'to': recipient_address, 'to': recipient_address,
'source_token': tx['to'], 'source_token': source_token_address,
'destination_token': tx['to'], 'destination_token': destination_token_address,
'from_value': token_value, 'from_value': token_value,
'to_value': token_value, 'to_value': token_value,
} }
@ -461,14 +480,16 @@ def cache_approve_data(
sender_address = tx_normalize.wallet_address(tx['from']) sender_address = tx_normalize.wallet_address(tx['from'])
recipient_address = tx_normalize.wallet_address(tx_data[0]) recipient_address = tx_normalize.wallet_address(tx_data[0])
token_value = tx_data[1] token_value = tx_data[1]
source_token_address = tx_normalize.executable_address(tx['to'])
destination_token_address = source_token_address
session = SessionBase.create_session() session = SessionBase.create_session()
tx_dict = { tx_dict = {
'hash': tx_hash_hex, 'hash': tx_hash_hex,
'from': sender_address, 'from': sender_address,
'to': recipient_address, 'to': recipient_address,
'source_token': tx['to'], 'source_token': source_token_address,
'destination_token': tx['to'], 'destination_token': destination_token_address,
'from_value': token_value, 'from_value': token_value,
'to_value': token_value, 'to_value': token_value,
} }

View File

@ -41,6 +41,7 @@ from chainqueue.db.models.tx import TxCache
from chainqueue.db.models.otx import Otx from chainqueue.db.models.otx import Otx
# local imports # local imports
from cic_eth.db.models.gas_cache import GasCache
from cic_eth.db.models.role import AccountRole from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.base import SessionBase from cic_eth.db.models.base import SessionBase
from cic_eth.error import ( from cic_eth.error import (
@ -65,17 +66,56 @@ from cic_eth.encode import (
ZERO_ADDRESS_NORMAL, ZERO_ADDRESS_NORMAL,
unpack_normal, unpack_normal,
) )
from cic_eth.error import SeppukuError
from cic_eth.eth.util import MAXIMUM_FEE_UNITS
celery_app = celery.current_app celery_app = celery.current_app
logg = logging.getLogger() logg = logging.getLogger()
MAXIMUM_FEE_UNITS = 8000000
class MaxGasOracle: @celery_app.task(base=CriticalSQLAlchemyTask)
def apply_gas_value_cache(address, method, value, tx_hash):
return apply_gas_value_cache_local(address, method, value, tx_hash)
def gas(code=None):
return MAXIMUM_FEE_UNITS def apply_gas_value_cache_local(address, method, value, tx_hash, session=None):
address = tx_normalize.executable_address(address)
tx_hash = tx_normalize.tx_hash(tx_hash)
value = int(value)
session = SessionBase.bind_session(session)
q = session.query(GasCache)
q = q.filter(GasCache.address==address)
q = q.filter(GasCache.method==method)
o = q.first()
if o == None:
o = GasCache(address, method, value, tx_hash)
elif value > o.value:
o.value = value
o.tx_hash = strip_0x(tx_hash)
session.add(o)
session.commit()
SessionBase.release_session(session)
def have_gas_minimum(chain_spec, address, min_gas, session=None, rpc=None):
if rpc == None:
rpc = RPCConnection.connect(chain_spec, 'default')
o = balance(add_0x(address))
r = rpc.do(o)
try:
r = int(r)
except ValueError:
r = strip_0x(r)
r = int(r, 16)
logg.debug('have gas minimum {} have gas {} minimum is {}'.format(address, r, min_gas))
if r < min_gas:
return False
return True
def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=None, tx_hashes_hex=None, queue=None): def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=None, tx_hashes_hex=None, queue=None):
@ -357,6 +397,13 @@ def refill_gas(self, recipient_address, chain_spec_dict):
# set up evm RPC connection # set up evm RPC connection
rpc = RPCConnection.connect(chain_spec, 'default') rpc = RPCConnection.connect(chain_spec, 'default')
# check the gas balance of the gifter
if not have_gas_minimum(chain_spec, gas_provider, self.safe_gas_refill_amount):
raise SeppukuError('Noooooooooooo; gas gifter {} is broke!'.format(gas_provider))
if not have_gas_minimum(chain_spec, gas_provider, self.safe_gas_gifter_balance):
logg.error('Gas gifter {} gas balance is below the safe level to operate!'.format(gas_provider))
# set up transaction builder # set up transaction builder
nonce_oracle = CustodialTaskNonceOracle(gas_provider, self.request.root_id, session=session) nonce_oracle = CustodialTaskNonceOracle(gas_provider, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc) gas_oracle = self.create_gas_oracle(rpc)

View File

@ -0,0 +1,54 @@
# standard imports
import logging
# external imports
from chainlib.eth.gas import RPCGasOracle
from hexathon import strip_0x
# local imports
from cic_eth.db.models.gas_cache import GasCache
from cic_eth.encode import tx_normalize
from cic_eth.db.models.base import SessionBase
MAXIMUM_FEE_UNITS = 8000000
logg = logging.getLogger(__name__)
class MaxGasOracle(RPCGasOracle):
def get_fee_units(self, code=None):
return MAXIMUM_FEE_UNITS
class CacheGasOracle(MaxGasOracle):
"""Returns a previously recorded value for fee unit expenditure for a contract call, if it exists. Otherwise returns max units.
:todo: instead of max units, connect a pluggable gas heuristics engine.
"""
def __init__(self, conn, address, method=None, session=None, min_price=None, id_generator=None):
super(CacheGasOracle, self).__init__(conn, code_callback=self.get_fee_units, min_price=min_price, id_generator=id_generator)
self.value = None
self.address = address
self.method = method
address = tx_normalize.executable_address(address)
session = SessionBase.bind_session(session)
q = session.query(GasCache)
q = q.filter(GasCache.address==address)
if method != None:
method = strip_0x(method)
q = q.filter(GasCache.method==method)
o = q.first()
if o != None:
self.value = int(o.value)
SessionBase.release_session(session)
def get_fee_units(self, code=None):
if self.value != None:
logg.debug('found stored gas unit value {} for address {} method {}'.format(self.value, self.address, self.method))
return self.value
return super(CacheGasOracle, self).get_fee_units(code=code)

View File

@ -8,15 +8,14 @@ import confini
script_dir = os.path.dirname(os.path.realpath(__file__)) script_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.dirname(os.path.dirname(script_dir)) root_dir = os.path.dirname(os.path.dirname(script_dir))
config_dir = os.path.join(root_dir, 'cic_eth', 'data', 'config')
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def load_config(): def load_config():
config_dir = os.environ.get('CONFINI_DIR') override_config_dir = os.path.join(root_dir, 'config', 'test')
if config_dir == None: conf = confini.Config(config_dir, 'CICTEST', override_dirs=[override_config_dir])
config_dir = os.path.join(root_dir, 'config/test')
conf = confini.Config(config_dir, 'CICTEST')
conf.process() conf.process()
logg.debug('config {}'.format(conf)) logg.debug('config {}'.format(conf))
return conf return conf

View File

@ -72,7 +72,7 @@ def __balance_incoming_compatible(token_address, receiver_address):
status_compare = dead() status_compare = dead()
q = q.filter(Otx.status.op('&')(status_compare)==0) q = q.filter(Otx.status.op('&')(status_compare)==0)
# TODO: this can change the result for the recipient if tx is later obsoleted and resubmission is delayed. # TODO: this can change the result for the recipient if tx is later obsoleted and resubmission is delayed.
q = q.filter(Otx.status.op('&')(StatusBits.IN_NETWORK)==StatusBits.IN_NETWORK) #q = q.filter(Otx.status.op('&')(StatusBits.IN_NETWORK)==StatusBits.IN_NETWORK)
q = q.filter(TxCache.destination_token_address==token_address) q = q.filter(TxCache.destination_token_address==token_address)
delta = 0 delta = 0
for r in q.all(): for r in q.all():

View File

@ -3,3 +3,4 @@ from .tx import TxFilter
from .gas import GasFilter from .gas import GasFilter
from .register import RegistrationFilter from .register import RegistrationFilter
from .transferauth import TransferAuthFilter from .transferauth import TransferAuthFilter
from .token import TokenFilter

View File

@ -0,0 +1,63 @@
# standard imports
import logging
# external imports
from eth_erc20 import ERC20
from chainlib.eth.contract import (
ABIContractEncoder,
ABIContractType,
)
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.address import is_same_address
from chainlib.eth.error import RequestMismatchException
from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token
from eth_token_index import TokenUniqueSymbolIndex
import celery
# local imports
from .base import SyncFilter
logg = logging.getLogger(__name__)
class TokenFilter(SyncFilter):
def __init__(self, chain_spec, queue, call_address=ZERO_ADDRESS):
self.queue = queue
self.chain_spec = chain_spec
self.caller_address = call_address
def filter(self, conn, block, tx, db_session=None):
if not tx.payload:
return (None, None)
try:
r = ERC20.parse_transfer_request(tx.payload)
except RequestMismatchException:
return (None, None)
token_address = tx.inputs[0]
token = ERC20Token(self.chain_spec, conn, token_address)
registry = CICRegistry(self.chain_spec, conn)
r = registry.by_name(token.symbol, sender_address=self.caller_address)
if is_same_address(r, ZERO_ADDRESS):
return None
enc = ABIContractEncoder()
enc.method('transfer')
method = enc.get()
s = celery.signature(
'cic_eth.eth.gas.apply_gas_value_cache',
[
token_address,
method,
tx.gas_used,
tx.hash,
],
queue=self.queue,
)
return s.apply_async()

View File

@ -67,7 +67,10 @@ from cic_eth.registry import (
connect_declarator, connect_declarator,
connect_token_registry, connect_token_registry,
) )
from cic_eth.task import BaseTask from cic_eth.task import (
BaseTask,
CriticalWeb3Task,
)
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
@ -76,18 +79,18 @@ arg_flags = cic_eth.cli.argflag_std_read
local_arg_flags = cic_eth.cli.argflag_local_task local_arg_flags = cic_eth.cli.argflag_local_task
argparser = cic_eth.cli.ArgumentParser(arg_flags) argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.process_local_flags(local_arg_flags) argparser.process_local_flags(local_arg_flags)
#argparser.add_argument('--default-token-symbol', dest='default_token_symbol', type=str, help='Symbol of default token to use')
argparser.add_argument('--trace-queue-status', default=None, dest='trace_queue_status', action='store_true', help='set to perist all queue entry status changes to storage') argparser.add_argument('--trace-queue-status', default=None, dest='trace_queue_status', action='store_true', help='set to perist all queue entry status changes to storage')
argparser.add_argument('--aux-all', action='store_true', help='include tasks from all submodules from the aux module path') argparser.add_argument('--aux-all', action='store_true', help='include tasks from all submodules from the aux module path')
argparser.add_argument('--min-fee-price', dest='min_fee_price', type=int, help='set minimum fee price for transactions, in wei')
argparser.add_argument('--aux', action='append', type=str, default=[], help='add single submodule from the aux module path') argparser.add_argument('--aux', action='append', type=str, default=[], help='add single submodule from the aux module path')
args = argparser.parse_args() args = argparser.parse_args()
# process config # process config
extra_args = { extra_args = {
# 'default_token_symbol': 'CIC_DEFAULT_TOKEN_SYMBOL',
'aux_all': None, 'aux_all': None,
'aux': None, 'aux': None,
'trace_queue_status': 'TASKS_TRACE_QUEUE_STATUS', 'trace_queue_status': 'TASKS_TRACE_QUEUE_STATUS',
'min_fee_price': 'ETH_MIN_FEE_PRICE',
} }
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags) config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags)
@ -215,6 +218,7 @@ def main():
argv.append('-n') argv.append('-n')
argv.append(config.get('CELERY_QUEUE')) argv.append(config.get('CELERY_QUEUE'))
# TODO: More elegant way of setting queue-wide settings
BaseTask.default_token_symbol = default_token_symbol BaseTask.default_token_symbol = default_token_symbol
BaseTask.default_token_address = default_token_address BaseTask.default_token_address = default_token_address
default_token = ERC20Token(chain_spec, conn, add_0x(BaseTask.default_token_address)) default_token = ERC20Token(chain_spec, conn, add_0x(BaseTask.default_token_address))
@ -222,6 +226,14 @@ def main():
BaseTask.default_token_decimals = default_token.decimals BaseTask.default_token_decimals = default_token.decimals
BaseTask.default_token_name = default_token.name BaseTask.default_token_name = default_token.name
BaseTask.trusted_addresses = trusted_addresses BaseTask.trusted_addresses = trusted_addresses
CriticalWeb3Task.safe_gas_refill_amount = int(config.get('ETH_GAS_HOLDER_MINIMUM_UNITS')) * int(config.get('ETH_GAS_HOLDER_REFILL_UNITS'))
CriticalWeb3Task.safe_gas_threshold_amount = int(config.get('ETH_GAS_HOLDER_MINIMUM_UNITS')) * int(config.get('ETH_GAS_HOLDER_REFILL_THRESHOLD'))
CriticalWeb3Task.safe_gas_gifter_balance = int(config.get('ETH_GAS_HOLDER_MINIMUM_UNITS')) * int(config.get('ETH_GAS_GIFTER_REFILL_BUFFER'))
if config.get('ETH_MIN_FEE_PRICE'):
BaseTask.min_fee_price = int(config.get('ETH_MIN_FEE_PRICE'))
CriticalWeb3Task.safe_gas_threshold_amount *= BaseTask.min_fee_price
CriticalWeb3Task.safe_gas_refill_amount *= BaseTask.min_fee_price
CriticalWeb3Task.safe_gas_gifter_balance *= BaseTask.min_fee_price
BaseTask.run_dir = config.get('CIC_RUN_DIR') BaseTask.run_dir = config.get('CIC_RUN_DIR')
logg.info('default token set to {} {}'.format(BaseTask.default_token_symbol, BaseTask.default_token_address)) logg.info('default token set to {} {}'.format(BaseTask.default_token_symbol, BaseTask.default_token_address))

View File

@ -36,6 +36,7 @@ from cic_eth.runnable.daemons.filters import (
TxFilter, TxFilter,
RegistrationFilter, RegistrationFilter,
TransferAuthFilter, TransferAuthFilter,
TokenFilter,
) )
from cic_eth.stat import init_chain_stat from cic_eth.stat import init_chain_stat
from cic_eth.registry import ( from cic_eth.registry import (
@ -99,10 +100,10 @@ def main():
syncer_backends = SQLBackend.resume(chain_spec, block_offset) syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0: if len(syncer_backends) == 0:
initial_block_start = config.get('SYNCER_OFFSET') initial_block_start = int(config.get('SYNCER_OFFSET'))
initial_block_offset = block_offset initial_block_offset = int(block_offset)
if config.true('SYNCER_NO_HISTORY'): if config.true('SYNCER_NO_HISTORY'):
initial_block_start = block_offset initial_block_start = initial_block_offset
initial_block_offset += 1 initial_block_offset += 1
syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start)) syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start))
logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset)) logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset))
@ -154,6 +155,8 @@ def main():
gas_filter = GasFilter(chain_spec, config.get('CELERY_QUEUE')) gas_filter = GasFilter(chain_spec, config.get('CELERY_QUEUE'))
token_gas_cache_filter = TokenFilter(chain_spec, config.get('CELERY_QUEUE'))
#transfer_auth_filter = TransferAuthFilter(registry, chain_spec, config.get('_CELERY_QUEUE')) #transfer_auth_filter = TransferAuthFilter(registry, chain_spec, config.get('_CELERY_QUEUE'))
i = 0 i = 0
@ -163,6 +166,7 @@ def main():
syncer.add_filter(registration_filter) syncer.add_filter(registration_filter)
# TODO: the two following filter functions break the filter loop if return uuid. Pro: less code executed. Con: Possibly unintuitive flow break # TODO: the two following filter functions break the filter loop if return uuid. Pro: less code executed. Con: Possibly unintuitive flow break
syncer.add_filter(tx_filter) syncer.add_filter(tx_filter)
syncer.add_filter(token_gas_cache_filter)
#syncer.add_filter(transfer_auth_filter) #syncer.add_filter(transfer_auth_filter)
for cf in callback_filters: for cf in callback_filters:
syncer.add_filter(cf) syncer.add_filter(cf)

View File

@ -8,6 +8,7 @@ import re
# external imports # external imports
import cic_eth.cli import cic_eth.cli
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.eth.address import is_address
from xdg.BaseDirectory import xdg_config_home from xdg.BaseDirectory import xdg_config_home
# local imports # local imports
@ -21,12 +22,18 @@ logg = logging.getLogger()
arg_flags = cic_eth.cli.argflag_std_base | cic_eth.cli.Flag.UNSAFE | cic_eth.cli.Flag.CHAIN_SPEC arg_flags = cic_eth.cli.argflag_std_base | cic_eth.cli.Flag.UNSAFE | cic_eth.cli.Flag.CHAIN_SPEC
local_arg_flags = cic_eth.cli.argflag_local_taskcallback local_arg_flags = cic_eth.cli.argflag_local_taskcallback
argparser = cic_eth.cli.ArgumentParser(arg_flags) argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.add_positional('tag', type=str, help='address tag') argparser.add_argument('--set', action='store_true', help='sets the given tag')
argparser.add_positional('address', type=str, help='address') argparser.add_argument('--tag', type=str, help='operate on the given tag')
argparser.add_positional('address', required=False, type=str, help='address associated with tag')
argparser.process_local_flags(local_arg_flags) argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args() args = argparser.parse_args()
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags) extra_args = {
'set': None,
'tag': None,
'address': None,
}
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags, extra_args=extra_args)
celery_app = cic_eth.cli.CeleryApp.from_config(config) celery_app = cic_eth.cli.CeleryApp.from_config(config)
@ -39,7 +46,17 @@ api = AdminApi(None)
def main(): def main():
admin_api.tag_account(args.tag, args.address, chain_spec) if config.get('_ADDRESS') != None and not is_address(config.get('_ADDRESS')):
sys.stderr.write('Invalid address {}'.format(config.get('_ADDRESS')))
sys.exit(1)
if config.get('_SET'):
admin_api.tag_account(chain_spec, config.get('_TAG'), config.get('_ADDRESS'))
else:
t = admin_api.get_tag_account(chain_spec, tag=config.get('_TAG'), address=config.get('_ADDRESS'))
r = t.get()
for v in r:
sys.stdout.write('{}\t{}\n'.format(v[1], v[0]))
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -18,7 +18,7 @@ from cic_eth.api import Api
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger('create_account_script') logg = logging.getLogger('create_account_script')
arg_flags = cic_eth.cli.argflag_std_base arg_flags = cic_eth.cli.argflag_local_base
local_arg_flags = cic_eth.cli.argflag_local_taskcallback local_arg_flags = cic_eth.cli.argflag_local_taskcallback
argparser = cic_eth.cli.ArgumentParser(arg_flags) argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.add_argument('--token-symbol', dest='token_symbol', type=str, help='Token symbol') argparser.add_argument('--token-symbol', dest='token_symbol', type=str, help='Token symbol')

View File

@ -16,9 +16,14 @@ import confini
import celery import celery
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.eth.connection import EthHTTPConnection from chainlib.eth.connection import EthHTTPConnection
from hexathon import add_0x from hexathon import (
add_0x,
strip_0x,
uniform as hex_uniform,
)
# local imports # local imports
import cic_eth.cli
from cic_eth.api.admin import AdminApi from cic_eth.api.admin import AdminApi
from cic_eth.db.enum import ( from cic_eth.db.enum import (
StatusEnum, StatusEnum,
@ -31,59 +36,35 @@ logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
default_format = 'terminal' default_format = 'terminal'
default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic')
argparser = argparse.ArgumentParser() arg_flags = cic_eth.cli.argflag_std_base
argparser.add_argument('-p', '--provider', dest='p', default='http://localhost:8545', type=str, help='Web3 provider url (http only)') local_arg_flags = cic_eth.cli.argflag_local_taskcallback
argparser.add_argument('-r', '--registry-address', dest='r', type=str, help='CIC registry address') argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.add_argument('-f', '--format', dest='f', default=default_format, type=str, help='Output format') argparser.add_argument('-f', '--format', dest='f', default=default_format, type=str, help='Output format')
argparser.add_argument('--status-raw', dest='status_raw', action='store_true', help='Output status bit enum names only')
argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
argparser.add_argument('query', type=str, help='Transaction, transaction hash, account or "lock"') argparser.add_argument('query', type=str, help='Transaction, transaction hash, account or "lock"')
argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args() args = argparser.parse_args()
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
extra_args = {
config_dir = os.path.join(args.c) 'f': '_FORMAT',
os.makedirs(config_dir, 0o777, True) 'query': '_QUERY',
config = confini.Config(config_dir, args.env_prefix)
config.process()
args_override = {
'ETH_PROVIDER': getattr(args, 'p'),
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'CIC_REGISTRY_ADDRESS': getattr(args, 'r'),
} }
# override args config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags, extra_args=extra_args)
config.dict_override(args_override, 'cli args')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
try: celery_app = cic_eth.cli.CeleryApp.from_config(config)
config.add(add_0x(args.query), '_QUERY', True) queue = config.get('CELERY_QUEUE')
except:
config.add(args.query, '_QUERY', True)
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
queue = args.q # connect to celery
celery_app = cic_eth.cli.CeleryApp.from_config(config)
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) # set up rpc
rpc = cic_eth.cli.RPC.from_config(config) #, use_signer=True)
conn = rpc.get_default()
rpc = EthHTTPConnection(args.p) admin_api = AdminApi(conn)
#registry_address = config.get('CIC_REGISTRY_ADDRESS')
admin_api = AdminApi(rpc)
t = admin_api.registry() t = admin_api.registry()
registry_address = t.get() registry_address = t.get()
@ -113,7 +94,7 @@ def render_tx(o, **kwargs):
for v in o.get('status_log', []): for v in o.get('status_log', []):
d = datetime.datetime.fromisoformat(v[0]) d = datetime.datetime.fromisoformat(v[0])
e = status_str(v[1], args.status_raw) e = status_str(v[1], config.get('_RAW'))
content += '{}: {}\n'.format(d, e) content += '{}: {}\n'.format(d, e)
return content return content
@ -154,20 +135,24 @@ def render_lock(o, **kwargs):
def main(): def main():
txs = [] txs = []
renderer = render_tx renderer = render_tx
if len(config.get('_QUERY')) > 66:
#registry = connect_registry(rpc, chain_spec, registry_address)
#admin_api.tx(chain_spec, tx_raw=config.get('_QUERY'), registry=registry, renderer=renderer)
admin_api.tx(chain_spec, tx_raw=config.get('_QUERY'), renderer=renderer)
elif len(config.get('_QUERY')) > 42:
#registry = connect_registry(rpc, chain_spec, registry_address)
#admin_api.tx(chain_spec, tx_hash=config.get('_QUERY'), registry=registry, renderer=renderer)
admin_api.tx(chain_spec, tx_hash=config.get('_QUERY'), renderer=renderer)
elif len(config.get('_QUERY')) == 42: query = config.get('_QUERY')
#registry = connect_registry(rpc, chain_spec, registry_address) try:
txs = admin_api.account(chain_spec, config.get('_QUERY'), include_recipient=False, renderer=render_account) query = hex_uniform(strip_0x(query))
except TypeError:
pass
except ValueError:
pass
if len(query) > 64:
admin_api.tx(chain_spec, tx_raw=query, renderer=renderer)
elif len(query) > 40:
admin_api.tx(chain_spec, tx_hash=query, renderer=renderer)
elif len(query) == 40:
txs = admin_api.account(chain_spec, query, include_recipient=False, renderer=render_account)
renderer = render_account renderer = render_account
elif len(config.get('_QUERY')) >= 4 and config.get('_QUERY')[:4] == 'lock': elif len(query) >= 4 and query[:4] == 'lock':
t = admin_api.get_lock() t = admin_api.get_lock()
txs = t.get() txs = t.get()
renderer = render_lock renderer = render_lock
@ -175,7 +160,7 @@ def main():
r = renderer(txs) r = renderer(txs)
sys.stdout.write(r + '\n') sys.stdout.write(r + '\n')
else: else:
raise ValueError('cannot parse argument {}'.format(config.get('_QUERY'))) raise ValueError('cannot parse argument {}'.format(query))
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -17,6 +17,7 @@ from cic_eth_registry.error import UnknownContractError
# local imports # local imports
from cic_eth.error import SeppukuError from cic_eth.error import SeppukuError
from cic_eth.db.models.base import SessionBase from cic_eth.db.models.base import SessionBase
from cic_eth.eth.util import CacheGasOracle, MaxGasOracle
#logg = logging.getLogger().getChild(__name__) #logg = logging.getLogger().getChild(__name__)
logg = logging.getLogger() logg = logging.getLogger()
@ -24,19 +25,46 @@ logg = logging.getLogger()
celery_app = celery.current_app celery_app = celery.current_app
class BaseTask(celery.Task): class BaseTask(celery.Task):
session_func = SessionBase.create_session session_func = SessionBase.create_session
call_address = ZERO_ADDRESS call_address = ZERO_ADDRESS
trusted_addresses = [] trusted_addresses = []
create_nonce_oracle = RPCNonceOracle min_fee_price = 1
create_gas_oracle = RPCGasOracle min_fee_limit = 30000
default_token_address = None default_token_address = None
default_token_symbol = None default_token_symbol = None
default_token_name = None default_token_name = None
default_token_decimals = None default_token_decimals = None
run_dir = '/run' run_dir = '/run'
def create_gas_oracle(self, conn, address=None, *args, **kwargs):
x = None
if address is None:
x = RPCGasOracle(
conn,
code_callback=kwargs.get('code_callback', self.get_min_fee_limit),
min_price=self.min_fee_price,
id_generator=kwargs.get('id_generator'),
)
else:
x = MaxGasOracle(conn)
x.code_callback = x.get_fee_units
return x
def get_min_fee_limit(self, code):
return self.min_fee_limit
def get_min_fee_limit(self, code):
return self.min_fee_limit
def create_session(self): def create_session(self):
return BaseTask.session_func() return BaseTask.session_func()
@ -78,19 +106,18 @@ class CriticalWeb3Task(CriticalTask):
autoretry_for = ( autoretry_for = (
ConnectionError, ConnectionError,
) )
safe_gas_threshold_amount = 2000000000 * 60000 * 3 safe_gas_threshold_amount = 60000 * 3
safe_gas_refill_amount = safe_gas_threshold_amount * 5 safe_gas_refill_amount = safe_gas_threshold_amount * 5
safe_gas_gifter_balance = safe_gas_threshold_amount * 5 * 100
class CriticalSQLAlchemyAndWeb3Task(CriticalTask): class CriticalSQLAlchemyAndWeb3Task(CriticalWeb3Task):
autoretry_for = ( autoretry_for = (
sqlalchemy.exc.DatabaseError, sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError, sqlalchemy.exc.TimeoutError,
ConnectionError, ConnectionError,
sqlalchemy.exc.ResourceClosedError, sqlalchemy.exc.ResourceClosedError,
) )
safe_gas_threshold_amount = 2000000000 * 60000 * 3
safe_gas_refill_amount = safe_gas_threshold_amount * 5
class CriticalSQLAlchemyAndSignerTask(CriticalTask): class CriticalSQLAlchemyAndSignerTask(CriticalTask):
@ -100,13 +127,10 @@ class CriticalSQLAlchemyAndSignerTask(CriticalTask):
sqlalchemy.exc.ResourceClosedError, sqlalchemy.exc.ResourceClosedError,
) )
class CriticalWeb3AndSignerTask(CriticalTask): class CriticalWeb3AndSignerTask(CriticalWeb3Task):
autoretry_for = ( autoretry_for = (
ConnectionError, ConnectionError,
) )
safe_gas_threshold_amount = 2000000000 * 60000 * 3
safe_gas_refill_amount = safe_gas_threshold_amount * 5
@celery_app.task() @celery_app.task()
def check_health(self): def check_health(self):

View File

@ -1,2 +0,0 @@
[accounts]
writer_address =

View File

@ -1,2 +0,0 @@
[bancor]
dir = tests/testdata/bancor

View File

@ -1,5 +1,3 @@
[celery] [celery]
broker_url = filesystem:// broker_url = filesystem://
result_url = filesystem:// result_url = filesystem://
#broker_url = redis://
#result_url = redis://

View File

@ -1,2 +0,0 @@
[chain]
spec =

View File

@ -1,4 +0,0 @@
[cic]
registry_address =
chain_spec =
trust_address =

View File

@ -1,2 +0,0 @@
[dispatcher]
loop_interval = 0.1

View File

@ -1,8 +0,0 @@
[eth]
#ws_provider = ws://localhost:8546
#ttp_provider = http://localhost:8545
provider = http://localhost:8545
gas_provider_address =
#chain_id =
abi_dir =
faucet_giver_address =

View File

@ -1,5 +1,2 @@
[signer] [signer]
socket_path = /run/crypto-dev-signer/jsonrpc.ipc provider = /run/crypto-dev-signer/jsonrpc.ipc
secret = deedbeef
database_name = signer_test
dev_keys_path =

View File

@ -1,6 +0,0 @@
[SSL]
enable_client = false
cert_file =
key_file =
password =
ca_file =

View File

@ -1,2 +0,0 @@
[SYNCER]
loop_interval = 1

View File

@ -7,17 +7,10 @@ FROM $DOCKER_REGISTRY/cic-base-images:python-3.8.6-dev-e8eb2ee2
# TODO can we take all the requirements out of setup.py and just do a pip install -r requirements.txt && python setup.py # TODO can we take all the requirements out of setup.py and just do a pip install -r requirements.txt && python setup.py
#COPY cic-eth/requirements.txt . #COPY cic-eth/requirements.txt .
ARG EXTRA_PIP_INDEX_URL=https://pip.grassrootseconomics.net:8433 ARG EXTRA_PIP_INDEX_URL=https://pip.grassrootseconomics.net
ARG EXTRA_PIP_ARGS="" ARG EXTRA_PIP_ARGS=""
ARG PIP_INDEX_URL=https://pypi.org/simple ARG PIP_INDEX_URL=https://pypi.org/simple
RUN --mount=type=cache,mode=0755,target=/root/.cache/pip \
pip install --index-url $PIP_INDEX_URL \
--pre \
--extra-index-url $EXTRA_PIP_INDEX_URL $EXTRA_PIP_ARGS \
cic-eth-aux-erc20-demurrage-token~=0.0.2a7
COPY *requirements.txt ./ COPY *requirements.txt ./
RUN --mount=type=cache,mode=0755,target=/root/.cache/pip \ RUN --mount=type=cache,mode=0755,target=/root/.cache/pip \
pip install --index-url $PIP_INDEX_URL \ pip install --index-url $PIP_INDEX_URL \
@ -40,8 +33,6 @@ RUN chmod 755 *.sh
# # they can all be overridden by environment variables # # they can all be overridden by environment variables
# # to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package) # # to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
#COPY config/ /usr/local/etc/cic-eth/ #COPY config/ /usr/local/etc/cic-eth/
COPY cic_eth/db/migrations/ /usr/local/share/cic-eth/alembic/
COPY crypto_dev_signer_config/ /usr/local/etc/crypto-dev-signer/
# TODO this kind of code sharing across projects should be discouraged...can we make util a library? # TODO this kind of code sharing across projects should be discouraged...can we make util a library?
#COPY util/liveness/health.sh /usr/local/bin/health.sh #COPY util/liveness/health.sh /usr/local/bin/health.sh
@ -66,8 +57,7 @@ ENTRYPOINT []
## # they can all be overridden by environment variables ## # they can all be overridden by environment variables
## # to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package) ## # to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
#COPY config/ /usr/local/etc/cic-eth/ #COPY config/ /usr/local/etc/cic-eth/
#COPY cic_eth/db/migrations/ /usr/local/share/cic-eth/alembic/ COPY cic_eth/db/migrations/ /usr/local/share/cic-eth/alembic/
#COPY crypto_dev_signer_config/ /usr/local/etc/crypto-dev-signer/
#COPY scripts/ scripts/ #COPY scripts/ scripts/
# #
## TODO this kind of code sharing across projects should be discouraged...can we make util a library? ## TODO this kind of code sharing across projects should be discouraged...can we make util a library?

View File

@ -2,7 +2,7 @@
set -e set -e
pip install --extra-index-url https://pip.grassrootseconomics.net:8433 --extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple \ pip install --extra-index-url https://pip.grassrootseconomics.net --extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple \
-r admin_requirements.txt \ -r admin_requirements.txt \
-r services_requirements.txt \ -r services_requirements.txt \
-r test_requirements.txt -r test_requirements.txt

View File

@ -1,4 +1,7 @@
celery==4.4.7 celery==4.4.7
chainlib-eth>=0.0.10a16,<0.1.0
semver==2.13.0 semver==2.13.0
crypto-dev-signer>=0.4.15rc2,<0.5.0 chainlib-eth~=0.0.15
urlybird~=0.0.1
cic-eth-registry~=0.6.6
cic-types~=0.2.1a8
cic-eth-aux-erc20-demurrage-token~=0.0.3

View File

@ -1,16 +1,15 @@
chainqueue>=0.0.6a1,<0.1.0 chainqueue~=0.0.6a4
chainsyncer[sql]>=0.0.7a3,<0.1.0 chainsyncer[sql]~=0.0.7
alembic==1.4.2 alembic==1.4.2
confini>=0.3.6rc4,<0.5.0 confini~=0.5.3
redis==3.5.3 redis==3.5.3
hexathon~=0.0.1a8 hexathon~=0.1.0
pycryptodome==3.10.1 pycryptodome==3.10.1
liveness~=0.0.1a7 liveness~=0.0.1a7
eth-address-index>=0.2.4a1,<0.3.0 eth-address-index~=0.2.4
eth-accounts-index>=0.1.2a3,<0.2.0 eth-accounts-index~=0.1.2
cic-eth-registry>=0.6.1a6,<0.7.0 erc20-faucet~=0.3.2
erc20-faucet>=0.3.2a2,<0.4.0 erc20-transfer-authorization~=0.3.6
erc20-transfer-authorization>=0.3.5a2,<0.4.0 sarafu-faucet~=0.0.7
sarafu-faucet>=0.0.7a2,<0.1.0 moolb~=0.2.0
moolb~=0.1.1b2 okota~=0.2.5
okota>=0.2.4a6,<0.3.0

View File

@ -1,7 +1,7 @@
[metadata] [metadata]
name = cic-eth name = cic-eth
#version = attr: cic_eth.version.__version_string__ #version = attr: cic_eth.version.__version_string__
version = 0.12.4a13 version = 0.12.7
description = CIC Network Ethereum interaction description = CIC Network Ethereum interaction
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no

View File

@ -6,4 +6,5 @@ pytest-redis==2.0.0
redis==3.5.3 redis==3.5.3
eth-tester==0.5.0b3 eth-tester==0.5.0b3
py-evm==0.3.0a20 py-evm==0.3.0a20
eth-erc20~=0.1.2a2 eth-erc20~=0.1.5
erc20-transfer-authorization~=0.3.6

View File

@ -0,0 +1,98 @@
# external imports
from eth_erc20 import ERC20
from chainlib.connection import RPCConnection
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.gas import (
Gas,
OverrideGasOracle,
)
from chainlib.eth.tx import (
TxFormat,
receipt,
raw,
unpack,
Tx,
)
from chainlib.eth.block import (
Block,
block_latest,
block_by_number,
)
from chainlib.eth.address import is_same_address
from chainlib.eth.contract import ABIContractEncoder
from hexathon import strip_0x
from eth_token_index import TokenUniqueSymbolIndex
# local imports
from cic_eth.runnable.daemons.filters.token import TokenFilter
from cic_eth.db.models.gas_cache import GasCache
from cic_eth.db.models.base import SessionBase
def test_filter_gas(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
contract_roles,
agent_roles,
token_roles,
foo_token,
token_registry,
register_lookups,
register_tokens,
celery_session_worker,
cic_registry,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], eth_rpc)
gas_oracle = OverrideGasOracle(price=1000000000, limit=1000000)
c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.transfer(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], 100, tx_format=TxFormat.RLP_SIGNED)
o = raw(tx_signed_raw_hex)
eth_rpc.do(o)
o = receipt(tx_hash_hex)
rcpt = eth_rpc.do(o)
assert rcpt['status'] == 1
fltr = TokenFilter(default_chain_spec, queue=None, call_address=agent_roles['ALICE'])
o = block_latest()
r = eth_rpc.do(o)
o = block_by_number(r, include_tx=False)
r = eth_rpc.do(o)
block = Block(r)
block.txs = [tx_hash_hex]
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx_src = unpack(tx_signed_raw_bytes, default_chain_spec)
tx = Tx(tx_src, block=block)
tx.apply_receipt(rcpt)
t = fltr.filter(eth_rpc, block, tx, db_session=init_database)
assert t.get() == None
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], eth_rpc)
c = TokenUniqueSymbolIndex(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex_register, o) = c.register(token_registry, contract_roles['CONTRACT_DEPLOYER'], foo_token)
eth_rpc.do(o)
o = receipt(tx_hash_hex)
r = eth_rpc.do(o)
assert r['status'] == 1
t = fltr.filter(eth_rpc, block, tx, db_session=init_database)
r = t.get_leaf()
assert t.successful()
q = init_database.query(GasCache)
q = q.filter(GasCache.tx_hash==strip_0x(tx_hash_hex))
o = q.first()
assert is_same_address(o.address, strip_0x(foo_token))
assert o.value > 0
enc = ABIContractEncoder()
enc.method('transfer')
method = enc.get()
assert o.method == method

View File

@ -2,7 +2,7 @@
set -e set -e
pip install --extra-index-url https://pip.grassrootseconomics.net:8433 --extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple pip install --extra-index-url https://pip.grassrootseconomics.net --extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple
-r admin_requirements.txt -r admin_requirements.txt
-r services_requirements.txt -r services_requirements.txt
-r test_requirements.txt -r test_requirements.txt

View File

@ -103,11 +103,11 @@ def test_tag_account(
api = AdminApi(eth_rpc, queue=None) api = AdminApi(eth_rpc, queue=None)
t = api.tag_account('foo', agent_roles['ALICE'], default_chain_spec) t = api.tag_account(default_chain_spec, 'foo', agent_roles['ALICE'])
t.get() t.get()
t = api.tag_account('bar', agent_roles['BOB'], default_chain_spec) t = api.tag_account(default_chain_spec, 'bar', agent_roles['BOB'])
t.get() t.get()
t = api.tag_account('bar', agent_roles['CAROL'], default_chain_spec) t = api.tag_account(default_chain_spec, 'bar', agent_roles['CAROL'])
t.get() t.get()
assert AccountRole.get_address('foo', init_database) == tx_normalize.wallet_address(agent_roles['ALICE']) assert AccountRole.get_address('foo', init_database) == tx_normalize.wallet_address(agent_roles['ALICE'])
@ -288,7 +288,6 @@ def test_fix_nonce(
init_database.commit() init_database.commit()
logg.debug('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
txs = get_nonce_tx_local(default_chain_spec, 3, agent_roles['ALICE'], session=init_database) txs = get_nonce_tx_local(default_chain_spec, 3, agent_roles['ALICE'], session=init_database)
ks = txs.keys() ks = txs.keys()
assert len(ks) == 2 assert len(ks) == 2

View File

@ -191,11 +191,17 @@ def test_tokens(
break break
api_param = str(uuid.uuid4()) api_param = str(uuid.uuid4())
fp = os.path.join(CallbackTask.mmap_path, api_param)
f = open(fp, 'wb+')
f.write(b'\x00')
f.close()
api = Api(str(default_chain_spec), queue=None, callback_param=api_param, callback_task='cic_eth.pytest.mock.callback.test_callback') api = Api(str(default_chain_spec), queue=None, callback_param=api_param, callback_task='cic_eth.pytest.mock.callback.test_callback')
t = api.tokens(['BAR'], proof=[[bar_token_declaration]]) t = api.tokens(['BAR'], proof=[[bar_token_declaration]])
r = t.get() r = t.get()
logg.debug('rr {} {}'.format(r, t.children)) logg.debug('rr {} {}'.format(r, t.children))
while True: while True:
fp = os.path.join(CallbackTask.mmap_path, api_param) fp = os.path.join(CallbackTask.mmap_path, api_param)
try: try:

View File

@ -141,9 +141,57 @@ def test_role_task(
) )
t = s.apply_async() t = s.apply_async()
r = t.get() r = t.get()
assert r == 'foo' assert r[0][0] == address
assert r[0][1] == 'foo'
def test_get_role_task(
init_database,
celery_session_worker,
default_chain_spec,
):
address_foo = '0x' + os.urandom(20).hex()
role_foo = AccountRole.set('foo', address_foo)
init_database.add(role_foo)
address_bar = '0x' + os.urandom(20).hex()
role_bar = AccountRole.set('bar', address_bar)
init_database.add(role_bar)
init_database.commit()
s = celery.signature(
'cic_eth.eth.account.role_account',
[
'bar',
default_chain_spec.asdict(),
],
queue=None,
)
t = s.apply_async()
r = t.get()
assert r[0][0] == address_bar
assert r[0][1] == 'bar'
s = celery.signature(
'cic_eth.eth.account.role_account',
[
None,
default_chain_spec.asdict(),
],
queue=None,
)
t = s.apply_async()
r = t.get()
x_tags = ['foo', 'bar']
x_addrs = [address_foo, address_bar]
for v in r:
x_addrs.remove(v[0])
x_tags.remove(v[1])
assert len(x_tags) == 0
assert len(x_addrs) == 0
def test_gift( def test_gift(
init_database, init_database,

View File

@ -35,10 +35,26 @@ from hexathon import strip_0x
from cic_eth.eth.gas import cache_gas_data from cic_eth.eth.gas import cache_gas_data
from cic_eth.error import OutOfGasError from cic_eth.error import OutOfGasError
from cic_eth.queue.tx import queue_create from cic_eth.queue.tx import queue_create
from cic_eth.task import BaseTask
logg = logging.getLogger() logg = logging.getLogger()
def test_task_gas_limit(
eth_rpc,
eth_signer,
default_chain_spec,
agent_roles,
celery_session_worker,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')
gas_oracle = BaseTask().create_gas_oracle(rpc)
c = Gas(default_chain_spec, signer=eth_signer, gas_oracle=gas_oracle)
(tx_hash_hex, o) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 10, tx_format=TxFormat.RLP_SIGNED)
tx = unpack(bytes.fromhex(strip_0x(o)), default_chain_spec)
assert (tx['gas'], BaseTask.min_fee_price)
def test_task_check_gas_ok( def test_task_check_gas_ok(
default_chain_spec, default_chain_spec,
eth_rpc, eth_rpc,

View File

@ -143,7 +143,7 @@ def test_incoming_balance(
'converters': [], 'converters': [],
} }
b = balance_incoming([token_data], recipient, default_chain_spec.asdict()) b = balance_incoming([token_data], recipient, default_chain_spec.asdict())
assert b[0]['balance_incoming'] == 0 assert b[0]['balance_incoming'] == 1000
otx.readysend(session=init_database) otx.readysend(session=init_database)
init_database.flush() init_database.flush()
@ -152,8 +152,8 @@ def test_incoming_balance(
otx.sent(session=init_database) otx.sent(session=init_database)
init_database.commit() init_database.commit()
b = balance_incoming([token_data], recipient, default_chain_spec.asdict()) #b = balance_incoming([token_data], recipient, default_chain_spec.asdict())
assert b[0]['balance_incoming'] == 1000 #assert b[0]['balance_incoming'] == 1000
otx.success(block=1024, session=init_database) otx.success(block=1024, session=init_database)
init_database.commit() init_database.commit()

View File

@ -1,7 +1,5 @@
crypto-dev-signer>=0.4.15rc2,<=0.4.15 chainqueue~=0.0.6a4
chainqueue>=0.0.5a3,<0.1.0
cic-eth-registry>=0.6.1a6,<0.7.0
redis==3.5.3 redis==3.5.3
hexathon~=0.0.1a8 hexathon~=0.1.0
pycryptodome==3.10.1 pycryptodome==3.10.1
pyxdg==0.27 pyxdg==0.27

View File

@ -1,10 +1,9 @@
[DATABASE] [database]
user = postgres name=cic_notify_test
password = user=
host = localhost password=
port = 5432 host=localhost
name = /tmp/cic-notify.db port=
#engine = postgresql engine=sqlite
#driver = psycopg2 driver=pysqlite
engine = sqlite debug=0
driver = pysqlite

View File

@ -0,0 +1,7 @@
[report]
omit =
venv/*
scripts/*
cic_notify/db/migrations/*
cic_notify/runnable/*
cic_notify/version.py

View File

@ -3,6 +3,7 @@ import logging
import re import re
# third-party imports # third-party imports
import cic_notify.tasks.sms.db
from celery.app.control import Inspect from celery.app.control import Inspect
import celery import celery
@ -13,45 +14,16 @@ app = celery.current_app
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger() logg = logging.getLogger()
sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?"
re_q = r'^cic-notify'
def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'):
host_queues = []
i = Inspect(app=app)
qs = i.active_queues()
for host in qs.keys():
for q in qs[host]:
if re.match(re_q, q['name']):
host_queues.append((host, q['name'],))
task_prefix_len = len(task_prefix)
queue_tasks = []
for (host, queue) in host_queues:
i = Inspect(app=app, destination=[host])
for tasks in i.registered_tasks().values():
for task in tasks:
if len(task) >= task_prefix_len and task[:task_prefix_len] == task_prefix:
queue_tasks.append((queue, task,))
return queue_tasks
class Api: class Api:
# TODO: Implement callback strategy def __init__(self, queue: any = 'cic-notify'):
def __init__(self, queue=None):
""" """
:param queue: The queue on which to execute notification tasks :param queue: The queue on which to execute notification tasks
:type queue: str :type queue: str
""" """
self.queue = queue self.queue = queue
self.sms_tasks = get_sms_queue_tasks(app)
logg.debug('sms tasks {}'.format(self.sms_tasks))
def sms(self, message: str, recipient: str):
def sms(self, message, recipient):
"""This function chains all sms tasks in order to send a message, log and persist said data to disk """This function chains all sms tasks in order to send a message, log and persist said data to disk
:param message: The message to be sent to the recipient. :param message: The message to be sent to the recipient.
:type message: str :type message: str
@ -60,24 +32,9 @@ class Api:
:return: a celery Task :return: a celery Task
:rtype: Celery.Task :rtype: Celery.Task
""" """
signatures = [] s_send = celery.signature('cic_notify.tasks.sms.africastalking.send', [message, recipient], queue=self.queue)
for q in self.sms_tasks: s_log = celery.signature('cic_notify.tasks.sms.log.log', [message, recipient], queue=self.queue)
s_persist_notification = celery.signature(
if not self.queue: 'cic_notify.tasks.sms.db.persist_notification', [message, recipient], queue=self.queue)
queue = q[0] signatures = [s_send, s_log, s_persist_notification]
else: return celery.group(signatures)()
queue = self.queue
signature = celery.signature(
q[1],
[
message,
recipient,
],
queue=queue,
)
signatures.append(signature)
t = celery.group(signatures)()
return t

Some files were not shown because too many files have changed in this diff Show More