Compare commits

..

33 Commits

Author SHA1 Message Date
nolash
efa23797f3 Bump aux version 2021-07-08 11:48:50 +02:00
nolash
2c0f65ed73 Remove hardcoded value 2021-07-07 17:42:47 +02:00
nolash
3f6fbc1c61 Noop calc for non-demurrage token in aux 2021-07-07 14:17:50 +02:00
nolash
d26af206cb Add correct extras selector for contract migration cic-eth include 2021-07-07 13:52:19 +02:00
nolash
36085f218f Add necessary services requirements 2021-07-07 11:59:51 +02:00
nolash
d1d680c9aa Add aux-all to tasker in local cluster 2021-07-07 10:32:01 +02:00
nolash
abd4024322 Bump version 2021-07-07 09:59:20 +02:00
nolash
644b9ec4ec Merge remote-tracking branch 'origin/master' into lash/demurrage-task 2021-07-07 09:58:57 +02:00
nolash
bc52e46620 Merge remote-tracking branch 'origin/master' into lash/demurrage-task 2021-07-07 09:47:09 +02:00
nolash
ba729a0a54 Properly fix mock path in cic-eth test 2021-07-07 09:32:52 +02:00
nolash
cb4927edc9 Correct mocks path in cic tests, faulty contract-migration build 2021-07-07 09:24:40 +02:00
nolash
185458e320 Merge remote-tracking branch 'origin/master' into lash/demurrage-task 2021-07-05 13:05:02 +02:00
nolash
d9214ddd62 Bump cic-eth dep to future build 2021-07-05 11:59:52 +02:00
nolash
b5d6d80d8e Add demurrage token task api 2021-07-05 11:58:07 +02:00
nolash
2f09ac9110 Change aux module to unpopulated base module name 2021-07-05 11:28:27 +02:00
nolash
bc8851ad06 Merge branch 'lash/new-sarafu-token' into lash/demurrage-task 2021-07-05 10:41:35 +02:00
nolash
46840040a0 Remove dev dockerfile 2021-07-05 10:41:19 +02:00
nolash
4e18060589 Remove dev dockerfile 2021-07-05 10:40:32 +02:00
nolash
08a77f9818 Move cic-eth token fixture to package, add demurrage token calc task 2021-07-05 10:29:33 +02:00
nolash
5fab939270 Upgrade erc20 demurrage token deploy, explicit multi mode flag 2021-07-04 13:53:03 +02:00
nolash
51109db487 Merge branch 'lash/new-sarafu-token' into lash/demurrage-task 2021-07-04 13:26:22 +02:00
nolash
f44e2c8b45 Move registry initialization before aux 2021-07-04 13:21:07 +02:00
nolash
40a7eec6ad Add aux requirement for demurrage task 2021-07-02 21:32:48 +02:00
nolash
6a75ce37c4 Add demurrage token amount adjust task 2021-07-02 21:25:47 +02:00
nolash
9b2f2ab0b1 Reduce profanity 2021-07-01 11:05:31 +02:00
nolash
98a38b7117 Upgrade cic-base for contract-migration 2021-06-30 22:51:23 +02:00
nolash
a2ca61355d Merge remote-tracking branch 'origin/master' into lash/new-sarafu-token 2021-06-30 22:01:07 +02:00
nolash
b0638f2262 Rename env var for token symbol in contract migration 2021-06-30 21:16:06 +02:00
nolash
ea4c68f311 Merge remote-tracking branch 'origin/master' into lash/new-sarafu-token 2021-06-30 16:47:36 +02:00
nolash
ba0dc9371e Add colors to run script for contract migrations 2021-06-10 07:16:59 +02:00
nolash
03ac6633a2 Merge branch 'master' into lash/new-sarafu-token 2021-06-09 15:00:02 +02:00
nolash
4dc8dff369 Remove foo 2021-06-05 17:13:14 +02:00
nolash
a74e69aeb3 Introduce dev dockerfile for contract migration enabling faster iteration in dev 2021-06-05 17:06:37 +02:00
429 changed files with 14682 additions and 9539 deletions

1
.gitignore vendored
View File

@@ -13,4 +13,3 @@ build/
**/coverage **/coverage
**/.venv **/.venv
.idea .idea
**/.vim

View File

@@ -2,6 +2,4 @@
omit = omit =
.venv/* .venv/*
scripts/* scripts/*
cic_cache/db/migrations/* cic_cache/db/postgres/*
cic_cache/version.py
cic_cache/cli

View File

@@ -1,4 +0,0 @@
.git
.cache
.dot
**/doc

View File

@@ -1,52 +1,22 @@
.cic_cache_variables: .cic_cache_variables:
variables: variables:
APP_NAME: cic-cache APP_NAME: cic-cache
DOCKERFILE_PATH: docker/Dockerfile_ci DOCKERFILE_PATH: $APP_NAME/docker/Dockerfile
CONTEXT: apps/$APP_NAME
.cic_cache_changes_target:
rules:
- changes:
- $CONTEXT/$APP_NAME/*
build-mr-cic-cache: build-mr-cic-cache:
extends: extends:
- .cic_cache_changes_target
- .py_build_merge_request - .py_build_merge_request
- .cic_cache_variables - .cic_cache_variables
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/cic-cache/**/*
when: always
test-mr-cic-cache:
stage: test
extends:
- .cic_cache_variables
cache:
key:
files:
- test_requirements.txt
paths:
- /root/.cache/pip
image: $MR_IMAGE_TAG
script:
- cd apps/$APP_NAME/
- >
pip install --extra-index-url https://pip.grassrootseconomics.net:8433
--extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple
-r test_requirements.txt
- export PYTHONPATH=. && pytest -x --cov=cic_cache --cov-fail-under=90 --cov-report term-missing tests
needs: ["build-mr-cic-cache"]
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/$APP_NAME/**/*
when: always
build-push-cic-cache: build-push-cic-cache:
extends: extends:
- .py_build_push - .py_build_push
- .cic_cache_variables - .cic_cache_variables
rules:
- if: $CI_COMMIT_BRANCH == "master"
changes:
- apps/cic-cache/**/*
when: always

View File

@@ -1 +0,0 @@
include *requirements.txt cic_cache/data/config/*

View File

@@ -55,37 +55,15 @@ class Api:
queue=callback_queue, queue=callback_queue,
) )
def list(self, offset=0, limit=100, address=None, oldest=False): def list(self, offset, limit, address=None):
s = celery.signature( s = celery.signature(
'cic_cache.tasks.tx.tx_filter', 'cic_cache.tasks.tx.tx_filter',
[ [
offset, 0,
limit, 100,
address, address,
oldest,
], ],
queue=self.queue, queue=None
)
if self.callback_param != None:
s.link(self.callback_success).on_error(self.callback_error)
t = s.apply_async()
return t
def list_content(self, offset=0, limit=100, address=None, block_offset=None, block_limit=None, oldest=False):
s = celery.signature(
'cic_cache.tasks.tx.tx_filter_content',
[
offset,
limit,
address,
block_offset,
block_limit,
oldest,
],
queue=self.queue,
) )
if self.callback_param != None: if self.callback_param != None:
s.link(self.callback_success).on_error(self.callback_error) s.link(self.callback_success).on_error(self.callback_error)

View File

@@ -10,17 +10,11 @@ from cic_cache.db.list import (
list_transactions_mined, list_transactions_mined,
list_transactions_account_mined, list_transactions_account_mined,
list_transactions_mined_with_data, list_transactions_mined_with_data,
list_transactions_mined_with_data_index,
list_transactions_account_mined_with_data_index,
list_transactions_account_mined_with_data,
) )
logg = logging.getLogger() logg = logging.getLogger()
DEFAULT_FILTER_SIZE = 8192 * 8
DEFAULT_LIMIT = 100
class Cache: class Cache:
def __init__(self, session): def __init__(self, session):
@@ -31,12 +25,12 @@ class BloomCache(Cache):
@staticmethod @staticmethod
def __get_filter_size(n): def __get_filter_size(n):
n = DEFAULT_FILTER_SIZE n = 8192 * 8
logg.warning('filter size hardcoded to {}'.format(n)) logg.warning('filter size hardcoded to {}'.format(n))
return n return n
def load_transactions(self, offset, limit, block_offset=None, block_limit=None, oldest=False): def load_transactions(self, offset, limit):
"""Retrieves a list of transactions from cache and creates a bloom filter pointing to blocks and transactions. """Retrieves a list of transactions from cache and creates a bloom filter pointing to blocks and transactions.
Block and transaction numbers are serialized as 32-bit big-endian numbers. The input to the second bloom filter is the concatenation of the serialized block number and transaction index. Block and transaction numbers are serialized as 32-bit big-endian numbers. The input to the second bloom filter is the concatenation of the serialized block number and transaction index.
@@ -53,7 +47,7 @@ class BloomCache(Cache):
:return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx :return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx
:rtype: tuple :rtype: tuple
""" """
rows = list_transactions_mined(self.session, offset, limit, block_offset=block_offset, block_limit=block_limit, oldest=oldest) rows = list_transactions_mined(self.session, offset, limit)
f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3) f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3) f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
@@ -62,12 +56,7 @@ class BloomCache(Cache):
for r in rows: for r in rows:
if highest_block == -1: if highest_block == -1:
highest_block = r[0] highest_block = r[0]
lowest_block = r[0] lowest_block = r[0]
else:
if oldest:
highest_block = r[0]
else:
lowest_block = r[0]
block = r[0].to_bytes(4, byteorder='big') block = r[0].to_bytes(4, byteorder='big')
tx = r[1].to_bytes(4, byteorder='big') tx = r[1].to_bytes(4, byteorder='big')
f_block.add(block) f_block.add(block)
@@ -76,7 +65,7 @@ class BloomCache(Cache):
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),) return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)
def load_transactions_account(self, address, offset, limit, block_offset=None, block_limit=None, oldest=False): def load_transactions_account(self, address, offset, limit):
"""Same as load_transactions(...), but only retrieves transactions where the specified account address is sender or recipient. """Same as load_transactions(...), but only retrieves transactions where the specified account address is sender or recipient.
:param address: Address to retrieve transactions for. :param address: Address to retrieve transactions for.
@@ -88,7 +77,7 @@ class BloomCache(Cache):
:return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx :return: Lowest block, bloom filter for blocks, bloom filter for blocks|tx
:rtype: tuple :rtype: tuple
""" """
rows = list_transactions_account_mined(self.session, address, offset, limit, block_offset=block_offset, block_limit=block_limit, oldest=oldest) rows = list_transactions_account_mined(self.session, address, offset, limit)
f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3) f_block = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3) f_blocktx = moolb.Bloom(BloomCache.__get_filter_size(limit), 3)
@@ -97,12 +86,7 @@ class BloomCache(Cache):
for r in rows: for r in rows:
if highest_block == -1: if highest_block == -1:
highest_block = r[0] highest_block = r[0]
lowest_block = r[0] lowest_block = r[0]
else:
if oldest:
highest_block = r[0]
else:
lowest_block = r[0]
block = r[0].to_bytes(4, byteorder='big') block = r[0].to_bytes(4, byteorder='big')
tx = r[1].to_bytes(4, byteorder='big') tx = r[1].to_bytes(4, byteorder='big')
f_block.add(block) f_block.add(block)
@@ -113,21 +97,8 @@ class BloomCache(Cache):
class DataCache(Cache): class DataCache(Cache):
def load_transactions_with_data(self, offset, limit, block_offset=None, block_limit=None, oldest=False): def load_transactions_with_data(self, offset, end):
if limit == 0: rows = list_transactions_mined_with_data(self.session, offset, end)
limit = DEFAULT_LIMIT
rows = list_transactions_mined_with_data(self.session, offset, limit, block_offset, block_limit, oldest=oldest)
return self.__process_rows(rows, oldest)
def load_transactions_account_with_data(self, address, offset, limit, block_offset=None, block_limit=None, oldest=False):
if limit == 0:
limit = DEFAULT_LIMIT
rows = list_transactions_account_mined_with_data(self.session, address, offset, limit, block_offset, block_limit, oldest=oldest)
return self.__process_rows(rows, oldest)
def __process_rows(self, rows, oldest):
tx_cache = [] tx_cache = []
highest_block = -1; highest_block = -1;
lowest_block = -1; lowest_block = -1;
@@ -135,12 +106,7 @@ class DataCache(Cache):
for r in rows: for r in rows:
if highest_block == -1: if highest_block == -1:
highest_block = r['block_number'] highest_block = r['block_number']
lowest_block = r['block_number'] lowest_block = r['block_number']
else:
if oldest:
highest_block = r['block_number']
else:
lowest_block = r['block_number']
tx_type = 'unknown' tx_type = 'unknown'
if r['value'] != None: if r['value'] != None:

View File

@@ -1,15 +0,0 @@
# local imports
from .base import *
from .chain import (
EthChainInterface,
chain_interface,
)
from .rpc import RPC
from .arg import ArgumentParser
from .config import Config
from .celery import CeleryApp
from .registry import (
connect_registry,
connect_token_registry,
connect_declarator,
)

View File

@@ -1,20 +0,0 @@
# external imports
from chainlib.eth.cli import ArgumentParser as BaseArgumentParser
# local imports
from .base import (
CICFlag,
Flag,
)
class ArgumentParser(BaseArgumentParser):
def process_local_flags(self, local_arg_flags):
if local_arg_flags & CICFlag.CELERY:
self.add_argument('-q', '--celery-queue', dest='celery_queue', type=str, default='cic-cache', help='Task queue')
if local_arg_flags & CICFlag.SYNCER:
self.add_argument('--offset', type=int, default=0, help='Start block height for initial history sync')
self.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
if local_arg_flags & CICFlag.CHAIN:
self.add_argument('-r', '--registry-address', type=str, dest='registry_address', help='CIC registry contract address')

View File

@@ -1,31 +0,0 @@
# standard imports
import enum
# external imports
from chainlib.eth.cli import (
argflag_std_read,
argflag_std_write,
argflag_std_base,
Flag,
)
class CICFlag(enum.IntEnum):
# celery - nibble 1
CELERY = 1
# redis - nibble 2
# REDIS = 16
# REDIS_CALLBACK = 32
# chain - nibble 3
CHAIN = 256
# sync - nibble 4
SYNCER = 4096
argflag_local_task = CICFlag.CELERY
#argflag_local_taskcallback = argflag_local_task | CICFlag.REDIS | CICFlag.REDIS_CALLBACK
argflag_local_chain = CICFlag.CHAIN
argflag_local_sync = CICFlag.SYNCER | CICFlag.CHAIN

View File

@@ -1,24 +0,0 @@
# standard imports
import logging
# external imports
import celery
logg = logging.getLogger(__name__)
class CeleryApp:
@classmethod
def from_config(cls, config):
backend_url = config.get('CELERY_RESULT_URL')
broker_url = config.get('CELERY_BROKER_URL')
celery_app = None
if backend_url != None:
celery_app = celery.Celery(broker=broker_url, backend=backend_url)
logg.info('creating celery app on {} with backend on {}'.format(broker_url, backend_url))
else:
celery_app = celery.Celery(broker=broker_url)
logg.info('creating celery app without results backend on {}'.format(broker_url))
return celery_app

View File

@@ -1,21 +0,0 @@
# external imports
from chainlib.eth.block import (
block_by_number,
Block,
)
from chainlib.eth.tx import (
receipt,
Tx,
)
from chainlib.interface import ChainInterface
class EthChainInterface(ChainInterface):
def __init__(self):
self._tx_receipt = receipt
self._block_by_number = block_by_number
self._block_from_src = Block.from_src
self._src_normalize = Tx.src_normalize
chain_interface = EthChainInterface()

View File

@@ -1,63 +0,0 @@
# standard imports
import os
import logging
# external imports
from chainlib.eth.cli import (
Config as BaseConfig,
Flag,
)
# local imports
from .base import CICFlag
script_dir = os.path.dirname(os.path.realpath(__file__))
logg = logging.getLogger(__name__)
class Config(BaseConfig):
local_base_config_dir = os.path.join(script_dir, '..', 'data', 'config')
@classmethod
def from_args(cls, args, arg_flags, local_arg_flags, extra_args={}, default_config_dir=None, base_config_dir=None, default_fee_limit=None):
expanded_base_config_dir = [cls.local_base_config_dir]
if base_config_dir != None:
if isinstance(base_config_dir, str):
base_config_dir = [base_config_dir]
for d in base_config_dir:
expanded_base_config_dir.append(d)
config = BaseConfig.from_args(args, arg_flags, extra_args=extra_args, default_config_dir=default_config_dir, base_config_dir=expanded_base_config_dir, load_callback=None)
local_args_override = {}
# if local_arg_flags & CICFlag.REDIS:
# local_args_override['REDIS_HOST'] = getattr(args, 'redis_host')
# local_args_override['REDIS_PORT'] = getattr(args, 'redis_port')
# local_args_override['REDIS_DB'] = getattr(args, 'redis_db')
# local_args_override['REDIS_TIMEOUT'] = getattr(args, 'redis_timeout')
if local_arg_flags & CICFlag.CHAIN:
local_args_override['CIC_REGISTRY_ADDRESS'] = getattr(args, 'registry_address')
if local_arg_flags & CICFlag.CELERY:
local_args_override['CELERY_QUEUE'] = getattr(args, 'celery_queue')
if local_arg_flags & CICFlag.SYNCER:
local_args_override['SYNCER_OFFSET'] = getattr(args, 'offset')
local_args_override['SYNCER_NO_HISTORY'] = getattr(args, 'no_history')
config.dict_override(local_args_override, 'local cli args')
# if local_arg_flags & CICFlag.REDIS_CALLBACK:
# config.add(getattr(args, 'redis_host_callback'), '_REDIS_HOST_CALLBACK')
# config.add(getattr(args, 'redis_port_callback'), '_REDIS_PORT_CALLBACK')
if local_arg_flags & CICFlag.CELERY:
config.add(config.true('CELERY_DEBUG'), 'CELERY_DEBUG', exists_ok=True)
logg.debug('config loaded:\n{}'.format(config))
return config

View File

@@ -1,33 +0,0 @@
# standard imports
import logging
# external imports
from cic_eth_registry import CICRegistry
from cic_eth_registry.lookup.declarator import AddressDeclaratorLookup
from cic_eth_registry.lookup.tokenindex import TokenIndexLookup
from chainlib.eth.constant import ZERO_ADDRESS
logg = logging.getLogger()
def connect_token_registry(self, conn, chain_spec, sender_address=ZERO_ADDRESS):
registry = CICRegistry(chain_spec, conn)
token_registry_address = registry.by_name('TokenRegistry', sender_address=sender_address)
logg.debug('using token registry address {}'.format(token_registry_address))
lookup = TokenIndexLookup(chain_spec, token_registry_address)
CICRegistry.add_lookup(lookup)
def connect_declarator(self, conn, chain_spec, trusted_addresses, sender_address=ZERO_ADDRESS):
registry = CICRegistry(chain_spec, conn)
declarator_address = registry.by_name('AddressDeclarator', sender_address=sender_address)
logg.debug('using declarator address {}'.format(declarator_address))
lookup = AddressDeclaratorLookup(chain_spec, declarator_address, trusted_addresses)
CICRegistry.add_lookup(lookup)
def connect_registry(conn, chain_spec, registry_address, sender_address=ZERO_ADDRESS):
CICRegistry.address = registry_address
registry = CICRegistry(chain_spec, conn)
registry_address = registry.by_name('ContractRegistry', sender_address=sender_address)
return registry

View File

@@ -1,43 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.connection import (
RPCConnection,
ConnType,
)
from chainlib.eth.connection import EthUnixSignerConnection
from chainlib.chain import ChainSpec
logg = logging.getLogger(__name__)
class RPC:
def __init__(self, chain_spec, rpc_provider, signer_provider=None):
self.chain_spec = chain_spec
self.rpc_provider = rpc_provider
self.signer_provider = signer_provider
def get_default(self):
return RPCConnection.connect(self.chain_spec, 'default')
@staticmethod
def from_config(config):
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
RPCConnection.register_location(config.get('RPC_HTTP_PROVIDER'), chain_spec, 'default')
if config.get('SIGNER_PROVIDER'):
RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, tag='signer')
RPCConnection.register_location(config.get('SIGNER_PROVIDER'), chain_spec, 'signer')
rpc = RPC(chain_spec, config.get('RPC_HTTP_PROVIDER'), signer_provider=config.get('SIGNER_PROVIDER'))
logg.info('set up rpc: {}'.format(rpc))
return rpc
def __str__(self):
return 'RPC factory, chain {}, rpc {}, signer {}'.format(self.chain_spec, self.rpc_provider, self.signer_provider)

View File

@@ -1,5 +0,0 @@
[celery]
broker_url = redis://localhost:6379
result_url =
queue = cic-cache
debug = 0

View File

@@ -1,4 +0,0 @@
[cic]
registry_address =
trust_address =
health_modules = cic_eth.check.db,cic_eth.check.redis,cic_eth.check.signer,cic_eth.check.gas

View File

@@ -1,10 +0,0 @@
[database]
engine =
driver =
host =
port =
name = cic-cache
user =
password =
debug = 0
pool_size = 0

View File

@@ -1,2 +0,0 @@
[signer]
provider =

View File

@@ -1,4 +0,0 @@
[syncer]
loop_interval = 1
offset = 0
no_history = 0

View File

@@ -13,9 +13,6 @@ def list_transactions_mined(
session, session,
offset, offset,
limit, limit,
block_offset,
block_limit,
oldest=False,
): ):
"""Executes db query to return all confirmed transactions according to the specified offset and limit. """Executes db query to return all confirmed transactions according to the specified offset and limit.
@@ -26,62 +23,15 @@ def list_transactions_mined(
:result: Result set :result: Result set
:rtype: SQLAlchemy.ResultProxy :rtype: SQLAlchemy.ResultProxy
""" """
order_by = 'DESC' s = "SELECT block_number, tx_index FROM tx ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(limit, offset)
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} and block_number <= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, order_by, order_by, limit, offset)
else:
s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, order_by, order_by, limit, offset)
else:
s = "SELECT block_number, tx_index FROM tx ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(order_by, order_by, limit, offset)
r = session.execute(s) r = session.execute(s)
return r return r
def list_transactions_mined_with_data( def list_transactions_mined_with_data(
session,
offset,
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
:param block_offset: First block to include in search
:type block_offset: int
:param block_limit: Last block to include in search
:type block_limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(order_by, order_by, limit, offset)
r = session.execute(s)
return r
def list_transactions_mined_with_data_index(
session, session,
offset, offset,
end, end,
block_offset,
block_limit,
oldest=False,
): ):
"""Executes db query to return all confirmed transactions according to the specified offset and limit. """Executes db query to return all confirmed transactions according to the specified offset and limit.
@@ -92,87 +42,7 @@ def list_transactions_mined_with_data_index(
:result: Result set :result: Result set
:rtype: SQLAlchemy.ResultProxy :rtype: SQLAlchemy.ResultProxy
""" """
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number ASC, tx_index ASC".format(offset, end)
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} and block_number <= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, order_by, order_by, offset, end)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, order_by, order_by, offset, end)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(order_by, order_by, offset, end)
r = session.execute(s)
return r
def list_transactions_account_mined_with_data_index(
session,
address,
offset,
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit, filtered by address
:param offset: Offset in data set to return transactions from
:type offset: int
:param limit: Max number of transactions to retrieve
:type limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(address, address, order_by, order_by, limit, offset)
r = session.execute(s)
return r
def list_transactions_account_mined_with_data(
session,
address,
offset,
limit,
block_offset,
block_limit,
oldest=False,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
:param block_offset: First block to include in search
:type block_offset: int
:param block_limit: Last block to include in search
:type block_limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, success, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(address, address, order_by, order_by, limit, offset)
r = session.execute(s) r = session.execute(s)
return r return r
@@ -183,9 +53,6 @@ def list_transactions_account_mined(
address, address,
offset, offset,
limit, limit,
block_offset,
block_limit,
oldest=False,
): ):
"""Same as list_transactions_mined(...), but only retrieves transaction where the specified account address is sender or recipient. """Same as list_transactions_mined(...), but only retrieves transaction where the specified account address is sender or recipient.
@@ -198,20 +65,7 @@ def list_transactions_account_mined(
:result: Result set :result: Result set
:rtype: SQLAlchemy.ResultProxy :rtype: SQLAlchemy.ResultProxy
""" """
s = "SELECT block_number, tx_index FROM tx WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number DESC, tx_index DESC LIMIT {} OFFSET {}".format(address, address, limit, offset)
order_by = 'DESC'
if oldest:
order_by = 'ASC'
if block_offset:
if block_limit:
s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} AND block_number <= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, block_limit, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT block_number, tx_index FROM tx WHERE block_number >= {} AND (sender = '{}' OR recipient = '{}') ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(block_offset, address, address, order_by, order_by, limit, offset)
else:
s = "SELECT block_number, tx_index FROM tx WHERE sender = '{}' OR recipient = '{}' ORDER BY block_number {}, tx_index {} LIMIT {} OFFSET {}".format(address, address, order_by, order_by, limit, offset)
r = session.execute(s) r = session.execute(s)
return r return r

View File

@@ -7,7 +7,7 @@ Create Date: 2021-04-01 08:10:29.156243
""" """
from alembic import op from alembic import op
import sqlalchemy as sa import sqlalchemy as sa
from chainsyncer.db.migrations.default.export import ( from chainsyncer.db.migrations.sqlalchemy import (
chainsyncer_upgrade, chainsyncer_upgrade,
chainsyncer_downgrade, chainsyncer_downgrade,
) )

View File

@@ -100,4 +100,3 @@ class SessionBase(Model):
logg.debug('destroying session {}'.format(session_key)) logg.debug('destroying session {}'.format(session_key))
session.commit() session.commit()
session.close() session.close()
del SessionBase.localsessions[session_key]

View File

@@ -4,9 +4,6 @@ import json
import re import re
import base64 import base64
# external imports
from hexathon import add_0x
# local imports # local imports
from cic_cache.cache import ( from cic_cache.cache import (
BloomCache, BloomCache,
@@ -14,11 +11,10 @@ from cic_cache.cache import (
) )
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
#logg = logging.getLogger()
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?' re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)(/(\d+)(/(\d+))?)?/?' re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)/?(\d+)?/?(\d+)/?'
re_transactions_all_data = r'/txa/(\d+)?/?(\d+)/?' re_transactions_all_data = r'/txa/(\d+)/(\d+)/?'
DEFAULT_LIMIT = 100 DEFAULT_LIMIT = 100
@@ -30,13 +26,13 @@ def process_transactions_account_bloom(session, env):
address = r[1] address = r[1]
if r[2] == None: if r[2] == None:
address = add_0x(address) address = '0x' + address
offset = 0 offset = DEFAULT_LIMIT
if r.lastindex > 2: if r.lastindex > 2:
offset = r[4] offset = r[3]
limit = DEFAULT_LIMIT limit = 0
if r.lastindex > 4: if r.lastindex > 3:
limit = r[6] limit = r[4]
c = BloomCache(session) c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit) (lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
@@ -91,14 +87,13 @@ def process_transactions_all_data(session, env):
if env.get('HTTP_X_CIC_CACHE_MODE') != 'all': if env.get('HTTP_X_CIC_CACHE_MODE') != 'all':
return None return None
logg.debug('got data request {}'.format(env)) offset = r[1]
block_offset = r[1] end = r[2]
block_end = r[2]
if int(r[2]) < int(r[1]): if int(r[2]) < int(r[1]):
raise ValueError('cart before the horse, dude') raise ValueError('cart before the horse, dude')
c = DataCache(session) c = DataCache(session)
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(0, 0, block_offset, block_end, oldest=True) # oldest needs to be settable (lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, end)
for r in tx_cache: for r in tx_cache:
r['date_block'] = r['date_block'].timestamp() r['date_block'] = r['date_block'].timestamp()

View File

@@ -8,7 +8,15 @@ import sys
import re import re
# external imports # external imports
import confini
import celery
import sqlalchemy import sqlalchemy
import rlp
import cic_base.config
import cic_base.log
import cic_base.argparse
import cic_base.rpc
from cic_base.eth.syncer import chain_interface
from cic_eth_registry import CICRegistry from cic_eth_registry import CICRegistry
from cic_eth_registry.error import UnknownContractError from cic_eth_registry.error import UnknownContractError
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
@@ -26,7 +34,6 @@ from chainsyncer.driver.history import HistorySyncer
from chainsyncer.db.models.base import SessionBase from chainsyncer.db.models.base import SessionBase
# local imports # local imports
import cic_cache.cli
from cic_cache.db import ( from cic_cache.db import (
dsn_from_config, dsn_from_config,
add_tag, add_tag,
@@ -36,36 +43,32 @@ from cic_cache.runnable.daemons.filters import (
FaucetFilter, FaucetFilter,
) )
logging.basicConfig(level=logging.WARNING) script_dir = os.path.realpath(os.path.dirname(__file__))
logg = logging.getLogger()
# process args def add_block_args(argparser):
arg_flags = cic_cache.cli.argflag_std_read argparser.add_argument('--history-start', type=int, default=0, dest='history_start', help='Start block height for initial history sync')
local_arg_flags = cic_cache.cli.argflag_local_sync argparser.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
argparser = cic_cache.cli.ArgumentParser(arg_flags) return argparser
argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args()
# process config
config = cic_cache.cli.Config.from_args(args, arg_flags, local_arg_flags)
# connect to database logg = cic_base.log.create()
argparser = cic_base.argparse.create(script_dir, cic_base.argparse.full_template)
argparser = cic_base.argparse.add(argparser, add_block_args, 'block')
args = cic_base.argparse.parse(argparser, logg)
config = cic_base.config.create(args.c, args, args.env_prefix)
config.add(args.history_start, 'SYNCER_HISTORY_START', True)
config.add(args.no_history, '_NO_HISTORY', True)
cic_base.config.log(config)
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG')) SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
# set up rpc chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
rpc = cic_cache.cli.RPC.from_config(config)
conn = rpc.get_default()
# set up chain provisions cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER'))
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
registry = None
try:
registry = cic_cache.cli.connect_registry(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
except UnknownContractError as e:
logg.exception('Registry contract connection failed for {}: {}'.format(config.get('CIC_REGISTRY_ADDRESS'), e))
sys.exit(1)
logg.info('connected contract registry {}'.format(config.get('CIC_REGISTRY_ADDRESS')))
def register_filter_tags(filters, session): def register_filter_tags(filters, session):
@@ -92,12 +95,14 @@ def main():
syncers = [] syncers = []
#if SQLBackend.first(chain_spec):
# backend = SQLBackend.initial(chain_spec, block_offset)
syncer_backends = SQLBackend.resume(chain_spec, block_offset) syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0: if len(syncer_backends) == 0:
initial_block_start = config.get('SYNCER_OFFSET') initial_block_start = config.get('SYNCER_HISTORY_START')
initial_block_offset = block_offset initial_block_offset = block_offset
if config.get('SYNCER_NO_HISTORY'): if config.get('_NO_HISTORY'):
initial_block_start = block_offset initial_block_start = block_offset
initial_block_offset += 1 initial_block_offset += 1
syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start)) syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start))
@@ -107,10 +112,10 @@ def main():
logg.info('resuming sync session {}'.format(syncer_backend)) logg.info('resuming sync session {}'.format(syncer_backend))
for syncer_backend in syncer_backends: for syncer_backend in syncer_backends:
syncers.append(HistorySyncer(syncer_backend, cic_cache.cli.chain_interface)) syncers.append(HistorySyncer(syncer_backend, chain_interface))
syncer_backend = SQLBackend.live(chain_spec, block_offset+1) syncer_backend = SQLBackend.live(chain_spec, block_offset+1)
syncers.append(HeadSyncer(syncer_backend, cic_cache.cli.chain_interface)) syncers.append(HeadSyncer(syncer_backend, chain_interface))
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS') trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None: if trusted_addresses_src == None:

View File

@@ -2,17 +2,14 @@
import celery import celery
# local imports # local imports
from cic_cache.cache import ( from cic_cache.cache import BloomCache
BloomCache,
DataCache,
)
from cic_cache.db.models.base import SessionBase from cic_cache.db.models.base import SessionBase
celery_app = celery.current_app celery_app = celery.current_app
@celery_app.task(bind=True) @celery_app.task(bind=True)
def tx_filter(self, offset, limit, address=None, oldest=False, encoding='hex'): def tx_filter(self, offset, limit, address=None, encoding='hex'):
queue = self.request.delivery_info.get('routing_key') queue = self.request.delivery_info.get('routing_key')
session = SessionBase.create_session() session = SessionBase.create_session()
@@ -20,9 +17,9 @@ def tx_filter(self, offset, limit, address=None, oldest=False, encoding='hex'):
c = BloomCache(session) c = BloomCache(session)
b = None b = None
if address == None: if address == None:
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit, oldest=oldest) (lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
else: else:
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit, oldest=oldest) (lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
session.close() session.close()
@@ -38,17 +35,4 @@ def tx_filter(self, offset, limit, address=None, oldest=False, encoding='hex'):
return o return o
@celery_app.task(bind=True)
def tx_filter_content(self, offset, limit, address=None, block_offset=None, block_limit=None, oldest=False, encoding='hex'):
session = SessionBase.create_session()
c = DataCache(session)
b = None
if address == None:
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, limit, block_offset=block_offset, block_limit=block_limit, oldest=oldest)
else:
(lowest_block, highest_block, tx_cache) = c.load_transactions_account_with_data_index(address, offset, limit, block_offset=block_offset, block_limit=block_limit)
session.close()
return (lowest_block, highest_block, tx_cache,)

View File

@@ -4,8 +4,8 @@ import semver
version = ( version = (
0, 0,
2, 2,
1, 0,
'alpha.1', 'alpha.2',
) )
version_object = semver.VersionInfo( version_object = semver.VersionInfo(

View File

@@ -0,0 +1,2 @@
[bancor]
dir =

View File

@@ -1,3 +1,4 @@
[cic] [cic]
registry_address = registry_address =
chain_spec =
trust_address = trust_address =

View File

@@ -0,0 +1,3 @@
[bancor]
registry_address =
dir = /usr/local/share/bancor

View File

@@ -1,3 +1,4 @@
[cic] [cic]
chain_spec =
registry_address = registry_address =
trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C

View File

@@ -0,0 +1,2 @@
[eth]
provider = http://localhost:63545

View File

@@ -1,4 +1,3 @@
[syncer] [syncer]
loop_interval = 1 loop_interval = 1
offset = 0 history_start = 0
no_history = 0

View File

@@ -0,0 +1,2 @@
[eth]
provider = ws://localhost:8545

View File

@@ -0,0 +1,3 @@
[syncer]
loop_interval = 5
history_start = 0

View File

@@ -1,39 +1,52 @@
# syntax = docker/dockerfile:1.2 FROM python:3.8.6-slim-buster
FROM registry.gitlab.com/grassrootseconomics/cic-base-images:python-3.8.6-dev-55da5f4e as dev
# RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2b9
COPY requirements.txt . #COPY --from=0 /usr/local/share/cic/solidity/ /usr/local/share/cic/solidity/
#RUN pip install $pip_extra_index_url_flag -r test_requirements.txt
#RUN pip install $pip_extra_index_url_flag .
#RUN pip install .[server]
ARG EXTRA_INDEX_URL="https://pip.grassrootseconomics.net:8433" WORKDIR /usr/src/cic-cache
ARG GITLAB_PYTHON_REGISTRY="https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple"
ARG EXTRA_PIP_ARGS=""
RUN --mount=type=cache,mode=0755,target=/root/.cache/pip \
pip install --index-url https://pypi.org/simple \
--extra-index-url $GITLAB_PYTHON_REGISTRY --extra-index-url $EXTRA_INDEX_URL $EXTRA_PIP_ARGS \
-r requirements.txt
COPY . . ARG pip_extra_index_url_flag='--index https://pypi.org/simple --extra-index-url https://pip.grassrootseconomics.net:8433'
ARG root_requirement_file='requirements.txt'
RUN python setup.py install #RUN apk update && \
# apk add gcc musl-dev gnupg libpq
#RUN apk add postgresql-dev
#RUN apk add linux-headers
#RUN apk add libffi-dev
RUN apt-get update && \
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps git
# Copy shared requirements from top of mono-repo
RUN echo "copying root req file ${root_requirement_file}"
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2b9
COPY cic-cache/requirements.txt ./
COPY cic-cache/setup.cfg \
cic-cache/setup.py \
./
COPY cic-cache/cic_cache/ ./cic_cache/
COPY cic-cache/scripts/ ./scripts/
COPY cic-cache/test_requirements.txt ./
RUN pip install $pip_extra_index_url_flag -r test_requirements.txt
RUN pip install $pip_extra_index_url_flag .
RUN pip install .[server]
COPY cic-cache/tests/ ./tests/
#COPY db/ cic-cache/db
#RUN apk add postgresql-client
# ini files in config directory defines the configurable parameters for the application # ini files in config directory defines the configurable parameters for the application
# they can all be overridden by environment variables # they can all be overridden by environment variables
# to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package) # to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
COPY config/ /usr/local/etc/cic-cache/ COPY cic-cache/config/ /usr/local/etc/cic-cache/
# for db migrations # for db migrations
RUN git clone https://github.com/vishnubob/wait-for-it.git /usr/local/bin/wait-for-it/ RUN git clone https://github.com/vishnubob/wait-for-it.git /usr/local/bin/wait-for-it/
COPY cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/ COPY cic-cache/cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/
COPY /docker/start_tracker.sh ./start_tracker.sh COPY cic-cache/docker/start_tracker.sh ./start_tracker.sh
COPY /docker/db.sh ./db.sh COPY cic-cache/docker/db.sh ./db.sh
RUN chmod 755 ./*.sh RUN chmod 755 ./*.sh
# Tracker # Tracker
# ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"] # ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"]
# Server # Server
# ENTRYPOINT [ "/usr/local/bin/uwsgi", "--wsgi-file", "/usr/local/lib/python3.8/site-packages/cic_cache/runnable/server.py", "--http", ":80", "--pyargv", "-vv" ] # ENTRYPOINT [ "/usr/local/bin/uwsgi", "--wsgi-file", "/usr/local/lib/python3.8/site-packages/cic_cache/runnable/server.py", "--http", ":80", "--pyargv", "-vv" ]
ENTRYPOINT []

View File

@@ -1,38 +0,0 @@
# syntax = docker/dockerfile:1.2
FROM registry.gitlab.com/grassrootseconomics/cic-base-images:python-3.8.6-dev-55da5f4e as dev
# RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2b9
COPY requirements.txt .
#RUN pip install $pip_extra_index_url_flag -r test_requirements.txt
#RUN pip install $pip_extra_index_url_flag .
#RUN pip install .[server]
ARG EXTRA_INDEX_URL="https://pip.grassrootseconomics.net:8433"
ARG GITLAB_PYTHON_REGISTRY="https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple"
ARG EXTRA_PIP_ARGS=""
RUN pip install --index-url https://pypi.org/simple \
--extra-index-url $GITLAB_PYTHON_REGISTRY --extra-index-url $EXTRA_INDEX_URL $EXTRA_PIP_ARGS \
-r requirements.txt
COPY . .
RUN python setup.py install
# ini files in config directory defines the configurable parameters for the application
# they can all be overridden by environment variables
# to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
COPY config/ /usr/local/etc/cic-cache/
# for db migrations
RUN git clone https://github.com/vishnubob/wait-for-it.git /usr/local/bin/wait-for-it/
COPY cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/
COPY /docker/start_tracker.sh ./start_tracker.sh
COPY /docker/db.sh ./db.sh
RUN chmod 755 ./*.sh
# Tracker
# ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"]
# Server
# ENTRYPOINT [ "/usr/local/bin/uwsgi", "--wsgi-file", "/usr/local/lib/python3.8/site-packages/cic_cache/runnable/server.py", "--http", ":80", "--pyargv", "-vv" ]
ENTRYPOINT []

View File

@@ -1,15 +1,13 @@
cic-base==0.1.3a3+build.984b5cff
alembic==1.4.2 alembic==1.4.2
confini>=0.3.6rc4,<0.5.0 confini~=0.3.6rc3
uwsgi==2.0.19.1 uwsgi==2.0.19.1
moolb~=0.1.1b2 moolb~=0.1.0
cic-eth-registry~=0.6.1a1 cic-eth-registry~=0.5.6a1
SQLAlchemy==1.3.20 SQLAlchemy==1.3.20
semver==2.13.0 semver==2.13.0
psycopg2==2.8.6 psycopg2==2.8.6
celery==4.4.7 celery==4.4.7
redis==3.5.3 redis==3.5.3
chainsyncer[sql]>=0.0.6a1,<0.1.0 chainsyncer[sql]~=0.0.3a3
erc20-faucet>=0.3.2a1, <0.4.0 erc20-faucet~=0.2.2a1
chainlib-eth>=0.0.9a3,<0.1.0
chainlib>=0.0.9a2,<0.1.0
eth-address-index>=0.2.3a1,<0.3.0

View File

@@ -23,13 +23,11 @@ licence_files =
[options] [options]
python_requires = >= 3.6 python_requires = >= 3.6
include_package_data = True
packages = packages =
cic_cache cic_cache
cic_cache.tasks cic_cache.tasks
cic_cache.db cic_cache.db
cic_cache.db.models cic_cache.db.models
cic_cache.cli
cic_cache.runnable cic_cache.runnable
cic_cache.runnable.daemons cic_cache.runnable.daemons
cic_cache.runnable.daemons.filters cic_cache.runnable.daemons.filters
@@ -41,4 +39,3 @@ console_scripts =
cic-cache-trackerd = cic_cache.runnable.daemons.tracker:main cic-cache-trackerd = cic_cache.runnable.daemons.tracker:main
cic-cache-serverd = cic_cache.runnable.daemons.server:main cic-cache-serverd = cic_cache.runnable.daemons.server:main
cic-cache-taskerd = cic_cache.runnable.daemons.tasker:main cic-cache-taskerd = cic_cache.runnable.daemons.tasker:main
cic-cache-list = cic_cache.runable.list:main

View File

@@ -6,5 +6,5 @@ sqlparse==0.4.1
pytest-celery==0.0.0a1 pytest-celery==0.0.0a1
eth_tester==0.5.0b3 eth_tester==0.5.0b3
py-evm==0.3.0a20 py-evm==0.3.0a20
sarafu-faucet~=0.0.7a1 cic_base[full]==0.1.3a3+build.984b5cff
erc20-transfer-authorization>=0.3.5a1,<0.4.0 sarafu-faucet~=0.0.4a1

View File

@@ -1,40 +0,0 @@
# standard imports
import os
# external imports
import chainlib.cli
# local imports
import cic_cache.cli
script_dir = os.path.dirname(os.path.realpath(__file__))
config_dir = os.path.join(script_dir, '..', 'testdata', 'config')
def test_argumentparserto_config():
argparser = cic_cache.cli.ArgumentParser()
local_flags = 0xffff
argparser.process_local_flags(local_flags)
argparser.add_argument('--foo', type=str)
args = argparser.parse_args([
'-q', 'baz',
'--offset', '13',
'--no-history',
'-r','0xdeadbeef',
'-vv',
'--foo', 'bar',
])
extra_args = {
'foo': '_BARBARBAR',
}
config = cic_cache.cli.Config.from_args(args, chainlib.cli.argflag_std_base, local_flags, extra_args=extra_args, base_config_dir=config_dir)
assert config.get('_BARBARBAR') == 'bar'
assert config.get('CELERY_QUEUE') == 'baz'
assert config.get('SYNCER_NO_HISTORY') == True
assert config.get('SYNCER_OFFSET') == 13
assert config.get('CIC_REGISTRY_ADDRESS') == '0xdeadbeef'

View File

@@ -1,17 +0,0 @@
# standard imports
import tempfile
# local imports
import cic_cache.cli
def test_cli_celery():
cf = tempfile.mkdtemp()
config = {
'CELERY_RESULT_URL': 'filesystem://' + cf,
}
cic_cache.cli.CeleryApp.from_config(config)
config['CELERY_BROKER_URL'] = 'filesystem://' + cf
cic_cache.cli.CeleryApp.from_config(config)

View File

@@ -1,68 +0,0 @@
# external imports
import pytest
from chainlib.eth.gas import (
Gas,
RPCGasOracle,
)
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.block import (
block_latest,
Block,
)
from chainlib.eth.pytest.fixtures_chain import default_chain_spec
from chainlib.eth.pytest.fixtures_ethtester import *
from cic_eth_registry.pytest.fixtures_contracts import *
from hexathon import add_0x
# local imports
import cic_cache.cli
@pytest.mark.xfail()
def test_cli_rpc(
eth_rpc,
eth_signer,
default_chain_spec,
):
config = {
'CHAIN_SPEC': str(default_chain_spec),
'RPC_HTTP_PROVIDER': 'http://localhost:8545',
}
rpc = cic_cache.cli.RPC.from_config(config, default_label='foo')
conn = rpc.get_by_label('foo')
#o = block_latest()
#conn.do(o)
def test_cli_chain(
default_chain_spec,
eth_rpc,
eth_signer,
contract_roles,
):
ifc = cic_cache.cli.EthChainInterface()
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], conn=eth_rpc)
gas_oracle = RPCGasOracle(conn=eth_rpc)
c = Gas(default_chain_spec, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, signer=eth_signer)
recipient = add_0x(os.urandom(20).hex())
(tx_hash, o) = c.create(contract_roles['CONTRACT_DEPLOYER'], recipient, 1024)
r = eth_rpc.do(o)
o = ifc.tx_receipt(r)
r = eth_rpc.do(o)
assert r['status'] == 1
o = ifc.block_by_number(1)
block_src = eth_rpc.do(o)
block = ifc.block_from_src(block_src)
assert block.number == 1
with pytest.raises(KeyError):
assert block_src['gasUsed'] == 21000
assert block_src['gas_used'] == 21000
block_src = ifc.src_normalize(block_src)
assert block_src['gasUsed'] == 21000
assert block_src['gas_used'] == 21000

View File

@@ -5,12 +5,9 @@ import datetime
# external imports # external imports
import pytest import pytest
import moolb
# local imports # local imports
from cic_cache import db from cic_cache import db
from cic_cache import BloomCache
from cic_cache.cache import DEFAULT_FILTER_SIZE
script_dir = os.path.dirname(os.path.realpath(__file__)) script_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.dirname(script_dir) root_dir = os.path.dirname(script_dir)
@@ -64,6 +61,7 @@ def txs(
dt.timestamp(), dt.timestamp(),
) )
tx_number = 42 tx_number = 42
tx_hash_second = '0x' + os.urandom(32).hex() tx_hash_second = '0x' + os.urandom(32).hex()
tx_signed_second = '0x' + os.urandom(128).hex() tx_signed_second = '0x' + os.urandom(128).hex()
@@ -92,44 +90,6 @@ def txs(
] ]
@pytest.fixture(scope='function')
def more_txs(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
):
session = init_database
tx_number = 666
tx_hash = '0x' + os.urandom(32).hex()
tx_signed = '0x' + os.urandom(128).hex()
nonce = 3
dt = datetime.datetime.utcnow()
dt += datetime.timedelta(hours=1)
db.add_transaction(
session,
tx_hash,
list_defaults['block']+2,
tx_number,
list_actors['alice'],
list_actors['diane'],
list_tokens['bar'],
list_tokens['bar'],
2048,
4096,
False,
dt.timestamp(),
)
session.commit()
return [tx_hash] + txs
@pytest.fixture(scope='function') @pytest.fixture(scope='function')
def tag_txs( def tag_txs(
init_database, init_database,
@@ -141,7 +101,3 @@ def tag_txs(
db.tag_transaction(init_database, txs[1], 'taag', domain='test') db.tag_transaction(init_database, txs[1], 'taag', domain='test')
@pytest.fixture(scope='session')
def zero_filter():
return moolb.Bloom(DEFAULT_FILTER_SIZE, 3)

View File

@@ -10,7 +10,6 @@ from sqlalchemy import text
from chainlib.eth.tx import Tx from chainlib.eth.tx import Tx
from chainlib.eth.block import Block from chainlib.eth.block import Block
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.eth.error import RequestMismatchException
from hexathon import ( from hexathon import (
strip_0x, strip_0x,
add_0x, add_0x,
@@ -19,21 +18,10 @@ from hexathon import (
# local imports # local imports
from cic_cache.db import add_tag from cic_cache.db import add_tag
from cic_cache.runnable.daemons.filters.erc20 import ERC20TransferFilter from cic_cache.runnable.daemons.filters.erc20 import ERC20TransferFilter
from cic_cache.runnable.daemons.filters.base import TagSyncFilter
logg = logging.getLogger() logg = logging.getLogger()
def test_base_filter_str(
init_database,
):
f = TagSyncFilter('foo')
assert 'foo' == str(f)
f = TagSyncFilter('foo', domain='bar')
assert 'bar.foo' == str(f)
def test_erc20_filter( def test_erc20_filter(
eth_rpc, eth_rpc,
foo_token, foo_token,
@@ -79,95 +67,3 @@ def test_erc20_filter(
s = text("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = :a AND a.value = :b") s = text("SELECT x.tx_hash FROM tag a INNER JOIN tag_tx_link l ON l.tag_id = a.id INNER JOIN tx x ON x.id = l.tx_id WHERE a.domain = :a AND a.value = :b")
r = init_database.execute(s, {'a': fltr.tag_domain, 'b': fltr.tag_name}).fetchone() r = init_database.execute(s, {'a': fltr.tag_domain, 'b': fltr.tag_name}).fetchone()
assert r[0] == tx.hash assert r[0] == tx.hash
def test_erc20_filter_nocontract(
eth_rpc,
foo_token,
init_database,
list_defaults,
list_actors,
tags,
):
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
fltr = ERC20TransferFilter(chain_spec)
add_tag(init_database, fltr.tag_name, domain=fltr.tag_domain)
# incomplete args
data = 'a9059cbb'
data += strip_0x(list_actors['alice'])
data += '1000'.ljust(64, '0')
block = Block({
'hash': os.urandom(32).hex(),
'number': 42,
'timestamp': datetime.datetime.utcnow().timestamp(),
'transactions': [],
})
tx = Tx({
'to': os.urandom(20).hex(),
'from': list_actors['bob'],
'data': data,
'value': 0,
'hash': os.urandom(32).hex(),
'nonce': 13,
'gasPrice': 10000000,
'gas': 123456,
})
block.txs.append(tx)
tx.block = block
assert not fltr.filter(eth_rpc, block, tx, db_session=init_database)
@pytest.mark.parametrize(
'contract_method,contract_input,expected_exception',
[
('a9059cbb', os.urandom(32).hex(), ValueError), # not enough args
('a9059cbb', os.urandom(31).hex(), ValueError), # wrong arg boundary
('a9059cbc', os.urandom(64).hex(), RequestMismatchException), # wrong method
],
)
def test_erc20_filter_bogus(
eth_rpc,
foo_token,
init_database,
list_defaults,
list_actors,
tags,
contract_method,
contract_input,
expected_exception,
):
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
fltr = ERC20TransferFilter(chain_spec)
add_tag(init_database, fltr.tag_name, domain=fltr.tag_domain)
# incomplete args
data = contract_method
data += contract_input
block = Block({
'hash': os.urandom(32).hex(),
'number': 42,
'timestamp': datetime.datetime.utcnow().timestamp(),
'transactions': [],
})
tx = Tx({
'to': foo_token,
'from': list_actors['bob'],
'data': data,
'value': 0,
'hash': os.urandom(32).hex(),
'nonce': 13,
'gasPrice': 10000000,
'gas': 123456,
})
block.txs.append(tx)
tx.block = block
assert not fltr.filter(eth_rpc, block, tx, db_session=init_database)

View File

@@ -8,7 +8,6 @@ import json
import pytest import pytest
# local imports # local imports
from cic_cache import db
from cic_cache import BloomCache from cic_cache import BloomCache
from cic_cache.cache import DataCache from cic_cache.cache import DataCache
@@ -19,6 +18,7 @@ def test_cache(
init_database, init_database,
list_defaults, list_defaults,
list_actors, list_actors,
list_tokens,
txs, txs,
): ):
@@ -37,6 +37,9 @@ def test_cache(
def test_cache_data( def test_cache_data(
init_database, init_database,
list_defaults,
list_actors,
list_tokens,
txs, txs,
tag_txs, tag_txs,
): ):
@@ -44,209 +47,10 @@ def test_cache_data(
session = init_database session = init_database
c = DataCache(session) c = DataCache(session)
b = c.load_transactions_with_data(0, 3) #410000, 420000) #, 100, block_offset=410000, block_limit=420000, oldest=True) b = c.load_transactions_with_data(410000, 420000)
assert len(b[2]) == 2 assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == txs[0] assert b[2][0]['tx_hash'] == txs[1]
assert b[2][0]['tx_type'] == 'unknown' assert b[2][1]['tx_type'] == 'unknown'
assert b[2][1]['tx_type'] == 'test.taag' assert b[2][0]['tx_type'] == 'test.taag'
def test_cache_ranges(
init_database,
list_defaults,
list_actors,
list_tokens,
more_txs,
):
session = init_database
oldest = list_defaults['block'] - 1
mid = list_defaults['block']
newest = list_defaults['block'] + 2
c = BloomCache(session)
b = c.load_transactions(0, 100)
assert b[0] == oldest
assert b[1] == newest
b = c.load_transactions(1, 2)
assert b[0] == oldest
assert b[1] == mid
b = c.load_transactions(0, 2)
assert b[0] == mid
assert b[1] == newest
b = c.load_transactions(0, 1)
assert b[0] == newest
assert b[1] == newest
b = c.load_transactions(0, 100, oldest=True)
assert b[0] == oldest
assert b[1] == newest
b = c.load_transactions(0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
b = c.load_transactions(0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == oldest
assert b[1] == mid
b = c.load_transactions(0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'], oldest=True)
assert b[0] == oldest
assert b[1] == mid
# now check when supplying account
b = c.load_transactions_account(list_actors['alice'], 0, 100)
assert b[0] == oldest
assert b[1] == newest
b = c.load_transactions_account(list_actors['bob'], 0, 100)
assert b[0] == mid
assert b[1] == mid
b = c.load_transactions_account(list_actors['diane'], 0, 100)
assert b[0] == oldest
assert b[1] == newest
# add block filter to the mix
b = c.load_transactions_account(list_actors['alice'], 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
b = c.load_transactions_account(list_actors['alice'], 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
b = c.load_transactions_account(list_actors['bob'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == mid
assert b[1] == mid
b = c.load_transactions_account(list_actors['diane'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == oldest
assert b[1] == oldest
def test_cache_ranges_data(
init_database,
list_defaults,
list_actors,
list_tokens,
more_txs,
):
session = init_database
oldest = list_defaults['block'] - 1
mid = list_defaults['block']
newest = list_defaults['block'] + 2
c = DataCache(session)
b = c.load_transactions_with_data(0, 100)
assert b[0] == oldest
assert b[1] == newest
assert len(b[2]) == 3
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][2]['tx_hash'] == more_txs[2]
b = c.load_transactions_with_data(1, 2)
assert b[0] == oldest
assert b[1] == mid
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[1]
assert b[2][1]['tx_hash'] == more_txs[2]
b = c.load_transactions_with_data(0, 2)
assert b[0] == mid
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
b = c.load_transactions_with_data(0, 1)
assert b[0] == newest
assert b[1] == newest
assert len(b[2]) == 1
assert b[2][0]['tx_hash'] == more_txs[0]
b = c.load_transactions_with_data(0, 100, oldest=True)
assert b[0] == oldest
assert b[1] == newest
assert len(b[2]) == 3
assert b[2][0]['tx_hash'] == more_txs[2]
assert b[2][1]['tx_hash'] == more_txs[1]
assert b[2][2]['tx_hash'] == more_txs[0]
b = c.load_transactions_with_data(0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
b = c.load_transactions_with_data(0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == oldest
assert b[1] == mid
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[1]
assert b[2][1]['tx_hash'] == more_txs[2]
b = c.load_transactions_with_data(0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'], oldest=True)
assert b[0] == oldest
assert b[1] == mid
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[2]
assert b[2][1]['tx_hash'] == more_txs[1]
# now check when supplying account
b = c.load_transactions_account_with_data(list_actors['alice'], 0, 100)
assert b[0] == oldest
assert b[1] == newest
assert len(b[2]) == 3
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
assert b[2][2]['tx_hash'] == more_txs[2]
b = c.load_transactions_account_with_data(list_actors['bob'], 0, 100)
assert b[0] == mid
assert b[1] == mid
assert len(b[2]) == 1
assert b[2][0]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['diane'], 0, 100)
assert b[0] == oldest
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[2]
# add block filter to the mix
b = c.load_transactions_account_with_data(list_actors['alice'], 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['alice'], 0, 100, block_offset=list_defaults['block'])
assert b[0] == mid
assert b[1] == newest
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == more_txs[0]
assert b[2][1]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['bob'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == mid
assert b[1] == mid
assert len(b[2]) == 1
assert b[2][0]['tx_hash'] == more_txs[1]
b = c.load_transactions_account_with_data(list_actors['diane'], 0, 100, block_offset=list_defaults['block'] - 1, block_limit=list_defaults['block'])
assert b[0] == oldest
assert b[1] == oldest
assert len(b[2]) == 1
assert b[2][0]['tx_hash'] == more_txs[2]

View File

@@ -1,230 +0,0 @@
# standard imports
import logging
import json
import base64
import copy
import re
# external imports
import pytest
from hexathon import strip_0x
# local imports
from cic_cache.runnable.daemons.query import *
logg = logging.getLogger()
@pytest.mark.parametrize(
'query_path_prefix, query_role, query_address_index, query_offset, query_offset_index, query_limit, query_limit_index, match_re',
[
('/tx/user/', 'alice', 0, None, 3, None, 5, re_transactions_account_bloom),
('/tx/user/', 'alice', 0, 42, 3, None, 5, re_transactions_account_bloom),
('/tx/user/', 'alice', 0, 42, 3, 13, 5, re_transactions_account_bloom),
('/tx/', None, 0, None, 3, None, 5, re_transactions_all_bloom),
('/tx/', None, 0, 42, 3, None, 5, re_transactions_all_bloom),
('/tx/', None, 0, 42, 3, 13, 5, re_transactions_all_bloom),
('/txa/', None, 0, None, 3, None, 5, re_transactions_all_data),
('/txa/', None, 0, 42, 3, None, 5, re_transactions_all_data),
('/txa/', None, 0, 42, 3, 13, 5, re_transactions_all_data),
],
)
def test_query_regex(
list_actors,
query_path_prefix,
query_role,
query_address_index,
query_offset,
query_offset_index,
query_limit,
query_limit_index,
match_re,
):
paths = []
path = query_path_prefix
query_address = None
if query_role != None:
query_address = strip_0x(list_actors[query_role])
paths.append(path + '0x' + query_address)
paths.append(path + query_address)
if query_offset != None:
if query_limit != None:
for i in range(len(paths)-1):
paths[i] += '/{}/{}'.format(query_offset, query_limit)
else:
for i in range(len(paths)-1):
paths[i] += '/' + str(query_offset)
for i in range(len(paths)):
paths.append(paths[i] + '/')
for p in paths:
logg.debug('testing path {} against {}'.format(p, match_re))
m = re.match(match_re, p)
l = len(m.groups())
logg.debug('laast index match {} groups {}'.format(m.lastindex, l))
for i in range(l+1):
logg.debug('group {} {}'.format(i, m[i]))
if m.lastindex >= query_offset_index:
assert query_offset == int(m[query_offset_index + 1])
if m.lastindex >= query_limit_index:
assert query_limit == int(m[query_limit_index + 1])
if query_address_index != None:
match_address = strip_0x(m[query_address_index + 1])
assert query_address == match_address
@pytest.mark.parametrize(
'role_name, query_offset, query_limit, query_match',
[
('alice', None, None, [(420000, 13), (419999, 42)]),
('alice', None, 1, [(420000, 13)]),
('alice', 1, None, [(419999, 42)]), # 420000 == list_defaults['block']
('alice', 2, None, []), # 420000 == list_defaults['block']
],
)
def test_query_process_txs_account(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
zero_filter,
role_name,
query_offset,
query_limit,
query_match,
):
actor = None
try:
actor = list_actors[role_name]
except KeyError:
actor = os.urandom(20).hex()
path_info = '/tx/user/0x' + strip_0x(actor)
if query_offset != None:
path_info += '/' + str(query_offset)
if query_limit != None:
if query_offset == None:
path_info += '/0'
path_info += '/' + str(query_limit)
env = {
'PATH_INFO': path_info,
}
logg.debug('using path {}'.format(path_info))
r = process_transactions_account_bloom(init_database, env)
assert r != None
o = json.loads(r[1])
block_filter_data = base64.b64decode(o['block_filter'].encode('utf-8'))
zero_filter_data = zero_filter.to_bytes()
if len(query_match) == 0:
assert block_filter_data == zero_filter_data
return
assert block_filter_data != zero_filter_data
block_filter = copy.copy(zero_filter)
block_filter.merge(block_filter_data)
block_filter_data = block_filter.to_bytes()
assert block_filter_data != zero_filter_data
for (block, tx) in query_match:
block = block.to_bytes(4, byteorder='big')
assert block_filter.check(block)
@pytest.mark.parametrize(
'query_offset, query_limit, query_match',
[
(None, 2, [(420000, 13), (419999, 42)]),
(0, 1, [(420000, 13)]),
(1, 1, [(419999, 42)]),
(2, 0, []),
],
)
def test_query_process_txs_bloom(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
zero_filter,
query_offset,
query_limit,
query_match,
):
path_info = '/tx'
if query_offset != None:
path_info += '/' + str(query_offset)
if query_limit != None:
if query_offset == None:
path_info += '/0'
path_info += '/' + str(query_limit)
env = {
'PATH_INFO': path_info,
}
logg.debug('using path {}'.format(path_info))
r = process_transactions_all_bloom(init_database, env)
assert r != None
o = json.loads(r[1])
block_filter_data = base64.b64decode(o['block_filter'].encode('utf-8'))
zero_filter_data = zero_filter.to_bytes()
if len(query_match) == 0:
assert block_filter_data == zero_filter_data
return
assert block_filter_data != zero_filter_data
block_filter = copy.copy(zero_filter)
block_filter.merge(block_filter_data)
block_filter_data = block_filter.to_bytes()
assert block_filter_data != zero_filter_data
for (block, tx) in query_match:
block = block.to_bytes(4, byteorder='big')
assert block_filter.check(block)
@pytest.mark.parametrize(
'query_block_start, query_block_end, query_match_count',
[
(None, 42, 0),
(420000, 420001, 1),
(419999, 419999, 1), # matches are inclusive
(419999, 420000, 2),
(419999, 420001, 2),
],
)
def test_query_process_txs_data(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
zero_filter,
query_block_start,
query_block_end,
query_match_count,
):
path_info = '/txa'
if query_block_start != None:
path_info += '/' + str(query_block_start)
if query_block_end != None:
if query_block_start == None:
path_info += '/0'
path_info += '/' + str(query_block_end)
env = {
'PATH_INFO': path_info,
'HTTP_X_CIC_CACHE_MODE': 'all',
}
logg.debug('using path {}'.format(path_info))
r = process_transactions_all_data(init_database, env)
assert r != None
o = json.loads(r[1])
assert len(o['data']) == query_match_count

View File

@@ -1,2 +0,0 @@
[foo]
bar_baz = xyzzy

View File

@@ -46,7 +46,7 @@ def get_adjusted_balance(self, token_symbol, amount, timestamp):
def aux_setup(rpc, config, sender_address=ZERO_ADDRESS): def aux_setup(rpc, config, sender_address=ZERO_ADDRESS):
chain_spec_str = config.get('CHAIN_SPEC') chain_spec_str = config.get('CIC_CHAIN_SPEC')
chain_spec = ChainSpec.from_chain_str(chain_spec_str) chain_spec = ChainSpec.from_chain_str(chain_spec_str)
token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL') token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL')

View File

@@ -1,5 +1,5 @@
celery==4.4.7 celery==4.4.7
erc20-demurrage-token~=0.0.3a1 erc20-demurrage-token~=0.0.2a3
cic-eth-registry~=0.5.8a1 cic-eth-registry~=0.5.6a1
chainlib~=0.0.7a1 chainlib~=0.0.5a1
cic_eth~=0.12.2a4 cic_eth~=0.12.0a2

View File

@@ -1,6 +1,6 @@
[metadata] [metadata]
name = cic-eth-aux-erc20-demurrage-token name = cic-eth-aux-erc20-demurrage-token
version = 0.0.2a6 version = 0.0.2a4
description = cic-eth tasks supporting erc20 demurrage token description = cic-eth tasks supporting erc20 demurrage token
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no

View File

@@ -5,7 +5,8 @@ pytest-cov==2.10.1
eth-tester==0.5.0b3 eth-tester==0.5.0b3
py-evm==0.3.0a20 py-evm==0.3.0a20
SQLAlchemy==1.3.20 SQLAlchemy==1.3.20
cic-eth~=0.12.0a1
liveness~=0.0.1a7 liveness~=0.0.1a7
eth-accounts-index==0.1.1a1 eth-accounts-index==0.0.12a1
eth-contract-registry==0.5.8a1 eth-contract-registry==0.5.6a1
eth-address-index==0.2.1a1 eth-address-index==0.1.2a1

View File

@@ -6,5 +6,4 @@ omit =
cic_eth/sync/head.py cic_eth/sync/head.py
cic_eth/sync/mempool.py cic_eth/sync/mempool.py
cic_eth/queue/state.py cic_eth/queue/state.py
cic_eth/cli
*redis*.py *redis*.py

View File

@@ -1,6 +0,0 @@
.git
.cache
.dot
**/doc
**/.venv
**/venv

View File

@@ -1,52 +1,31 @@
.cic_eth_variables: .cic_eth_variables:
variables: variables:
APP_NAME: cic-eth APP_NAME: cic-eth
DOCKERFILE_PATH: docker/Dockerfile_ci DOCKERFILE_PATH: $APP_NAME/docker/Dockerfile
CONTEXT: apps/$APP_NAME
.cic_eth_mr_changes_target:
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- $CONTEXT/$APP_NAME/**/*
when: always
build-mr-cic-eth: build-mr-cic-eth:
extends: extends:
- .cic_eth_variables - .cic_eth_variables
- .py_build_target_dev - .cic_eth_mr_changes_target
rules: - .py_build_target_test
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/cic-eth/**/*
when: always
test-mr-cic-eth: test-mr-cic-eth:
stage: test
extends: extends:
- .cic_eth_variables - .cic_eth_variables
cache: - .cic_eth_mr_changes_target
key: stage: test
files: image: $IMAGE_TAG_BASE
- test_requirements.txt
paths:
- /root/.cache/pip
image: $MR_IMAGE_TAG
script: script:
- cd apps/$APP_NAME/ - cd apps/$APP_NAME/
- > - pytest -x --cov=cic_eth --cov-fail-under=90 --cov-report term-missing tests
pip install --extra-index-url https://pip.grassrootseconomics.net:8433
--extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple
-r admin_requirements.txt
-r services_requirements.txt
-r test_requirements.txt
- export PYTHONPATH=. && pytest -x --cov=cic_eth --cov-fail-under=90 --cov-report term-missing tests
needs: ["build-mr-cic-eth"]
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/cic-eth/**/*
when: always
build-push-cic-eth: build-push-cic-eth:
extends: extends:
- .py_build_push - .py_build_push
- .cic_eth_variables
rules:
- if: $CI_COMMIT_BRANCH == "master"
changes:
- apps/cic-eth/**/*
when: always

View File

@@ -1,2 +1,2 @@
include *requirements.txt config/test/* cic_eth/data/config/* include *requirements.txt config/test/*

View File

@@ -1,5 +1,5 @@
SQLAlchemy==1.3.20 SQLAlchemy==1.3.20
cic-eth-registry>=0.6.1a2,<0.7.0 cic-eth-registry~=0.5.6a1
hexathon~=0.0.1a8 hexathon~=0.0.1a7
chainqueue>=0.0.3a2,<0.1.0 chainqueue~=0.0.2b5
eth-erc20>=0.1.2a2,<0.2.0 eth-erc20==0.0.10a2

View File

@@ -6,11 +6,6 @@ import logging
import celery import celery
from chainlib.eth.constant import ZERO_ADDRESS from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from hexathon import (
add_0x,
strip_0x,
uniform as hex_uniform,
)
# local imports # local imports
from cic_eth.db.enum import LockEnum from cic_eth.db.enum import LockEnum
@@ -24,12 +19,6 @@ from cic_eth.error import LockedError
celery_app = celery.current_app celery_app = celery.current_app
logg = logging.getLogger() logg = logging.getLogger()
def normalize_address(a):
if a == None:
return None
return add_0x(hex_uniform(strip_0x(a)))
@celery_app.task(base=CriticalSQLAlchemyTask) @celery_app.task(base=CriticalSQLAlchemyTask)
def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.ALL, tx_hash=None): def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.ALL, tx_hash=None):
"""Task wrapper to set arbitrary locks """Task wrapper to set arbitrary locks
@@ -43,7 +32,6 @@ def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.AL
:returns: New lock state for address :returns: New lock state for address
:rtype: number :rtype: number
""" """
address = normalize_address(address)
chain_str = '::' chain_str = '::'
if chain_spec_dict != None: if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict)) chain_str = str(ChainSpec.from_dict(chain_spec_dict))
@@ -65,7 +53,6 @@ def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.
:returns: New lock state for address :returns: New lock state for address
:rtype: number :rtype: number
""" """
address = normalize_address(address)
chain_str = '::' chain_str = '::'
if chain_spec_dict != None: if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict)) chain_str = str(ChainSpec.from_dict(chain_spec_dict))
@@ -85,7 +72,6 @@ def lock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=None
:returns: New lock state for address :returns: New lock state for address
:rtype: number :rtype: number
""" """
address = normalize_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict)) chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, LockEnum.SEND, address=address, tx_hash=tx_hash) r = Lock.set(chain_str, LockEnum.SEND, address=address, tx_hash=tx_hash)
logg.debug('Send locked for {}, flag now {}'.format(address, r)) logg.debug('Send locked for {}, flag now {}'.format(address, r))
@@ -103,7 +89,6 @@ def unlock_send(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
:returns: New lock state for address :returns: New lock state for address
:rtype: number :rtype: number
""" """
address = normalize_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict)) chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, LockEnum.SEND, address=address) r = Lock.reset(chain_str, LockEnum.SEND, address=address)
logg.debug('Send unlocked for {}, flag now {}'.format(address, r)) logg.debug('Send unlocked for {}, flag now {}'.format(address, r))
@@ -121,7 +106,6 @@ def lock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS, tx_hash=Non
:returns: New lock state for address :returns: New lock state for address
:rtype: number :rtype: number
""" """
address = normalize_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict)) chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, LockEnum.QUEUE, address=address, tx_hash=tx_hash) r = Lock.set(chain_str, LockEnum.QUEUE, address=address, tx_hash=tx_hash)
logg.debug('Queue direct locked for {}, flag now {}'.format(address, r)) logg.debug('Queue direct locked for {}, flag now {}'.format(address, r))
@@ -139,7 +123,6 @@ def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
:returns: New lock state for address :returns: New lock state for address
:rtype: number :rtype: number
""" """
address = normalize_address(address)
chain_str = str(ChainSpec.from_dict(chain_spec_dict)) chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, LockEnum.QUEUE, address=address) r = Lock.reset(chain_str, LockEnum.QUEUE, address=address)
logg.debug('Queue direct unlocked for {}, flag now {}'.format(address, r)) logg.debug('Queue direct unlocked for {}, flag now {}'.format(address, r))
@@ -148,7 +131,6 @@ def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
@celery_app.task(base=CriticalSQLAlchemyTask) @celery_app.task(base=CriticalSQLAlchemyTask)
def check_lock(chained_input, chain_spec_dict, lock_flags, address=None): def check_lock(chained_input, chain_spec_dict, lock_flags, address=None):
address = normalize_address(address)
chain_str = '::' chain_str = '::'
if chain_spec_dict != None: if chain_spec_dict != None:
chain_str = str(ChainSpec.from_dict(chain_spec_dict)) chain_str = str(ChainSpec.from_dict(chain_spec_dict))

View File

@@ -14,11 +14,7 @@ from chainqueue.sql.query import get_tx
from chainqueue.sql.state import set_cancel from chainqueue.sql.state import set_cancel
from chainqueue.db.models.otx import Otx from chainqueue.db.models.otx import Otx
from chainqueue.db.models.tx import TxCache from chainqueue.db.models.tx import TxCache
from hexathon import ( from hexathon import strip_0x
strip_0x,
add_0x,
uniform as hex_uniform,
)
from potaahto.symbols import snake_and_camel from potaahto.symbols import snake_and_camel
# local imports # local imports
@@ -73,17 +69,15 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
set_cancel(chain_spec, strip_0x(tx['hash']), manual=True, session=session) set_cancel(chain_spec, strip_0x(tx['hash']), manual=True, session=session)
query_address = add_0x(hex_uniform(strip_0x(address))) # aaaaargh
q = session.query(Otx) q = session.query(Otx)
q = q.join(TxCache) q = q.join(TxCache)
q = q.filter(TxCache.sender==query_address) q = q.filter(TxCache.sender==address)
q = q.filter(Otx.nonce>=nonce+delta) q = q.filter(Otx.nonce>=nonce+delta)
q = q.order_by(Otx.nonce.asc()) q = q.order_by(Otx.nonce.asc())
otxs = q.all() otxs = q.all()
tx_hashes = [] tx_hashes = []
txs = [] txs = []
gas_total = 0
for otx in otxs: for otx in otxs:
tx_raw = bytes.fromhex(strip_0x(otx.signed_tx)) tx_raw = bytes.fromhex(strip_0x(otx.signed_tx))
tx_new = unpack(tx_raw, chain_spec) tx_new = unpack(tx_raw, chain_spec)
@@ -95,10 +89,8 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
tx_new['gas_price'] += 1 tx_new['gas_price'] += 1
tx_new['gasPrice'] = tx_new['gas_price'] tx_new['gasPrice'] = tx_new['gas_price']
tx_new['nonce'] -= delta tx_new['nonce'] -= delta
gas_total += tx_new['gas_price'] * tx_new['gas']
logg.debug('tx_new {}'.format(tx_new)) logg.debug('tx_new {}'.format(tx_new))
logg.debug('gas running total {}'.format(gas_total))
del(tx_new['hash']) del(tx_new['hash'])
del(tx_new['hash_unsigned']) del(tx_new['hash_unsigned'])
@@ -130,10 +122,8 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
s = create_check_gas_task( s = create_check_gas_task(
txs, txs,
chain_spec, chain_spec,
#tx_new['from'], tx_new['from'],
address, gas=tx_new['gas'],
#gas=tx_new['gas'],
gas=gas_total,
tx_hashes_hex=tx_hashes, tx_hashes_hex=tx_hashes,
queue=queue, queue=queue,
) )
@@ -142,8 +132,7 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
'cic_eth.admin.ctrl.unlock_send', 'cic_eth.admin.ctrl.unlock_send',
[ [
chain_spec.asdict(), chain_spec.asdict(),
address, tx_new['from'],
#tx_new['from'],
], ],
queue=queue, queue=queue,
) )
@@ -151,8 +140,7 @@ def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
'cic_eth.admin.ctrl.unlock_queue', 'cic_eth.admin.ctrl.unlock_queue',
[ [
chain_spec.asdict(), chain_spec.asdict(),
address, tx_new['from'],
#tx_new['from'],
], ],
queue=queue, queue=queue,
) )

View File

@@ -21,7 +21,6 @@ from chainlib.hash import keccak256_hex_to_hex
from hexathon import ( from hexathon import (
strip_0x, strip_0x,
add_0x, add_0x,
uniform as hex_uniform,
) )
from chainlib.eth.gas import balance from chainlib.eth.gas import balance
from chainqueue.db.enum import ( from chainqueue.db.enum import (
@@ -308,8 +307,6 @@ class AdminApi:
:param address: Ethereum address to return transactions for :param address: Ethereum address to return transactions for
:type address: str, 0x-hex :type address: str, 0x-hex
""" """
address = add_0x(hex_uniform(strip_0x(address)))
last_nonce = -1 last_nonce = -1
s = celery.signature( s = celery.signature(
'cic_eth.queue.query.get_account_tx', 'cic_eth.queue.query.get_account_tx',

View File

@@ -520,9 +520,9 @@ class Api(ApiBase):
s_external_get = celery.signature( s_external_get = celery.signature(
external_task, external_task,
[ [
address,
offset, offset,
limit, limit,
address,
], ],
queue=external_queue, queue=external_queue,
) )

View File

@@ -21,7 +21,7 @@ def health(*args, **kwargs):
session = SessionBase.create_session() session = SessionBase.create_session()
config = kwargs['config'] config = kwargs['config']
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
logg.debug('check gas balance of gas gifter for chain {}'.format(chain_spec)) logg.debug('check gas balance of gas gifter for chain {}'.format(chain_spec))
try: try:

View File

@@ -15,7 +15,7 @@ logg = logging.getLogger().getChild(__name__)
def health(*args, **kwargs): def health(*args, **kwargs):
blocked = True blocked = True
max_attempts = 5 max_attempts = 5
conn = RPCConnection.connect(kwargs['config'].get('CHAIN_SPEC'), tag='signer') conn = RPCConnection.connect(kwargs['config'].get('CIC_CHAIN_SPEC'), tag='signer')
for i in range(max_attempts): for i in range(max_attempts):
idx = i + 1 idx = i + 1
logg.debug('attempt signer connection check {}/{}'.format(idx, max_attempts)) logg.debug('attempt signer connection check {}/{}'.format(idx, max_attempts))

View File

@@ -1,10 +0,0 @@
# local imports
from .base import *
from .chain import (
EthChainInterface,
chain_interface,
)
from .rpc import RPC
from .arg import ArgumentParser
from .config import Config
from .celery import CeleryApp

View File

@@ -1,31 +0,0 @@
# external imports
from chainlib.eth.cli import ArgumentParser as BaseArgumentParser
# local imports
from .base import (
CICFlag,
Flag,
)
class ArgumentParser(BaseArgumentParser):
def process_local_flags(self, local_arg_flags):
if local_arg_flags & CICFlag.REDIS:
self.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
self.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
self.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use')
if local_arg_flags & CICFlag.REDIS_CALLBACK:
self.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback')
self.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback')
self.add_argument('--redis-timeout', default=20.0, type=float, help='Redis callback timeout')
if local_arg_flags & CICFlag.CELERY:
self.add_argument('-q', '--celery-queue', dest='celery_queue', type=str, default='cic-eth', help='Task queue')
if local_arg_flags & CICFlag.SYNCER:
self.add_argument('--offset', type=int, default=0, help='Start block height for initial history sync')
self.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
if local_arg_flags & CICFlag.CHAIN:
self.add_argument('-r', '--registry-address', type=str, dest='registry_address', help='CIC registry contract address')

View File

@@ -1,31 +0,0 @@
# standard imports
import enum
# external imports
from chainlib.eth.cli import (
argflag_std_read,
argflag_std_write,
argflag_std_base,
Flag,
)
class CICFlag(enum.IntEnum):
# celery - nibble 1
CELERY = 1
# redis - nibble 2
REDIS = 16
REDIS_CALLBACK = 32
# chain - nibble 3
CHAIN = 256
# sync - nibble 4
SYNCER = 4096
argflag_local_task = CICFlag.CELERY
argflag_local_taskcallback = argflag_local_task | CICFlag.REDIS | CICFlag.REDIS_CALLBACK
argflag_local_chain = CICFlag.CHAIN
argflag_local_sync = CICFlag.SYNCER | CICFlag.CHAIN

View File

@@ -1,24 +0,0 @@
# standard imports
import logging
# external imports
import celery
logg = logging.getLogger(__name__)
class CeleryApp:
@classmethod
def from_config(cls, config):
backend_url = config.get('CELERY_RESULT_URL')
broker_url = config.get('CELERY_BROKER_URL')
celery_app = None
if backend_url != None:
celery_app = celery.Celery(broker=broker_url, backend=backend_url)
logg.info('creating celery app on {} with backend on {}'.format(broker_url, backend_url))
else:
celery_app = celery.Celery(broker=broker_url)
logg.info('creating celery app without results backend on {}'.format(broker_url))
return celery_app

View File

@@ -1,21 +0,0 @@
# external imports
from chainlib.eth.block import (
block_by_number,
Block,
)
from chainlib.eth.tx import (
receipt,
Tx,
)
from chainlib.interface import ChainInterface
class EthChainInterface(ChainInterface):
def __init__(self):
self._tx_receipt = receipt
self._block_by_number = block_by_number
self._block_from_src = Block.from_src
self._src_normalize = Tx.src_normalize
chain_interface = EthChainInterface()

View File

@@ -1,63 +0,0 @@
# standard imports
import os
import logging
# external imports
from chainlib.eth.cli import (
Config as BaseConfig,
Flag,
)
# local imports
from .base import CICFlag
script_dir = os.path.dirname(os.path.realpath(__file__))
logg = logging.getLogger(__name__)
class Config(BaseConfig):
local_base_config_dir = os.path.join(script_dir, '..', 'data', 'config')
@classmethod
def from_args(cls, args, arg_flags, local_arg_flags, extra_args={}, default_config_dir=None, base_config_dir=None, default_fee_limit=None):
expanded_base_config_dir = [cls.local_base_config_dir]
if base_config_dir != None:
if isinstance(base_config_dir, str):
base_config_dir = [base_config_dir]
for d in base_config_dir:
expanded_base_config_dir.append(d)
config = BaseConfig.from_args(args, arg_flags, extra_args=extra_args, default_config_dir=default_config_dir, base_config_dir=expanded_base_config_dir, load_callback=None)
local_args_override = {}
if local_arg_flags & CICFlag.REDIS:
local_args_override['REDIS_HOST'] = getattr(args, 'redis_host')
local_args_override['REDIS_PORT'] = getattr(args, 'redis_port')
local_args_override['REDIS_DB'] = getattr(args, 'redis_db')
local_args_override['REDIS_TIMEOUT'] = getattr(args, 'redis_timeout')
if local_arg_flags & CICFlag.CHAIN:
local_args_override['CIC_REGISTRY_ADDRESS'] = getattr(args, 'registry_address')
if local_arg_flags & CICFlag.CELERY:
local_args_override['CELERY_QUEUE'] = getattr(args, 'celery_queue')
if local_arg_flags & CICFlag.SYNCER:
local_args_override['SYNCER_OFFSET'] = getattr(args, 'offset')
local_args_override['SYNCER_NO_HISTORY'] = getattr(args, 'no_history')
config.dict_override(local_args_override, 'local cli args')
if local_arg_flags & CICFlag.REDIS_CALLBACK:
config.add(getattr(args, 'redis_host_callback'), '_REDIS_HOST_CALLBACK')
config.add(getattr(args, 'redis_port_callback'), '_REDIS_PORT_CALLBACK')
if local_arg_flags & CICFlag.CELERY:
config.add(config.true('CELERY_DEBUG'), 'CELERY_DEBUG', exists_ok=True)
logg.debug('config loaded:\n{}'.format(config))
return config

View File

@@ -1,90 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.connection import (
RPCConnection,
ConnType,
)
from chainlib.eth.connection import (
EthUnixSignerConnection,
EthHTTPSignerConnection,
)
from chainlib.chain import ChainSpec
logg = logging.getLogger(__name__)
class RPC:
def __init__(self, chain_spec, rpc_provider, signer_provider=None):
self.chain_spec = chain_spec
self.rpc_provider = rpc_provider
self.signer_provider = signer_provider
def get_default(self):
return self.get_by_label('default')
def get_by_label(self, label):
return RPCConnection.connect(self.chain_spec, label)
@staticmethod
def from_config(config, use_signer=False, default_label='default', signer_label='signer'):
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
RPCConnection.register_location(config.get('RPC_HTTP_PROVIDER'), chain_spec, default_label)
if use_signer:
RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, signer_label)
RPCConnection.register_constructor(ConnType.HTTP, EthHTTPSignerConnection, signer_label)
RPCConnection.register_constructor(ConnType.HTTP_SSL, EthHTTPSignerConnection, signer_label)
RPCConnection.register_location(config.get('SIGNER_PROVIDER'), chain_spec, signer_label)
rpc = RPC(chain_spec, config.get('RPC_HTTP_PROVIDER'), signer_provider=config.get('SIGNER_PROVIDER'))
logg.info('set up rpc: {}'.format(rpc))
return rpc
def __str__(self):
return 'RPC factory, chain {}, rpc {}, signer {}'.format(self.chain_spec, self.rpc_provider, self.signer_provider)
# TOOD: re-implement file backend option from omittec code:
#broker = config.get('CELERY_BROKER_URL')
#if broker[:4] == 'file':
# bq = tempfile.mkdtemp()
# bp = tempfile.mkdtemp()
# conf_update = {
# 'broker_url': broker,
# 'broker_transport_options': {
# 'data_folder_in': bq,
# 'data_folder_out': bq,
# 'data_folder_processed': bp,
# },
# }
# if config.true('CELERY_DEBUG'):
# conf_update['result_extended'] = True
# current_app.conf.update(conf_update)
# logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
#else:
# conf_update = {
# 'broker_url': broker,
# }
# if config.true('CELERY_DEBUG'):
# conf_update['result_extended'] = True
# current_app.conf.update(conf_update)
#
#result = config.get('CELERY_RESULT_URL')
#if result[:4] == 'file':
# rq = tempfile.mkdtemp()
# current_app.conf.update({
# 'result_backend': 'file://{}'.format(rq),
# })
# logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
#else:
# current_app.conf.update({
# 'result_backend': result,
# })
#

View File

@@ -1,5 +0,0 @@
[celery]
broker_url = redis://localhost:6379
result_url =
queue = cic-eth
debug = 0

View File

@@ -1,10 +0,0 @@
[database]
engine =
driver =
host =
port =
name =
user =
password =
debug = 0
pool_size = 0

View File

@@ -1,5 +0,0 @@
[redis]
host = localhost
port = 6379
db = 0
timeout = 20.0

View File

@@ -1,3 +0,0 @@
[retry]
delay =
batch_size =

View File

@@ -1,2 +0,0 @@
[signer]
provider =

View File

@@ -1,4 +0,0 @@
[syncer]
loop_interval = 1
offset = 0
no_history = 0

View File

@@ -8,8 +8,7 @@ Create Date: 2021-04-02 18:30:55.398388
from alembic import op from alembic import op
import sqlalchemy as sa import sqlalchemy as sa
#from chainqueue.db.migrations.sqlalchemy import ( from chainqueue.db.migrations.sqlalchemy import (
from chainqueue.db.migrations.default.export import (
chainqueue_upgrade, chainqueue_upgrade,
chainqueue_downgrade, chainqueue_downgrade,
) )

View File

@@ -8,8 +8,7 @@ Create Date: 2021-04-02 18:36:44.459603
from alembic import op from alembic import op
import sqlalchemy as sa import sqlalchemy as sa
#from chainsyncer.db.migrations.sqlalchemy import ( from chainsyncer.db.migrations.sqlalchemy import (
from chainsyncer.db.migrations.default.export import (
chainsyncer_upgrade, chainsyncer_upgrade,
chainsyncer_downgrade, chainsyncer_downgrade,
) )

View File

@@ -126,4 +126,3 @@ class SessionBase(Model):
logg.debug('commit and destroy session {}'.format(session_key)) logg.debug('commit and destroy session {}'.format(session_key))
session.commit() session.commit()
session.close() session.close()
del SessionBase.localsessions[session_key]

View File

@@ -23,7 +23,7 @@ from chainlib.error import JSONRPCException
from eth_accounts_index.registry import AccountRegistry from eth_accounts_index.registry import AccountRegistry
from eth_accounts_index import AccountsIndex from eth_accounts_index import AccountsIndex
from sarafu_faucet import MinterFaucet from sarafu_faucet import MinterFaucet
from chainqueue.sql.tx import cache_tx_dict from chainqueue.db.models.tx import TxCache
# local import # local import
from cic_eth_registry import CICRegistry from cic_eth_registry import CICRegistry
@@ -300,17 +300,20 @@ def cache_gift_data(
session = self.create_session() session = self.create_session()
tx_dict = { tx_cache = TxCache(
'hash': tx_hash_hex, tx_hash_hex,
'from': tx['from'], tx['from'],
'to': tx['to'], tx['to'],
'source_token': ZERO_ADDRESS, ZERO_ADDRESS,
'destination_token': ZERO_ADDRESS, ZERO_ADDRESS,
'from_value': 0, 0,
'to_value': 0, 0,
} session=session,
)
(tx_dict, cache_id) = cache_tx_dict(tx_dict, session=session) session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close() session.close()
return (tx_hash_hex, cache_id) return (tx_hash_hex, cache_id)
@@ -339,15 +342,18 @@ def cache_account_data(
tx_data = AccountsIndex.parse_add_request(tx['data']) tx_data = AccountsIndex.parse_add_request(tx['data'])
session = SessionBase.create_session() session = SessionBase.create_session()
tx_dict = { tx_cache = TxCache(
'hash': tx_hash_hex, tx_hash_hex,
'from': tx['from'], tx['from'],
'to': tx['to'], tx['to'],
'source_token': ZERO_ADDRESS, ZERO_ADDRESS,
'destination_token': ZERO_ADDRESS, ZERO_ADDRESS,
'from_value': 0, 0,
'to_value': 0, 0,
} session=session,
(tx_dict, cache_id) = cache_tx_dict(tx_dict, session=session) )
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close() session.close()
return (tx_hash_hex, cache_id) return (tx_hash_hex, cache_id)

View File

@@ -0,0 +1,385 @@
# standard imports
import os
import logging
# third-party imports
import celery
import web3
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
# local imports
from cic_eth.db import SessionBase
from cic_eth.db.models.convert import TxConvertTransfer
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.eth.task import sign_and_register_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.eth.token import TokenTxFactory
from cic_eth.eth.factory import TxFactory
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.eth.rpc import RpcClient
celery_app = celery.current_app
#logg = celery_app.log.get_default_logger()
logg = logging.getLogger()
contract_function_signatures = {
'convert': 'f3898a97',
'convert2': '569706eb',
}
class BancorTxFactory(TxFactory):
"""Factory for creating Bancor network transactions.
"""
def convert(
self,
source_token_address,
destination_token_address,
reserve_address,
source_amount,
minimum_return,
chain_spec,
fee_beneficiary='0x0000000000000000000000000000000000000000',
fee_ppm=0,
):
"""Create a BancorNetwork "convert" transaction.
:param source_token_address: ERC20 contract address for token to convert from
:type source_token_address: str, 0x-hex
:param destination_token_address: ERC20 contract address for token to convert to
:type destination_token_address: str, 0x-hex
:param reserve_address: ERC20 contract address of Common reserve token
:type reserve_address: str, 0x-hex
:param source_amount: Amount of source tokens to convert
:type source_amount: int
:param minimum_return: Minimum amount of destination tokens to accept as result for conversion
:type source_amount: int
:return: Unsigned "convert" transaction in standard Ethereum format
:rtype: dict
"""
network_contract = CICRegistry.get_contract(chain_spec, 'BancorNetwork')
network_gas = network_contract.gas('convert')
tx_convert_buildable = network_contract.contract.functions.convert2(
[
source_token_address,
source_token_address,
reserve_address,
destination_token_address,
destination_token_address,
],
source_amount,
minimum_return,
fee_beneficiary,
fee_ppm,
)
tx_convert = tx_convert_buildable.buildTransaction({
'from': self.address,
'gas': network_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
})
return tx_convert
def unpack_convert(data):
f = data[2:10]
if f != contract_function_signatures['convert2']:
raise ValueError('Invalid convert data ({})'.format(f))
d = data[10:]
path = d[384:]
source = path[64-40:64]
destination = path[-40:]
amount = int(d[64:128], 16)
min_return = int(d[128:192], 16)
fee_recipient = d[192:256]
fee = int(d[256:320], 16)
return {
'amount': amount,
'min_return': min_return,
'source_token': web3.Web3.toChecksumAddress('0x' + source),
'destination_token': web3.Web3.toChecksumAddress('0x' + destination),
'fee_recipient': fee_recipient,
'fee': fee,
}
# Kept for historical reference, it unpacks a convert call without fee parameters
#def _unpack_convert_mint(data):
# f = data[2:10]
# if f != contract_function_signatures['convert2']:
# raise ValueError('Invalid convert data ({})'.format(f))
#
# d = data[10:]
# path = d[256:]
# source = path[64-40:64]
# destination = path[-40:]
#
# amount = int(d[64:128], 16)
# min_return = int(d[128:192], 16)
# return {
# 'amount': amount,
# 'min_return': min_return,
# 'source_token': web3.Web3.toChecksumAddress('0x' + source),
# 'destination_token': web3.Web3.toChecksumAddress('0x' + destination),
# }
@celery_app.task(bind=True)
def convert_with_default_reserve(self, tokens, from_address, source_amount, minimum_return, to_address, chain_str):
"""Performs a conversion between two liquid tokens using Bancor network.
:param tokens: Token pair, source and destination respectively
:type tokens: list of str, 0x-hex
:param from_address: Ethereum address of sender
:type from_address: str, 0x-hex
:param source_amount: Amount of source tokens to convert
:type source_amount: int
:param minimum_return: Minimum about of destination tokens to receive
:type minimum_return: int
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
c = RpcClient(chain_spec, holder_address=from_address)
cr = CICRegistry.get_contract(chain_spec, 'BancorNetwork')
source_token = CICRegistry.get_address(chain_spec, tokens[0]['address'])
reserve_address = CICRegistry.get_contract(chain_spec, 'BNTToken', 'ERC20').address()
tx_factory = TokenTxFactory(from_address, c)
tx_approve_zero = tx_factory.approve(source_token.address(), cr.address(), 0, chain_spec)
(tx_approve_zero_hash_hex, tx_approve_zero_signed_hex) = sign_and_register_tx(tx_approve_zero, chain_str, queue, 'cic_eth.eth.token.otx_cache_approve')
tx_approve = tx_factory.approve(source_token.address(), cr.address(), source_amount, chain_spec)
(tx_approve_hash_hex, tx_approve_signed_hex) = sign_and_register_tx(tx_approve, chain_str, queue, 'cic_eth.eth.token.otx_cache_approve')
tx_factory = BancorTxFactory(from_address, c)
tx_convert = tx_factory.convert(
tokens[0]['address'],
tokens[1]['address'],
reserve_address,
source_amount,
minimum_return,
chain_spec,
)
(tx_convert_hash_hex, tx_convert_signed_hex) = sign_and_register_tx(tx_convert, chain_str, queue, 'cic_eth.eth.bancor.otx_cache_convert')
# TODO: consider moving save recipient to async task / chain it before the tx send
if to_address != None:
save_convert_recipient(tx_convert_hash_hex, to_address, chain_str)
s = create_check_gas_and_send_task(
[tx_approve_zero_signed_hex, tx_approve_signed_hex, tx_convert_signed_hex],
chain_str,
from_address,
tx_approve_zero['gasPrice'] * tx_approve_zero['gas'],
tx_hashes_hex=[tx_approve_hash_hex],
queue=queue,
)
s.apply_async()
return tx_convert_hash_hex
#@celery_app.task()
#def process_approval(tx_hash_hex):
# t = session.query(TxConvertTransfer).query(TxConvertTransfer.approve_tx_hash==tx_hash_hex).first()
# c = session.query(Otx).query(Otx.tx_hash==t.convert_tx_hash)
# gas_limit = 8000000
# gas_price = GasOracle.gas_price()
#
# # TODO: use celery group instead
# s_queue = celery.signature(
# 'cic_eth.queue.tx.create',
# [
# nonce,
# c['address'], # TODO: check that this is in fact sender address
# c['tx_hash'],
# c['signed_tx'],
# ]
# )
# s_queue.apply_async()
#
# s_check_gas = celery.signature(
# 'cic_eth.eth.gas.check_gas',
# [
# c['address'],
# [c['signed_tx']],
# gas_limit * gas_price,
# ]
# )
# s_send = celery.signature(
# 'cic_eth.eth.tx.send',
# [],
# )
#
# s_set_sent = celery.signature(
# 'cic_eth.queue.state.set_sent',
# [False],
# )
# s_send.link(s_set_sent)
# s_check_gas.link(s_send)
# s_check_gas.apply_async()
# return tx_hash_hex
@celery_app.task()
def save_convert_recipient(convert_hash, recipient_address, chain_str):
"""Registers the recipient target for a convert-and-transfer operation.
:param convert_hash: Transaction hash of convert operation
:type convert_hash: str, 0x-hex
:param recipient_address: Address of consequtive transfer recipient
:type recipient_address: str, 0x-hex
"""
session = SessionBase.create_session()
t = TxConvertTransfer(convert_hash, recipient_address, chain_str)
session.add(t)
session.commit()
session.close()
@celery_app.task()
def save_convert_transfer(convert_hash, transfer_hash):
"""Registers that the transfer part of a convert-and-transfer operation has been executed.
:param convert_hash: Transaction hash of convert operation
:type convert_hash: str, 0x-hex
:param convert_hash: Transaction hash of transfer operation
:type convert_hash: str, 0x-hex
:returns: transfer_hash,
:rtype: list, single str, 0x-hex
"""
session = SessionBase.create_session()
t = TxConvertTransfer.get(convert_hash)
t.transfer(transfer_hash)
session.add(t)
session.commit()
session.close()
return [transfer_hash]
# TODO: seems unused, consider removing
@celery_app.task()
def resolve_converters_by_tokens(tokens, chain_str):
"""Return converters for a list of tokens.
:param tokens: Token addresses to look up
:type tokens: list of str, 0x-hex
:return: Addresses of matching converters
:rtype: list of str, 0x-hex
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
for t in tokens:
c = CICRegistry.get_contract(chain_spec, 'ConverterRegistry')
fn = c.function('getConvertersByAnchors')
try:
converters = fn([t['address']]).call()
except Exception as e:
raise e
t['converters'] = converters
return tokens
@celery_app.task(bind=True)
def transfer_converted(self, tokens, holder_address, receiver_address, value, tx_convert_hash_hex, chain_str):
"""Execute the ERC20 transfer of a convert-and-transfer operation.
First argument is a list of tokens, to enable the task to be chained to the symbol to token address resolver function. However, it accepts only one token as argument.
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param holder_address: Token receiver address
:type holder_address: str, 0x-hex
:param value: Amount of token, in 'wei'
:type value: int
:raises TokenCountError: Either none or more then one tokens have been passed as tokens argument
:return: Transaction hash
:rtype: str, 0x-hex
"""
# we only allow one token, one transfer
if len(tokens) != 1:
raise TokenCountError
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
c = RpcClient(chain_spec, holder_address=holder_address)
# get transaction parameters
gas_price = c.gas_price()
tx_factory = TokenTxFactory(holder_address, c)
token_address = tokens[0]['address']
tx_transfer = tx_factory.transfer(
token_address,
receiver_address,
value,
chain_spec,
)
(tx_transfer_hash_hex, tx_transfer_signed_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, 'cic_eth.eth.token.otx_cache_transfer')
# send transaction
logg.info('transfer converted token {} from {} to {} value {} {}'.format(token_address, holder_address, receiver_address, value, tx_transfer_signed_hex))
s = create_check_gas_and_send_task(
[tx_transfer_signed_hex],
chain_str,
holder_address,
tx_transfer['gasPrice'] * tx_transfer['gas'],
None,
queue,
)
s_save = celery.signature(
'cic_eth.eth.bancor.save_convert_transfer',
[
tx_convert_hash_hex,
tx_transfer_hash_hex,
],
queue=queue,
)
s_save.link(s)
s_save.apply_async()
return tx_transfer_hash_hex
@celery_app.task()
def otx_cache_convert(
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
):
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = unpack_convert(tx['data'])
logg.debug('tx data {}'.format(tx_data))
session = TxCache.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['from'],
tx_data['source_token'],
tx_data['destination_token'],
tx_data['amount'],
tx_data['amount'],
)
session.add(tx_cache)
session.commit()
session.close()
return tx_hash_hex

View File

@@ -13,9 +13,9 @@ from chainlib.eth.tx import (
from cic_eth_registry import CICRegistry from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token from cic_eth_registry.erc20 import ERC20Token
from hexathon import strip_0x from hexathon import strip_0x
from chainqueue.db.models.tx import TxCache
from chainqueue.error import NotLocalTxError from chainqueue.error import NotLocalTxError
from eth_erc20 import ERC20 from eth_erc20 import ERC20
from chainqueue.sql.tx import cache_tx_dict
# local imports # local imports
from cic_eth.db.models.base import SessionBase from cic_eth.db.models.base import SessionBase
@@ -375,16 +375,19 @@ def cache_transfer_data(
token_value = tx_data[1] token_value = tx_data[1]
session = SessionBase.create_session() session = SessionBase.create_session()
tx_dict = { tx_cache = TxCache(
'hash': tx_hash_hex, tx_hash_hex,
'from': tx['from'], tx['from'],
'to': recipient_address, recipient_address,
'source_token': tx['to'], tx['to'],
'destination_token': tx['to'], tx['to'],
'from_value': token_value, token_value,
'to_value': token_value, token_value,
} session=session,
(tx_dict, cache_id) = cache_tx_dict(tx_dict, session=session) )
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close() session.close()
return (tx_hash_hex, cache_id) return (tx_hash_hex, cache_id)
@@ -414,16 +417,19 @@ def cache_transfer_from_data(
token_value = tx_data[2] token_value = tx_data[2]
session = SessionBase.create_session() session = SessionBase.create_session()
tx_dict = { tx_cache = TxCache(
'hash': tx_hash_hex, tx_hash_hex,
'from': tx['from'], tx['from'],
'to': recipient_address, recipient_address,
'source_token': tx['to'], tx['to'],
'destination_token': tx['to'], tx['to'],
'from_value': token_value, token_value,
'to_value': token_value, token_value,
} session=session,
(tx_dict, cache_id) = cache_tx_dict(tx_dict, session=session) )
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close() session.close()
return (tx_hash_hex, cache_id) return (tx_hash_hex, cache_id)
@@ -452,16 +458,19 @@ def cache_approve_data(
token_value = tx_data[1] token_value = tx_data[1]
session = SessionBase.create_session() session = SessionBase.create_session()
tx_dict = { tx_cache = TxCache(
'hash': tx_hash_hex, tx_hash_hex,
'from': tx['from'], tx['from'],
'to': recipient_address, recipient_address,
'source_token': tx['to'], tx['to'],
'destination_token': tx['to'], tx['to'],
'from_value': token_value, token_value,
'to_value': token_value, token_value,
} session=session,
(tx_dict, cache_id) = cache_tx_dict(tx_dict, session=session) )
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close() session.close()
return (tx_hash_hex, cache_id) return (tx_hash_hex, cache_id)

View File

@@ -9,7 +9,6 @@ from chainlib.chain import ChainSpec
from chainlib.eth.address import is_checksum_address from chainlib.eth.address import is_checksum_address
from chainlib.connection import RPCConnection from chainlib.connection import RPCConnection
from chainqueue.db.enum import StatusBits from chainqueue.db.enum import StatusBits
from chainqueue.sql.tx import cache_tx_dict
from chainlib.eth.gas import ( from chainlib.eth.gas import (
balance, balance,
price, price,
@@ -134,17 +133,20 @@ def cache_gas_data(
session = SessionBase.create_session() session = SessionBase.create_session()
tx_dict = { tx_cache = TxCache(
'hash': tx_hash_hex, tx_hash_hex,
'from': tx['from'], tx['from'],
'to': tx['to'], tx['to'],
'source_token': ZERO_ADDRESS, ZERO_ADDRESS,
'destination_token': ZERO_ADDRESS, ZERO_ADDRESS,
'from_value': tx['value'], tx['value'],
'to_value': tx['value'], tx['value'],
} session=session,
)
(tx_dict, cache_id) = cache_tx_dict(tx_dict, session=session) session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close() session.close()
return (tx_hash_hex, cache_id) return (tx_hash_hex, cache_id)

View File

@@ -18,6 +18,7 @@ from hexathon import (
strip_0x, strip_0x,
) )
from chainqueue.db.models.tx import Otx from chainqueue.db.models.tx import Otx
from chainqueue.db.models.tx import TxCache
from chainqueue.db.enum import StatusBits from chainqueue.db.enum import StatusBits
from chainqueue.error import NotLocalTxError from chainqueue.error import NotLocalTxError
from potaahto.symbols import snake_and_camel from potaahto.symbols import snake_and_camel

View File

@@ -12,21 +12,15 @@ from chainlib.eth.tx import (
transaction_by_block, transaction_by_block,
receipt, receipt,
) )
from chainlib.eth.error import RequestMismatchException
from chainlib.eth.block import block_by_number from chainlib.eth.block import block_by_number
from chainlib.eth.contract import abi_decode_single from chainlib.eth.contract import abi_decode_single
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.tx import Tx
from hexathon import strip_0x from hexathon import strip_0x
from cic_eth_registry import CICRegistry from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token from cic_eth_registry.erc20 import ERC20Token
from cic_eth_registry.error import UnknownContractError
from chainqueue.db.models.otx import Otx from chainqueue.db.models.otx import Otx
from chainqueue.db.enum import StatusEnum from chainqueue.db.enum import StatusEnum
from chainqueue.sql.query import get_tx_cache from chainqueue.sql.query import get_tx_cache
from eth_erc20 import ERC20 from eth_erc20 import ERC20
from erc20_faucet import Faucet
from potaahto.symbols import snake_and_camel
# local imports # local imports
from cic_eth.queue.time import tx_times from cic_eth.queue.time import tx_times
@@ -39,32 +33,6 @@ logg = logging.getLogger()
MAX_BLOCK_TX = 250 MAX_BLOCK_TX = 250
def parse_transaction(chain_spec, rpc, tx, sender_address=None):
try:
transfer_data = ERC20.parse_transfer_request(tx['input'])
tx_address = transfer_data[0]
tx_token_value = transfer_data[1]
logg.debug('matched transfer transaction {} in block {} sender {} recipient {} value {}'.format(tx['hash'], tx['block_number'], tx['from'], tx_address, tx_token_value))
return (tx_address, tx_token_value)
except RequestMismatchException:
pass
try:
transfer_data = Faucet.parse_give_to_request(tx['input'])
tx_address = transfer_data[0]
c = Faucet(chain_spec)
o = c.token_amount(tx['to'], sender_address=sender_address, height=tx['block_number'])
r = rpc.do(o)
tx_token_value = Faucet.parse_token_amount(r)
logg.debug('matched giveto transaction {} in block {} sender {} recipient {} value {}'.format(tx['hash'], tx['block_number'], tx['from'], tx_address, tx_token_value))
return (tx_address, tx_token_value)
except RequestMismatchException:
pass
return None
# TODO: Make this method easier to read # TODO: Make this method easier to read
@celery_app.task(bind=True, base=BaseTask) @celery_app.task(bind=True, base=BaseTask)
def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict): def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict):
@@ -101,39 +69,36 @@ def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict):
tx_filter = moolb.Bloom(databitlen, bloomspec['filter_rounds'], default_data=tx_filter_data) tx_filter = moolb.Bloom(databitlen, bloomspec['filter_rounds'], default_data=tx_filter_data)
txs = {} txs = {}
logg.debug('processing filter with span low {} to high {}'.format(bloomspec['low'], bloomspec['high']))
for block_height in range(bloomspec['low'], bloomspec['high']): for block_height in range(bloomspec['low'], bloomspec['high']):
block_height_bytes = block_height.to_bytes(4, 'big') block_height_bytes = block_height.to_bytes(4, 'big')
if block_filter.check(block_height_bytes): if block_filter.check(block_height_bytes):
logg.debug('filter matched block {}'.format(block_height)) logg.debug('filter matched block {}'.format(block_height))
o = block_by_number(block_height) o = block_by_number(block_height)
block = rpc.do(o) block = rpc.do(o)
logg.debug('block {}'.format(block))
for tx_index in range(0, len(block['transactions'])): for tx_index in range(0, len(block['transactions'])):
tx_index_bytes = tx_index.to_bytes(4, 'big') composite = tx_index + block_height
composite = block_height_bytes + tx_index_bytes tx_index_bytes = composite.to_bytes(4, 'big')
if tx_filter.check(composite): if tx_filter.check(tx_index_bytes):
logg.debug('filter matched block {} tx {}'.format(block_height, tx_index)) logg.debug('filter matched block {} tx {}'.format(block_height, tx_index))
o = transaction_by_block(block['hash'], tx_index)
try: try:
#tx = c.w3.eth.getTransactionByBlock(block_height, tx_index)
o = transaction_by_block(block['hash'], tx_index)
tx = rpc.do(o) tx = rpc.do(o)
except Exception as e: except Exception as e:
logg.debug('false positive on block {} tx {} ({})'.format(block_height, tx_index, e)) logg.debug('false positive on block {} tx {} ({})'.format(block_height, tx_index, e))
continue continue
tx = Tx(tx).src()
logg.debug('got tx {}'.format(tx))
tx_address = None tx_address = None
tx_token_value = 0 tx_token_value = 0
try:
transfer_data = parse_transaction(chain_spec, rpc, tx, sender_address=BaseTask.call_address) transfer_data = ERC20.parse_transfer_request(tx['data'])
if transfer_data == None: tx_address = transfer_data[0]
tx_token_value = transfer_data[1]
except ValueError:
logg.debug('not a transfer transaction, skipping {}'.format(tx))
continue continue
tx_address = transfer_data[0]
tx_token_value = transfer_data[1]
if address == tx_address: if address == tx_address:
status = StatusEnum.SENT status = StatusEnum.SENT
try: try:
@@ -149,6 +114,9 @@ def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict):
# TODO: pass through registry to validate declarator entry of token # TODO: pass through registry to validate declarator entry of token
#token = registry.by_address(tx['to'], sender_address=self.call_address) #token = registry.by_address(tx['to'], sender_address=self.call_address)
token = ERC20Token(chain_spec, rpc, tx['to'])
token_symbol = token.symbol
token_decimals = token.decimals
times = tx_times(tx['hash'], chain_spec) times = tx_times(tx['hash'], chain_spec)
tx_r = { tx_r = {
'hash': tx['hash'], 'hash': tx['hash'],
@@ -158,6 +126,12 @@ def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict):
'destination_value': tx_token_value, 'destination_value': tx_token_value,
'source_token': tx['to'], 'source_token': tx['to'],
'destination_token': tx['to'], 'destination_token': tx['to'],
'source_token_symbol': token_symbol,
'destination_token_symbol': token_symbol,
'source_token_decimals': token_decimals,
'destination_token_decimals': token_decimals,
'source_token_chain': chain_str,
'destination_token_chain': chain_str,
'nonce': tx['nonce'], 'nonce': tx['nonce'],
} }
if times['queue'] != None: if times['queue'] != None:
@@ -169,12 +143,11 @@ def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict):
return txs return txs
# TODO: Surely it must be possible to optimize this # TODO: Surely it must be possible to optimize this
# TODO: DRY this with callback filter in cic_eth/runnable/manager # TODO: DRY this with callback filter in cic_eth/runnable/manager
# TODO: Remove redundant fields from end representation (timestamp, tx_hash) # TODO: Remove redundant fields from end representation (timestamp, tx_hash)
@celery_app.task(bind=True, base=BaseTask) @celery_app.task()
def tx_collate(self, tx_batches, chain_spec_dict, offset, limit, newest_first=True, verify_contracts=True): def tx_collate(tx_batches, chain_spec_dict, offset, limit, newest_first=True):
"""Merges transaction data from multiple sources and sorts them in chronological order. """Merges transaction data from multiple sources and sorts them in chronological order.
:param tx_batches: Transaction data inputs :param tx_batches: Transaction data inputs
@@ -223,32 +196,6 @@ def tx_collate(self, tx_batches, chain_spec_dict, offset, limit, newest_first=Tr
if newest_first: if newest_first:
ks.reverse() ks.reverse()
for k in ks: for k in ks:
tx = txs_by_block[k] txs.append(txs_by_block[k])
if verify_contracts:
try:
tx = verify_and_expand(tx, chain_spec, sender_address=BaseTask.call_address)
except UnknownContractError:
logg.error('verify failed on tx {}, skipping'.format(tx['hash']))
continue
txs.append(tx)
return txs return txs
def verify_and_expand(tx, chain_spec, sender_address=ZERO_ADDRESS):
rpc = RPCConnection.connect(chain_spec, 'default')
registry = CICRegistry(chain_spec, rpc)
if tx.get('source_token_symbol') == None and tx['source_token'] != ZERO_ADDRESS:
r = registry.by_address(tx['source_token'], sender_address=sender_address)
token = ERC20Token(chain_spec, rpc, tx['source_token'])
tx['source_token_symbol'] = token.symbol
tx['source_token_decimals'] = token.decimals
if tx.get('destination_token_symbol') == None and tx['destination_token'] != ZERO_ADDRESS:
r = registry.by_address(tx['destination_token'], sender_address=sender_address)
token = ERC20Token(chain_spec, rpc, tx['destination_token'])
tx['destination_token_symbol'] = token.symbol
tx['destination_token_decimals'] = token.decimals
return tx

View File

@@ -27,7 +27,7 @@ def database_engine(
SessionBase.poolable = False SessionBase.poolable = False
dsn = dsn_from_config(load_config) dsn = dsn_from_config(load_config)
#SessionBase.connect(dsn, True) #SessionBase.connect(dsn, True)
SessionBase.connect(dsn, debug=load_config.true('DATABASE_DEBUG')) SessionBase.connect(dsn, debug=load_config.get('DATABASE_DEBUG') != None)
return dsn return dsn

View File

@@ -100,7 +100,6 @@ def get_upcoming_tx(chain_spec, status=StatusEnum.READYSEND, not_status=None, re
q_outer = q_outer.join(Lock, isouter=True) q_outer = q_outer.join(Lock, isouter=True)
q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0)) q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0))
if not is_alive(status): if not is_alive(status):
SessionBase.release_session(session) SessionBase.release_session(session)
raise ValueError('not a valid non-final tx value: {}'.format(status)) raise ValueError('not a valid non-final tx value: {}'.format(status))

View File

@@ -7,30 +7,54 @@ import json
import argparse import argparse
# external imports # external imports
import celery
import confini
import redis import redis
from xdg.BaseDirectory import xdg_config_home from xdg.BaseDirectory import xdg_config_home
from chainlib.chain import ChainSpec
# local imports # local imports
import cic_eth.cli
from cic_eth.api import Api from cic_eth.api import Api
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger('create_account_script')
logging.getLogger('confini').setLevel(logging.WARNING)
logging.getLogger('gnupg').setLevel(logging.WARNING)
arg_flags = cic_eth.cli.argflag_std_base default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic')
local_arg_flags = cic_eth.cli.argflag_local_taskcallback
argparser = cic_eth.cli.ArgumentParser(arg_flags) argparser = argparse.ArgumentParser()
argparser.add_argument('--no-register', dest='no_register', action='store_true', help='Do not register new account in on-chain accounts index') argparser.add_argument('--no-register', dest='no_register', action='store_true', help='Do not register new account in on-chain accounts index')
argparser.process_local_flags(local_arg_flags) argparser.add_argument('-c', type=str, default=default_config_dir, help='config file')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback')
argparser.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback')
argparser.add_argument('--timeout', default=20.0, type=float, help='Callback timeout')
argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
args = argparser.parse_args() args = argparser.parse_args()
extra_args = { if args.vv:
'no_register': None, logg.setLevel(logging.DEBUG)
} if args.v:
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags, extra_args=extra_args) logg.setLevel(logging.INFO)
config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'REDIS_HOST': getattr(args, 'redis_host'),
'REDIS_PORT': getattr(args, 'redis_port'),
'REDIS_DB': getattr(args, 'redis_db'),
}
config.dict_override(args_override, 'cli')
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
celery_app = cic_eth.cli.CeleryApp.from_config(config)
def main(): def main():
redis_host = config.get('REDIS_HOST') redis_host = config.get('REDIS_HOST')
@@ -44,24 +68,19 @@ def main():
ps.get_message() ps.get_message()
api = Api( api = Api(
config.get('CHAIN_SPEC'), config.get('CIC_CHAIN_SPEC'),
queue=config.get('CELERY_QUEUE'), queue=args.q,
callback_param='{}:{}:{}:{}'.format(config.get('_REDIS_HOST_CALLBACK'), config.get('_REDIS_PORT_CALLBACK'), config.get('REDIS_DB'), redis_channel), callback_param='{}:{}:{}:{}'.format(args.redis_host_callback, args.redis_port_callback, redis_db, redis_channel),
callback_task='cic_eth.callbacks.redis.redis', callback_task='cic_eth.callbacks.redis.redis',
callback_queue=config.get('CELERY_QUEUE'), callback_queue=args.q,
) )
register = not config.get('_NO_REGISTER') register = not args.no_register
logg.debug('register {}'.format(register)) logg.debug('register {}'.format(register))
t = api.create_account(register=register) t = api.create_account(register=register)
ps.get_message() ps.get_message()
try: o = ps.get_message(timeout=args.timeout)
o = ps.get_message(timeout=config.get('REDIS_TIMEOUT'))
except TimeoutError as e:
sys.stderr.write('got no new address from cic-eth before timeout: {}\n'.format(e))
sys.exit(1)
ps.unsubscribe()
m = json.loads(o['data']) m = json.loads(o['data'])
print(m['result']) print(m['result'])

View File

@@ -12,38 +12,64 @@ from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.address import is_checksum_address from chainlib.eth.address import is_checksum_address
# local imports # local imports
import cic_eth.cli
from cic_eth.api.admin import AdminApi from cic_eth.api.admin import AdminApi
from cic_eth.db.enum import LockEnum from cic_eth.db.enum import LockEnum
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
default_format = 'terminal'
default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic')
arg_flags = cic_eth.cli.argflag_std_read argparser = argparse.ArgumentParser()
local_arg_flags = cic_eth.cli.argflag_local_task | cic_eth.cli.argflag_local_chain argparser.add_argument('-p', '--provider', dest='p', default='http://localhost:8545', type=str, help='Web3 provider url (http only)')
argparser = cic_eth.cli.ArgumentParser(arg_flags) argparser.add_argument('-f', '--format', dest='f', default=default_format, type=str, help='Output format')
argparser.add_argument('--no-register', dest='no_register', action='store_true', help='Do not register new account in on-chain accounts index') argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
argparser.process_local_flags(local_arg_flags) argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
def process_lock_args(argparser): def process_lock_args(argparser):
argparser.add_argument('flags', type=str, help='Flags to manipulate') argparser.add_argument('flags', type=str, help='Flags to manipulate')
argparser.add_argument('address', default=ZERO_ADDRESS, nargs='?', type=str, help='Ethereum address to unlock,') argparser.add_argument('address', default=ZERO_ADDRESS, nargs='?', type=str, help='Ethereum address to unlock,')
sub = argparser.add_subparsers(help='') sub = argparser.add_subparsers()
sub.dest = "command" sub.dest = "command"
sub_lock = sub.add_parser('lock', help='Set or reset locks') sub_lock = sub.add_parser('lock', help='Set or reset locks')
sub_unlock = sub.add_parser('unlock', help='Set or reset locks') sub_unlock = sub.add_parser('unlock', help='Set or reset locks')
process_lock_args(sub_lock) process_lock_args(sub_lock)
process_lock_args(sub_unlock) process_lock_args(sub_unlock)
args = argparser.parse_args() args = argparser.parse_args()
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags) if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
celery_app = cic_eth.cli.CeleryApp.from_config(config) config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process()
args_override = {
'ETH_PROVIDER': getattr(args, 'p'),
'CIC_CHAIN_SPEC': getattr(args, 'i'),
}
# override args
config.dict_override(args_override, 'cli')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
queue = args.q
chain_spec = None
if config.get('CIC_CHAIN_SPEC') != None and config.get('CIC_CHAIN_SPEC') != '::':
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
admin_api = AdminApi(None) admin_api = AdminApi(None)
@@ -74,7 +100,7 @@ def main():
args.address, args.address,
flags, flags,
], ],
queue=config.get('CELERY_QUEUE'), queue=queue,
) )
t = s.apply_async() t = s.apply_async()
logg.debug('unlock {} on {} task {}'.format(flags, args.address, t)) logg.debug('unlock {} on {} task {}'.format(flags, args.address, t))
@@ -93,7 +119,7 @@ def main():
args.address, args.address,
flags, flags,
], ],
queue=config.get('CELERY_QUEUE'), queue=queue,
) )
t = s.apply_async() t = s.apply_async()
logg.debug('lock {} on {} task {}'.format(flags, args.address, t)) logg.debug('lock {} on {} task {}'.format(flags, args.address, t))

View File

@@ -8,7 +8,8 @@ import sys
import re import re
import datetime import datetime
# external imports # third-party imports
import confini
import celery import celery
from cic_eth_registry import CICRegistry from cic_eth_registry import CICRegistry
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
@@ -23,7 +24,7 @@ from chainqueue.error import NotLocalTxError
from chainqueue.sql.state import set_reserved from chainqueue.sql.state import set_reserved
# local imports # local imports
import cic_eth.cli import cic_eth
from cic_eth.db import SessionBase from cic_eth.db import SessionBase
from cic_eth.db.enum import LockEnum from cic_eth.db.enum import LockEnum
from cic_eth.db import dsn_from_config from cic_eth.db import dsn_from_config
@@ -38,37 +39,57 @@ from cic_eth.error import (
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
arg_flags = cic_eth.cli.argflag_std_read
local_arg_flags = cic_eth.cli.argflag_local_sync | cic_eth.cli.argflag_local_task
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args()
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags) config_dir = os.path.join('/usr/local/etc/cic-eth')
# connect to celery argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
celery_app = cic_eth.cli.CeleryApp.from_config(config) argparser.add_argument('-p', '--provider', default='http://localhost:8545', dest='p', type=str, help='rpc provider')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
args = argparser.parse_args(sys.argv[1:])
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process()
# override args
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'ETH_PROVIDER': getattr(args, 'p'),
}
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
queue = args.q
# connect to database
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG')) SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
# set up rpc RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default')
rpc = cic_eth.cli.RPC.from_config(config)
conn = rpc.get_default()
run = True run = True
class DispatchSyncer: class DispatchSyncer:
yield_delay = 0.0005 yield_delay = 0.0005
def __init__(self, chain_spec): def __init__(self, chain_spec):
self.chain_spec = chain_spec self.chain_spec = chain_spec
self.session = None
def chain(self): def chain(self):
@@ -79,18 +100,16 @@ class DispatchSyncer:
c = len(txs.keys()) c = len(txs.keys())
logg.debug('processing {} txs {}'.format(c, list(txs.keys()))) logg.debug('processing {} txs {}'.format(c, list(txs.keys())))
chain_str = str(self.chain_spec) chain_str = str(self.chain_spec)
self.session = SessionBase.create_session() session = SessionBase.create_session()
for k in txs.keys(): for k in txs.keys():
tx_raw = txs[k] tx_raw = txs[k]
tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw)) tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw))
tx = unpack(tx_raw_bytes, self.chain_spec) tx = unpack(tx_raw_bytes, self.chain_spec)
try: try:
set_reserved(self.chain_spec, tx['hash'], session=self.session) set_reserved(self.chain_spec, tx['hash'], session=session)
self.session.commit()
except NotLocalTxError as e: except NotLocalTxError as e:
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash'])) logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
self.session.rollback()
continue continue
s_check = celery.signature( s_check = celery.signature(
@@ -101,37 +120,28 @@ class DispatchSyncer:
LockEnum.QUEUE, LockEnum.QUEUE,
tx['from'], tx['from'],
], ],
queue=config.get('CELERY_QUEUE'), queue=queue,
) )
s_send = celery.signature( s_send = celery.signature(
'cic_eth.eth.tx.send', 'cic_eth.eth.tx.send',
[ [
self.chain_spec.asdict(), self.chain_spec.asdict(),
], ],
queue=config.get('CELERY_QUEUE'), queue=queue,
) )
s_check.link(s_send) s_check.link(s_send)
t = s_check.apply_async() t = s_check.apply_async()
logg.info('processed {}'.format(k)) logg.info('processed {}'.format(k))
self.session.close()
self.session = None
def loop(self, interval): def loop(self, w3, interval):
while run: while run:
txs = {} txs = {}
typ = StatusBits.QUEUED typ = StatusBits.QUEUED
utxs = get_upcoming_tx(self.chain_spec, typ) utxs = get_upcoming_tx(self.chain_spec, typ)
for k in utxs.keys(): for k in utxs.keys():
txs[k] = utxs[k] txs[k] = utxs[k]
try: self.process(w3, txs)
conn = RPCConnection.connect(self.chain_spec, 'default')
self.process(conn, txs)
except ConnectionError as e:
if self.session != None:
self.session.close()
self.session = None
logg.error('connection to node failed: {}'.format(e))
if len(utxs) > 0: if len(utxs) > 0:
time.sleep(self.yield_delay) time.sleep(self.yield_delay)
@@ -141,7 +151,8 @@ class DispatchSyncer:
def main(): def main():
syncer = DispatchSyncer(chain_spec) syncer = DispatchSyncer(chain_spec)
syncer.loop(float(config.get('DISPATCHER_LOOP_INTERVAL'))) conn = RPCConnection.connect(chain_spec, 'default')
syncer.loop(conn, float(config.get('DISPATCHER_LOOP_INTERVAL')))
sys.exit(0) sys.exit(0)

View File

@@ -11,7 +11,6 @@ from chainqueue.db.enum import StatusBits
from chainqueue.db.models.tx import TxCache from chainqueue.db.models.tx import TxCache
from chainqueue.db.models.otx import Otx from chainqueue.db.models.otx import Otx
from chainqueue.sql.query import get_paused_tx_cache as get_paused_tx from chainqueue.sql.query import get_paused_tx_cache as get_paused_tx
from chainlib.eth.address import to_checksum_address
# local imports # local imports
from cic_eth.db.models.base import SessionBase from cic_eth.db.models.base import SessionBase
@@ -48,13 +47,12 @@ class GasFilter(SyncFilter):
SessionBase.release_session(session) SessionBase.release_session(session)
address = to_checksum_address(r[0])
logg.info('resuming gas-in-waiting txs for {}'.format(r[0])) logg.info('resuming gas-in-waiting txs for {}'.format(r[0]))
if len(txs) > 0: if len(txs) > 0:
s = create_check_gas_task( s = create_check_gas_task(
list(txs.values()), list(txs.values()),
self.chain_spec, self.chain_spec,
address, r[0],
0, 0,
tx_hashes_hex=list(txs.keys()), tx_hashes_hex=list(txs.keys()),
queue=self.queue, queue=self.queue,

View File

@@ -12,24 +12,20 @@ from hexathon import (
# local imports # local imports
from .base import SyncFilter from .base import SyncFilter
logg = logging.getLogger(__name__) logg = logging.getLogger().getChild(__name__)
account_registry_add_log_hash = '0x9cc987676e7d63379f176ea50df0ae8d2d9d1141d1231d4ce15b5965f73c9430' account_registry_add_log_hash = '0x9cc987676e7d63379f176ea50df0ae8d2d9d1141d1231d4ce15b5965f73c9430'
class RegistrationFilter(SyncFilter): class RegistrationFilter(SyncFilter):
def __init__(self, chain_spec, contract_address, queue=None): def __init__(self, chain_spec, queue):
self.chain_spec = chain_spec self.chain_spec = chain_spec
self.queue = queue self.queue = queue
self.contract_address = contract_address
def filter(self, conn, block, tx, db_session=None): def filter(self, conn, block, tx, db_session=None):
if self.contract_address != tx.inputs[0]: registered_address = None
logg.debug('not an account registry tx; {} != {}'.format(self.contract_address, tx.inputs[0]))
return None
for l in tx.logs: for l in tx.logs:
event_topic_hex = l['topics'][0] event_topic_hex = l['topics'][0]
if event_topic_hex == account_registry_add_log_hash: if event_topic_hex == account_registry_add_log_hash:

View File

@@ -6,6 +6,7 @@ import argparse
import re import re
# external imports # external imports
import confini
import celery import celery
from cic_eth_registry import CICRegistry from cic_eth_registry import CICRegistry
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
@@ -13,7 +14,6 @@ from chainlib.connection import RPCConnection
from chainsyncer.filter import SyncFilter from chainsyncer.filter import SyncFilter
# local imports # local imports
import cic_eth.cli
from cic_eth.db import dsn_from_config from cic_eth.db import dsn_from_config
from cic_eth.db import SessionBase from cic_eth.db import SessionBase
from cic_eth.admin.ctrl import lock_send from cic_eth.admin.ctrl import lock_send
@@ -25,41 +25,66 @@ from cic_eth.stat import init_chain_stat
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
arg_flags = cic_eth.cli.argflag_std_read config_dir = os.path.join('/usr/local/etc/cic-eth')
local_arg_flags = cic_eth.cli.argflag_local_sync | cic_eth.cli.argflag_local_task
argparser = cic_eth.cli.ArgumentParser(arg_flags) argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('--batch-size', dest='batch_size', type=int, default=50, help='max amount of txs to resend per iteration') argparser.add_argument('--batch-size', dest='batch_size', type=int, default=50, help='max amount of txs to resend per iteration')
argparser.add_argument('--retry-delay', dest='retry_delay', type=int, default=20, help='seconds to wait for retrying a transaction that is marked as sent') argparser.add_argument('--retry-delay', dest='retry_delay', type=int, help='seconds to wait for retrying a transaction that is marked as sent')
argparser.process_local_flags(local_arg_flags) argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
args = argparser.parse_args() argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
args = argparser.parse_args(sys.argv[1:])
extra_args = {
'retry_delay': 'RETRY_DELAY', if args.v == True:
'batch_size': 'RETRY_BATCH_SIZE', logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process()
# override args
args_override = {
'ETH_PROVIDER': getattr(args, 'p'),
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'CIC_TX_RETRY_DELAY': getattr(args, 'retry_delay'),
} }
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags, extra_args=extra_args) config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
config.add(args.batch_size, '_BATCH_SIZE', True)
app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
queue = args.q
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default')
# connect to database
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG')) SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
# set up rpc
rpc = cic_eth.cli.RPC.from_config(config)
conn = rpc.get_default()
def main(): def main():
straggler_delay = int(config.get('RETRY_DELAY')) conn = RPCConnection.connect(chain_spec, 'default')
straggler_delay = int(config.get('CIC_TX_RETRY_DELAY'))
loop_interval = config.get('SYNCER_LOOP_INTERVAL') loop_interval = config.get('SYNCER_LOOP_INTERVAL')
if loop_interval == None: if loop_interval == None:
stat = init_chain_stat(conn) stat = init_chain_stat(conn)
loop_interval = stat.block_average() loop_interval = stat.block_average()
syncer = RetrySyncer(conn, chain_spec, cic_eth.cli.chain_interface, straggler_delay, batch_size=config.get('RETRY_BATCH_SIZE')) syncer = RetrySyncer(conn, chain_spec, straggler_delay, batch_size=config.get('_BATCH_SIZE'))
syncer.backend.set(0, 0) syncer.backend.set(0, 0)
fltr = StragglerFilter(chain_spec, queue=config.get('CELERY_QUEUE')) fltr = StragglerFilter(chain_spec, queue=queue)
syncer.add_filter(fltr) syncer.add_filter(fltr)
syncer.loop(int(loop_interval), conn) syncer.loop(int(loop_interval), conn)

View File

@@ -21,17 +21,14 @@ from chainlib.eth.connection import (
EthUnixSignerConnection, EthUnixSignerConnection,
EthHTTPSignerConnection, EthHTTPSignerConnection,
) )
from chainlib.eth.address import to_checksum_address
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainqueue.db.models.otx import Otx from chainqueue.db.models.otx import Otx
from cic_eth_registry.error import UnknownContractError from cic_eth_registry.error import UnknownContractError
from cic_eth_registry.erc20 import ERC20Token from cic_eth_registry.erc20 import ERC20Token
from hexathon import add_0x
import liveness.linux import liveness.linux
# local imports # local imports
import cic_eth.cli
from cic_eth.eth import ( from cic_eth.eth import (
erc20, erc20,
tx, tx,
@@ -73,53 +70,114 @@ from cic_eth.task import BaseTask
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
arg_flags = cic_eth.cli.argflag_std_read script_dir = os.path.dirname(os.path.realpath(__file__))
local_arg_flags = cic_eth.cli.argflag_local_task
argparser = cic_eth.cli.ArgumentParser(arg_flags) config_dir = os.path.join('/usr/local/etc/cic-eth')
argparser.process_local_flags(local_arg_flags)
argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('-q', type=str, default='cic-eth', help='queue name for worker tasks')
argparser.add_argument('-r', type=str, help='CIC registry address')
argparser.add_argument('--default-token-symbol', dest='default_token_symbol', type=str, help='Symbol of default token to use') argparser.add_argument('--default-token-symbol', dest='default_token_symbol', type=str, help='Symbol of default token to use')
argparser.add_argument('--trace-queue-status', default=None, dest='trace_queue_status', action='store_true', help='set to perist all queue entry status changes to storage') argparser.add_argument('--trace-queue-status', default=None, dest='trace_queue_status', action='store_true', help='set to perist all queue entry status changes to storage')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--aux-all', action='store_true', help='include tasks from all submodules from the aux module path') argparser.add_argument('--aux-all', action='store_true', help='include tasks from all submodules from the aux module path')
argparser.add_argument('--aux', action='append', type=str, default=[], help='add single submodule from the aux module path') argparser.add_argument('--aux', action='append', type=str, default=[], help='add single submodule from the aux module path')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args() args = argparser.parse_args()
# process config if args.vv:
extra_args = { logging.getLogger().setLevel(logging.DEBUG)
'default_token_symbol': 'CIC_DEFAULT_TOKEN_SYMBOL', elif args.v:
'aux_all': None, logging.getLogger().setLevel(logging.INFO)
'aux': None,
'trace_queue_status': 'TASKS_TRACE_QUEUE_STATUS', config = confini.Config(args.c, args.env_prefix)
config.process()
# override args
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'CIC_REGISTRY_ADDRESS': getattr(args, 'r'),
'CIC_DEFAULT_TOKEN_SYMBOL': getattr(args, 'default_token_symbol'),
'ETH_PROVIDER': getattr(args, 'p'),
'TASKS_TRACE_QUEUE_STATUS': getattr(args, 'trace_queue_status'),
} }
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags) config.add(args.q, '_CELERY_QUEUE', True)
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
# connect to celery health_modules = config.get('CIC_HEALTH_MODULES', [])
celery_app = cic_eth.cli.CeleryApp.from_config(config) if len(health_modules) != 0:
health_modules = health_modules.split(',')
logg.debug('health mods {}'.format(health_modules))
# set up rpc
rpc = cic_eth.cli.RPC.from_config(config, use_signer=True)
conn = rpc.get_default()
# connect to database # connect to database
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG')) SessionBase.connect(dsn, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG'))
# set up celery
current_app = celery.Celery(__name__)
broker = config.get('CELERY_BROKER_URL')
if broker[:4] == 'file':
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
conf_update = {
'broker_url': broker,
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
}
if config.true('CELERY_DEBUG'):
conf_update['result_extended'] = True
current_app.conf.update(conf_update)
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
conf_update = {
'broker_url': broker,
}
if config.true('CELERY_DEBUG'):
conf_update['result_extended'] = True
current_app.conf.update(conf_update)
result = config.get('CELERY_RESULT_URL')
if result[:4] == 'file':
rq = tempfile.mkdtemp()
current_app.conf.update({
'result_backend': 'file://{}'.format(rq),
})
logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
else:
current_app.conf.update({
'result_backend': result,
})
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, 'signer')
RPCConnection.register_constructor(ConnType.HTTP, EthHTTPSignerConnection, 'signer')
RPCConnection.register_constructor(ConnType.HTTP_SSL, EthHTTPSignerConnection, 'signer')
RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default')
RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 'signer')
Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS') Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS')
#import cic_eth.checks.gas
# execute health checks #if not cic_eth.checks.gas.health(config=config):
# TODO: health should be separate service with endpoint that can be queried # raise RuntimeError()
health_modules = config.get('CIC_HEALTH_MODULES', [])
if len(health_modules) != 0:
health_modules = health_modules.split(',')
logg.debug('health mods {}'.format(health_modules))
liveness.linux.load(health_modules, rundir=config.get('CIC_RUN_DIR'), config=config, unit='cic-eth-tasker') liveness.linux.load(health_modules, rundir=config.get('CIC_RUN_DIR'), config=config, unit='cic-eth-tasker')
rpc = RPCConnection.connect(chain_spec, 'default')
# set up chain provisions
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
registry = None
try: try:
registry = connect_registry(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS')) registry = connect_registry(rpc, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
except UnknownContractError as e: except UnknownContractError as e:
logg.exception('Registry contract connection failed for {}: {}'.format(config.get('CIC_REGISTRY_ADDRESS'), e)) logg.exception('Registry contract connection failed for {}: {}'.format(config.get('CIC_REGISTRY_ADDRESS'), e))
sys.exit(1) sys.exit(1)
@@ -130,15 +188,15 @@ if trusted_addresses_src == None:
logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS') logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
sys.exit(1) sys.exit(1)
trusted_addresses = trusted_addresses_src.split(',') trusted_addresses = trusted_addresses_src.split(',')
for i, address in enumerate(trusted_addresses): for address in trusted_addresses:
if config.get('_UNSAFE'):
trusted_addresses[i] = to_checksum_address(address)
logg.info('using trusted address {}'.format(address)) logg.info('using trusted address {}'.format(address))
connect_declarator(conn, chain_spec, trusted_addresses)
connect_token_registry(conn, chain_spec)
# detect auxiliary task modules (plugins) connect_declarator(rpc, chain_spec, trusted_addresses)
connect_token_registry(rpc, chain_spec)
# detect aux
# TODO: move to separate file # TODO: move to separate file
#aux_dir = os.path.join(script_dir, '..', '..', 'aux')
aux = [] aux = []
if args.aux_all: if args.aux_all:
if len(args.aux) > 0: if len(args.aux) > 0:
@@ -191,24 +249,36 @@ elif len(args.aux) > 0:
for v in aux: for v in aux:
mname = 'cic_eth_aux.' + v mname = 'cic_eth_aux.' + v
mod = importlib.import_module(mname) mod = importlib.import_module(mname)
mod.aux_setup(conn, config) mod.aux_setup(rpc, config)
logg.info('loaded aux module {}'.format(mname)) logg.info('loaded aux module {}'.format(mname))
def main(): def main():
argv = ['worker'] argv = ['worker']
log_level = logg.getEffectiveLevel() if args.vv:
log_level_name = logging.getLevelName(log_level) argv.append('--loglevel=DEBUG')
argv.append('--loglevel=' + log_level_name) elif args.v:
argv.append('--loglevel=INFO')
argv.append('-Q') argv.append('-Q')
argv.append(config.get('CELERY_QUEUE')) argv.append(args.q)
argv.append('-n') argv.append('-n')
argv.append(config.get('CELERY_QUEUE')) argv.append(args.q)
# if config.true('SSL_ENABLE_CLIENT'):
# Callback.ssl = True
# Callback.ssl_cert_file = config.get('SSL_CERT_FILE')
# Callback.ssl_key_file = config.get('SSL_KEY_FILE')
# Callback.ssl_password = config.get('SSL_PASSWORD')
#
# if config.get('SSL_CA_FILE') != '':
# Callback.ssl_ca_file = config.get('SSL_CA_FILE')
rpc = RPCConnection.connect(chain_spec, 'default')
BaseTask.default_token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL') BaseTask.default_token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL')
BaseTask.default_token_address = registry.by_name(BaseTask.default_token_symbol) BaseTask.default_token_address = registry.by_name(BaseTask.default_token_symbol)
default_token = ERC20Token(chain_spec, conn, BaseTask.default_token_address) default_token = ERC20Token(chain_spec, rpc, BaseTask.default_token_address)
default_token.load(conn) default_token.load(rpc)
BaseTask.default_token_decimals = default_token.decimals BaseTask.default_token_decimals = default_token.decimals
BaseTask.default_token_name = default_token.name BaseTask.default_token_name = default_token.name
@@ -216,13 +286,13 @@ def main():
logg.info('default token set to {} {}'.format(BaseTask.default_token_symbol, BaseTask.default_token_address)) logg.info('default token set to {} {}'.format(BaseTask.default_token_symbol, BaseTask.default_token_address))
liveness.linux.set(rundir=config.get('CIC_RUN_DIR')) liveness.linux.set(rundir=config.get('CIC_RUN_DIR'))
celery_app.worker_main(argv) current_app.worker_main(argv)
liveness.linux.reset(rundir=config.get('CIC_RUN_DIR')) liveness.linux.reset(rundir=config.get('CIC_RUN_DIR'))
@celery.signals.eventlet_pool_postshutdown.connect @celery.signals.eventlet_pool_postshutdown.connect
def shutdown(sender=None, headers=None, body=None, **kwargs): def shutdown(sender=None, headers=None, body=None, **kwargs):
logg.warning('in shutdown event hook') logg.warning('in shudown event hook')
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -8,6 +8,14 @@ import sys
import re import re
# external imports # external imports
import confini
import celery
import rlp
import cic_base.config
import cic_base.log
import cic_base.argparse
import cic_base.rpc
from cic_base.eth.syncer import chain_interface
from cic_eth_registry.error import UnknownContractError from cic_eth_registry.error import UnknownContractError
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.eth.constant import ZERO_ADDRESS from chainlib.eth.constant import ZERO_ADDRESS
@@ -22,13 +30,8 @@ from chainsyncer.backend.sql import SQLBackend
from chainsyncer.driver.head import HeadSyncer from chainsyncer.driver.head import HeadSyncer
from chainsyncer.driver.history import HistorySyncer from chainsyncer.driver.history import HistorySyncer
from chainsyncer.db.models.base import SessionBase from chainsyncer.db.models.base import SessionBase
from chainlib.eth.address import (
is_checksum_address,
to_checksum_address,
)
# local imports # local imports
import cic_eth.cli
from cic_eth.db import dsn_from_config from cic_eth.db import dsn_from_config
from cic_eth.runnable.daemons.filters import ( from cic_eth.runnable.daemons.filters import (
CallbackFilter, CallbackFilter,
@@ -44,50 +47,54 @@ from cic_eth.registry import (
connect_token_registry, connect_token_registry,
) )
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
arg_flags = cic_eth.cli.argflag_std_read script_dir = os.path.realpath(os.path.dirname(__file__))
local_arg_flags = cic_eth.cli.argflag_local_sync
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args()
# process config def add_block_args(argparser):
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags) argparser.add_argument('--history-start', type=int, default=0, dest='history_start', help='Start block height for initial history sync')
argparser.add_argument('--no-history', action='store_true', dest='no_history', help='Skip initial history sync')
return argparser
# connect to celery
cic_eth.cli.CeleryApp.from_config(config)
# set up database logg = cic_base.log.create()
argparser = cic_base.argparse.create(script_dir, cic_base.argparse.full_template)
argparser = cic_base.argparse.add(argparser, add_block_args, 'block')
args = cic_base.argparse.parse(argparser, logg)
config = cic_base.config.create(args.c, args, args.env_prefix)
config.add(args.y, '_KEYSTORE_FILE', True)
config.add(args.q, '_CELERY_QUEUE', True)
config.add(args.history_start, 'SYNCER_HISTORY_START', True)
config.add(args.no_history, '_NO_HISTORY', True)
cic_base.config.log(config)
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=16, debug=config.true('DATABASE_DEBUG')) SessionBase.connect(dsn, pool_size=16, debug=config.true('DATABASE_DEBUG'))
# set up rpc chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
rpc = cic_eth.cli.RPC.from_config(config)
conn = rpc.get_default() cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER'))
# set up chain provisions
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
registry = None
try:
registry = connect_registry(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
except UnknownContractError as e:
logg.exception('Registry contract connection failed for {}: {}'.format(config.get('CIC_REGISTRY_ADDRESS'), e))
sys.exit(1)
logg.info('connected contract registry {}'.format(config.get('CIC_REGISTRY_ADDRESS')))
def main(): def main():
# connect to celery
celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
# Connect to blockchain with chainlib # Connect to blockchain with chainlib
rpc = RPCConnection.connect(chain_spec, 'default')
o = block_latest() o = block_latest()
r = conn.do(o) r = rpc.do(o)
block_current = int(r, 16) block_current = int(r, 16)
block_offset = block_current + 1 block_offset = block_current + 1
loop_interval = config.get('SYNCER_LOOP_INTERVAL') loop_interval = config.get('SYNCER_LOOP_INTERVAL')
if loop_interval == None: if loop_interval == None:
stat = init_chain_stat(conn, block_start=block_current) stat = init_chain_stat(rpc, block_start=block_current)
loop_interval = stat.block_average() loop_interval = stat.block_average()
logg.debug('current block height {}'.format(block_offset)) logg.debug('current block height {}'.format(block_offset))
@@ -99,9 +106,9 @@ def main():
syncer_backends = SQLBackend.resume(chain_spec, block_offset) syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0: if len(syncer_backends) == 0:
initial_block_start = config.get('SYNCER_OFFSET') initial_block_start = config.get('SYNCER_HISTORY_START')
initial_block_offset = block_offset initial_block_offset = block_offset
if config.true('SYNCER_NO_HISTORY'): if config.get('_NO_HISTORY'):
initial_block_start = block_offset initial_block_start = block_offset
initial_block_offset += 1 initial_block_offset += 1
syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start)) syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start))
@@ -114,45 +121,39 @@ def main():
for syncer_backend in syncer_backends: for syncer_backend in syncer_backends:
try: try:
syncers.append(HistorySyncer(syncer_backend, cic_eth.cli.chain_interface)) syncers.append(HistorySyncer(syncer_backend, chain_interface))
logg.info('Initializing HISTORY syncer on backend {}'.format(syncer_backend)) logg.info('Initializing HISTORY syncer on backend {}'.format(syncer_backend))
except AttributeError: except AttributeError:
logg.info('Initializing HEAD syncer on backend {}'.format(syncer_backend)) logg.info('Initializing HEAD syncer on backend {}'.format(syncer_backend))
syncers.append(HeadSyncer(syncer_backend, cic_eth.cli.chain_interface)) syncers.append(HeadSyncer(syncer_backend, chain_interface))
connect_registry(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS')) connect_registry(rpc, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS') trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None: if trusted_addresses_src == None:
logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS') logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
sys.exit(1) sys.exit(1)
trusted_addresses = trusted_addresses_src.split(',') trusted_addresses = trusted_addresses_src.split(',')
for i, address in enumerate(trusted_addresses): for address in trusted_addresses:
if not config.get('_UNSAFE'):
if not is_checksum_address(address):
raise ValueError('address {} is not a valid checksum address'.format(address))
else:
trusted_addresses[i] = to_checksum_address(address)
logg.info('using trusted address {}'.format(address)) logg.info('using trusted address {}'.format(address))
connect_declarator(conn, chain_spec, trusted_addresses) connect_declarator(rpc, chain_spec, trusted_addresses)
connect_token_registry(conn, chain_spec) connect_token_registry(rpc, chain_spec)
CallbackFilter.trusted_addresses = trusted_addresses CallbackFilter.trusted_addresses = trusted_addresses
callback_filters = [] callback_filters = []
for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','): for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','):
task_split = cb.split(':') task_split = cb.split(':')
task_queue = config.get('CELERY_QUEUE') task_queue = config.get('_CELERY_QUEUE')
if len(task_split) > 1: if len(task_split) > 1:
task_queue = task_split[0] task_queue = task_split[0]
callback_filter = CallbackFilter(chain_spec, task_split[1], task_queue) callback_filter = CallbackFilter(chain_spec, task_split[1], task_queue)
callback_filters.append(callback_filter) callback_filters.append(callback_filter)
tx_filter = TxFilter(chain_spec, config.get('CELERY_QUEUE')) tx_filter = TxFilter(chain_spec, config.get('_CELERY_QUEUE'))
account_registry_address = registry.by_name('AccountRegistry') registration_filter = RegistrationFilter(chain_spec, config.get('_CELERY_QUEUE'))
registration_filter = RegistrationFilter(chain_spec, account_registry_address, queue=config.get('CELERY_QUEUE'))
gas_filter = GasFilter(chain_spec, config.get('CELERY_QUEUE')) gas_filter = GasFilter(chain_spec, config.get('_CELERY_QUEUE'))
#transfer_auth_filter = TransferAuthFilter(registry, chain_spec, config.get('_CELERY_QUEUE')) #transfer_auth_filter = TransferAuthFilter(registry, chain_spec, config.get('_CELERY_QUEUE'))
@@ -167,7 +168,7 @@ def main():
for cf in callback_filters: for cf in callback_filters:
syncer.add_filter(cf) syncer.add_filter(cf)
r = syncer.loop(int(loop_interval), conn) r = syncer.loop(int(loop_interval), rpc)
sys.stderr.write("sync {} done at block {}\n".format(syncer, r)) sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
i += 1 i += 1

View File

@@ -12,27 +12,50 @@ import confini
import celery import celery
# local imports # local imports
import cic_eth.cli
from cic_eth.api import Api from cic_eth.api import Api
from cic_eth.api.admin import AdminApi from cic_eth.api.admin import AdminApi
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
arg_flags = cic_eth.cli.argflag_std_base default_format = 'terminal'
local_arg_flags = cic_eth.cli.argflag_local_taskcallback default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic')
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.process_local_flags(local_arg_flags)
argparser = argparse.ArgumentParser()
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
args = argparser.parse_args() args = argparser.parse_args()
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags) if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
celery_app = cic_eth.cli.CeleryApp.from_config(config) config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process()
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
}
config.dict_override(args_override, 'cli args')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
api = Api(config.get('CHAIN_SPEC'), queue=config.get('CELERY_QUEUE'))
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
queue = args.q
api = Api(config.get('CIC_CHAIN_SPEC'), queue=queue)
admin_api = AdminApi(None) admin_api = AdminApi(None)
def main(): def main():
t = admin_api.registry() t = admin_api.registry()
registry_address = t.get() registry_address = t.get()

View File

@@ -5,38 +5,65 @@ import re
import os import os
# third-party imports # third-party imports
import celery
import confini
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.eth.connection import EthHTTPConnection from chainlib.eth.connection import EthHTTPConnection
# local imports # local imports
import cic_eth.cli
from cic_eth.api.admin import AdminApi from cic_eth.api.admin import AdminApi
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
arg_flags = cic_eth.cli.argflag_std_base logging.getLogger('web3').setLevel(logging.WARNING)
local_arg_flags = cic_eth.cli.argflag_local_taskcallback logging.getLogger('urllib3').setLevel(logging.WARNING)
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.add_argument('--unlock', action='store_true', help='Unlock account after resend') default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic')
argparser.add_positional('tx_hash', type=str, help='Transaction hash')
argparser.process_local_flags(local_arg_flags)
extra_args = { argparser = argparse.ArgumentParser()
'unlock': None, argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
'tx_hash': None, argparser.add_argument('-p', '--provider', dest='p', default='http://localhost:8545', type=str, help='Web3 provider url (http only)')
} argparser.add_argument('-i', '--chain-spec', dest='i', type=str, default='Ethereum:1', help='Chain specification string')
argparser.add_argument('--unlock', action='store_true', help='Append task to unlock account')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
argparser.add_argument('tx_hash', type=str, help='Transaction hash')
args = argparser.parse_args() args = argparser.parse_args()
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags, extra_args=extra_args)
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) if args.vv:
logg.setLevel(logging.DEBUG)
elif args.v:
logg.setLevel(logging.INFO)
celery_app = cic_eth.cli.CeleryApp.from_config(config) config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process()
args_override = {
'ETH_PROVIDER': getattr(args, 'p'),
'CIC_CHAIN_SPEC': getattr(args, 'i'),
}
# override args
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
config.add(args.tx_hash, '_TX_HASH', True)
config.add(args.unlock, '_UNLOCK', True)
chain_spec = ChainSpec.from_chain_str(args.i)
rpc = EthHTTPConnection(config.get('ETH_PROVIDER'))
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
def main(): def main():
api = AdminApi(None) api = AdminApi(rpc)
tx_details = api.tx(chain_spec, config.get('_TX_HASH')) tx_details = api.tx(chain_spec, args.tx_hash)
t = api.resend(args.tx_hash, chain_spec, unlock=config.get('_UNLOCK')) t = api.resend(args.tx_hash, chain_spec, unlock=config.get('_UNLOCK'))
print(t.get_leaf()) print(t.get_leaf())

View File

@@ -6,7 +6,8 @@ import argparse
import re import re
# external imports # external imports
import cic_eth.cli import celery
import confini
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from xdg.BaseDirectory import xdg_config_home from xdg.BaseDirectory import xdg_config_home
@@ -18,28 +19,43 @@ from cic_eth.db.models.base import SessionBase
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
arg_flags = cic_eth.cli.argflag_std_base | cic_eth.cli.Flag.UNSAFE | cic_eth.cli.Flag.CHAIN_SPEC default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic')
local_arg_flags = cic_eth.cli.argflag_local_taskcallback
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.add_positional('tag', type=str, help='address tag')
argparser.add_positional('address', type=str, help='address')
argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args()
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags)
celery_app = cic_eth.cli.CeleryApp.from_config(config) argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='Web3 provider url (http only)')
argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
argparser.add_argument('tag', type=str, help='address tag')
argparser.add_argument('address', type=str, help='address')
args = argparser.parse_args(sys.argv[1:])
admin_api = AdminApi(None) if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) config = confini.Config(args.c)
config.process()
args_override = {
'ETH_PROVIDER': getattr(args, 'p'),
'CIC_CHAIN_SPEC': getattr(args, 'i'),
}
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}\n{}'.format(args.c, config))
celery_app = cic_eth.cli.CeleryApp.from_config(config) chain_spec = ChainSpec.from_chain_str(args.i)
api = AdminApi(None)
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
def main(): def main():
admin_api.tag_account(args.tag, args.address, chain_spec) api = AdminApi(None)
api.tag_account(args.tag, args.address, chain_spec)
if __name__ == '__main__': if __name__ == '__main__':

Some files were not shown because too many files have changed in this diff Show More